Create Iniitial System Metrics Exporter w/Docker Metrics

This commit is contained in:
2023-09-11 16:20:45 -03:00
parent 1c273270cb
commit a787bb36ee
11 changed files with 2341 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
.idea/

1833
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

22
Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[package]
name = "server_metrics"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = "0.6.7"
clap = { version = "4.4.2", features = ["derive"] }
http = "0.2.8"
http-body = "0.4.5"
hyper = { version = "0.14.24", features = ["full"] }
prometheus = "0.13.3"
rust_decimal = { version = "1.29.1", features = ["serde", "db-postgres"] }
rust_decimal_macros = "1.27"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91"
thiserror = "1.0.38"
tokio = { version = "1.28.2", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.3.5", features = ["full"] }

42
src/docker/collector.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::sync::Arc;
use crate::{docker::DockerStats, error::SystemExporterResult, metrics::Metrics};
pub struct DockerCollector {
metrics: Arc<Metrics>,
}
impl DockerCollector {
pub fn new(metrics: Arc<Metrics>) -> Self {
Self { metrics }
}
pub fn poll(&self) -> SystemExporterResult<Vec<DockerStats>> {
let output = std::process::Command::new("sh")
.arg("-c")
.arg(
r#"docker stats --format='{
"BlockIO": "{{.BlockIO}}",
"CPUPerc": "{{.CPUPerc}}",
"Container": "{{.Container}}",
"ID": "{{.ID}}",
"MemPerc": "{{.MemPerc}}",
"MemUsage": "{{.MemUsage}}",
"Name": "{{.Name}}",
"NetIO": "{{.NetIO}}",
"PIDs": "{{.PIDs}}"
}' --no-stream | jq -s '.'"#,
)
.output()
.expect("Failed to execute command");
let output_str = String::from_utf8(output.stdout).expect("Not UTF-8");
let res = serde_json::from_str::<Vec<DockerStats>>(&output_str)?;
res.iter()
.for_each(|e| e.post_metrics(&self.metrics.docker));
Ok(res)
}
}

76
src/docker/metrics.rs Normal file
View File

@@ -0,0 +1,76 @@
use crate::error::SystemExporterResult;
use prometheus::{opts, GaugeVec, Registry};
pub struct DockerMetrics {
pub container_cpu_perc: GaugeVec,
pub container_mem_percentage: GaugeVec,
pub container_mem_used: GaugeVec,
pub container_mem_available: GaugeVec,
pub container_block_io_read: GaugeVec,
pub container_block_io_write: GaugeVec,
pub container_net_io_received: GaugeVec,
pub container_net_io_transmitted: GaugeVec,
}
impl DockerMetrics {
pub(crate) fn new(registry: Registry) -> SystemExporterResult<Self> {
let container_block_io_read = GaugeVec::new(
opts!("container_block_io_read", "Container Block I/O Read"),
&["container"],
)?;
let container_block_io_write = GaugeVec::new(
opts!("container_block_io_write", "Container Block I/O Write"),
&["container"],
)?;
let container_mem_used = GaugeVec::new(
opts!("container_mem_used", "Container Memory Used"),
&["container"],
)?;
let container_mem_available = GaugeVec::new(
opts!("container_mem_available", "Container Memory Available"),
&["container"],
)?;
let container_mem_percentage = GaugeVec::new(
opts!("container_mem_percentage", "Container Memory Percentage"),
&["container"],
)?;
let container_net_io_received = GaugeVec::new(
opts!(
"container_net_io_received",
"Container Network I/O Received"
),
&["container"],
)?;
let container_net_io_transmitted = GaugeVec::new(
opts!(
"container_net_io_transmitted",
"Container Network I/O Transmitted"
),
&["container"],
)?;
let container_cpu_perc = GaugeVec::new(
opts!("container_cpu_perc", "Container CPU Percentage"),
&["container"],
)?;
registry.register(Box::new(container_block_io_read.clone()))?;
registry.register(Box::new(container_block_io_write.clone()))?;
registry.register(Box::new(container_mem_used.clone()))?;
registry.register(Box::new(container_mem_available.clone()))?;
registry.register(Box::new(container_mem_percentage.clone()))?;
registry.register(Box::new(container_net_io_received.clone()))?;
registry.register(Box::new(container_net_io_transmitted.clone()))?;
registry.register(Box::new(container_cpu_perc.clone()))?;
Ok(Self {
container_block_io_read,
container_block_io_write,
container_mem_used,
container_mem_available,
container_mem_percentage,
container_net_io_received,
container_net_io_transmitted,
container_cpu_perc,
})
}
}

46
src/docker/mod.rs Normal file
View File

@@ -0,0 +1,46 @@
use rust_decimal::Decimal;
use serde::Deserialize;
pub mod collector;
pub mod metrics;
mod util;
#[derive(Debug, Deserialize)]
pub struct DockerStats {
#[serde(rename = "BlockIO", with = "util::block_io_format")]
pub block_io: BlockIo,
#[serde(rename = "CPUPerc", with = "util::perc_format")]
pub cpu_perc: Decimal,
#[serde(rename = "Container")]
pub container: String,
#[serde(rename = "ID")]
pub id: String,
#[serde(rename = "MemPerc", with = "util::perc_format")]
pub mem_perc: Decimal,
#[serde(rename = "MemUsage", with = "util::mem_usage_format")]
pub mem_usage: MemUsage,
#[serde(rename = "Name")]
pub name: String,
#[serde(rename = "NetIO", with = "util::net_io_format")]
pub net_io: NetIo,
// #[serde(rename = "PIDs")]
// pub pids: String,
}
#[derive(Debug, Deserialize)]
pub struct BlockIo {
pub read: Decimal,
pub write: Decimal,
}
#[derive(Debug, Deserialize)]
pub struct MemUsage {
pub used: Decimal,
pub total: Decimal,
}
#[derive(Debug, Deserialize)]
pub struct NetIo {
pub received: Decimal,
pub transmitted: Decimal,
}

193
src/docker/util.rs Normal file
View File

@@ -0,0 +1,193 @@
use crate::docker::{metrics::DockerMetrics, DockerStats};
use rust_decimal::Decimal;
use std::str::FromStr;
pub(crate) mod block_io_format {
use crate::docker::{util::parse_to_decimal, BlockIo};
use serde::{self, Deserialize, Deserializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<BlockIo, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let parts: Vec<&str> = s.split(" / ").collect();
if parts.len() == 2 {
match (parse_to_decimal(parts[0]), parse_to_decimal(parts[1])) {
(Ok(read), Ok(write)) => Ok(BlockIo { read, write }),
_ => Err(serde::de::Error::custom("Invalid decimal format")),
}
} else {
Err(serde::de::Error::custom("Invalid format"))
}
}
}
pub(crate) mod mem_usage_format {
use crate::docker::{util::parse_to_decimal, MemUsage};
use serde::{self, Deserialize, Deserializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<MemUsage, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let parts: Vec<&str> = s.split(" / ").collect();
if parts.len() == 2 {
match (parse_to_decimal(parts[0]), parse_to_decimal(parts[1])) {
(Ok(used), Ok(total)) => Ok(MemUsage { used, total }),
_ => Err(serde::de::Error::custom("Invalid decimal format")),
}
} else {
Err(serde::de::Error::custom("Invalid format"))
}
}
}
pub(crate) mod net_io_format {
use crate::docker::{util::parse_to_decimal, NetIo};
use serde::{self, Deserialize, Deserializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<NetIo, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let parts: Vec<&str> = s.split(" / ").collect();
if parts.len() == 2 {
match (parse_to_decimal(parts[0]), parse_to_decimal(parts[1])) {
(Ok(received), Ok(transmitted)) => Ok(NetIo {
received,
transmitted,
}),
_ => Err(serde::de::Error::custom("Invalid decimal format")),
}
} else {
Err(serde::de::Error::custom("Invalid format"))
}
}
}
pub(crate) mod perc_format {
use rust_decimal::Decimal;
use serde::{self, Deserialize, Deserializer};
use std::str::FromStr;
pub fn deserialize<'de, D>(deserializer: D) -> Result<Decimal, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let number_part: &str = s.trim_end_matches('%');
match Decimal::from_str(number_part) {
Ok(value) => Ok(value),
Err(_) => Err(serde::de::Error::custom("Invalid decimal format")),
}
}
}
fn parse_to_decimal(input: &str) -> Result<Decimal, rust_decimal::Error> {
let multiplier = if input.ends_with("kB") {
1_000
} else if input.ends_with("MB") {
1_000_000
} else if input.ends_with("GB") {
1_000_000_000
} else if input.ends_with("KiB") {
1_000
} else if input.ends_with("MiB") {
1_000_000
} else if input.ends_with("GiB") {
1_000_000_000
} else {
1
};
let number_part: &str = input.trim_end_matches(|c: char| !c.is_numeric() && c != '.');
let value = Decimal::from_str(number_part)?;
Ok(value * Decimal::from(multiplier))
}
impl DockerStats {
pub fn post_metrics(&self, metrics: &DockerMetrics) {
metrics
.container_cpu_perc
.with_label_values(&[&self.name])
.set(
self.cpu_perc
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_mem_percentage
.with_label_values(&[&self.name])
.set(
self.mem_perc
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_mem_used
.with_label_values(&[&self.name])
.set(
self.mem_usage
.used
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_mem_available
.with_label_values(&[&self.name])
.set(
self.mem_usage
.total
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_block_io_read
.with_label_values(&[&self.name])
.set(
self.block_io
.read
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_block_io_write
.with_label_values(&[&self.name])
.set(
self.block_io
.write
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_net_io_received
.with_label_values(&[&self.name])
.set(
self.net_io
.received
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
metrics
.container_net_io_transmitted
.with_label_values(&[&self.name])
.set(
self.net_io
.transmitted
.to_string()
.parse::<f64>()
.expect("Failed to parse"),
);
}
}

15
src/error.rs Normal file
View File

@@ -0,0 +1,15 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SystemExporterError {
#[error(transparent)]
Hyper(#[from] hyper::Error),
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
#[error(transparent)]
Prometheus(#[from] prometheus::Error),
#[error(transparent)]
AddrParse(#[from] std::net::AddrParseError),
}
pub type SystemExporterResult<T, E = SystemExporterError> = Result<T, E>;

45
src/main.rs Normal file
View File

@@ -0,0 +1,45 @@
use crate::{error::SystemExporterResult, metrics::Metrics};
use clap::Parser;
use std::{sync::Arc, time::Duration};
use crate::docker::collector::DockerCollector;
use crate::server::Server;
mod docker;
mod error;
mod metrics;
mod server;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// The address to bind the metrics server on
#[arg(long, default_value = "0.0.0.0")]
pub bind_address: String,
/// The port to bind the metrics server to
#[arg(long, default_value_t = 45454)]
pub bind_port: u32,
/// The metrics registry prefix
#[arg(long, default_value = "system_exporter")]
pub metrics_prefix: String,
/// The interval to poll docker stats
#[arg(long, default_value_t = 5)]
pub docker_stats_seconds: u64,
}
#[tokio::main]
async fn main() -> SystemExporterResult<()> {
let args = Args::parse();
let metrics = Arc::new(Metrics::new(args.metrics_prefix.as_str())?);
let server = Server::new(&args, metrics.clone())?;
let collector = DockerCollector::new(metrics);
tokio::spawn(async move {
loop {
let _ = collector.poll();
tokio::time::sleep(Duration::from_secs(args.docker_stats_seconds)).await;
}
});
server.serve().await
}

18
src/metrics.rs Normal file
View File

@@ -0,0 +1,18 @@
use crate::{docker::metrics::DockerMetrics, error::SystemExporterResult};
use prometheus::Registry;
use std::sync::Arc;
pub struct Metrics {
pub registry: Arc<Registry>,
pub docker: DockerMetrics,
}
impl Metrics {
pub fn new(prefix: &str) -> SystemExporterResult<Self> {
let registry = Registry::new_custom(Some(prefix.to_string()), None)?;
Ok(Self {
docker: DockerMetrics::new(registry.clone())?,
registry: Arc::new(registry),
})
}
}

49
src/server.rs Normal file
View File

@@ -0,0 +1,49 @@
use std::{net::SocketAddr, sync::Arc};
use crate::{error::SystemExporterResult, Args};
use axum::{routing::get, Extension, Router};
use http::{Response, StatusCode};
use hyper::Body;
use prometheus::{Encoder, TextEncoder};
use tower_http::add_extension::AddExtensionLayer;
use crate::metrics::Metrics;
pub struct Server {
pub metrics: Arc<Metrics>,
bind_addr: SocketAddr,
}
impl Server {
pub fn new(args: &Args, metrics: Arc<Metrics>) -> SystemExporterResult<Self> {
let bind_addr: SocketAddr = format!("{}:{}", args.bind_address, args.bind_port).parse()?;
Ok(Self { metrics, bind_addr })
}
}
impl Server {
pub async fn serve(&self) -> SystemExporterResult<()> {
let app = Router::new()
.route("/metrics", get(get_metrics))
.layer(AddExtensionLayer::new(self.metrics.clone()));
Ok(axum::Server::bind(&self.bind_addr)
.serve(app.into_make_service())
.await?)
}
}
async fn get_metrics(
Extension(metrics): Extension<Arc<Metrics>>,
) -> Result<Response<Body>, StatusCode> {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = metrics.registry.gather();
encoder
.encode(&metric_families, &mut buffer)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let metrics_str = String::from_utf8(buffer).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let mut response = Response::new(Body::from(metrics_str));
*response.status_mut() = StatusCode::OK;
Ok(response)
}