From a1db082ec0bb77f33e7c2f9659c97866bd1ca068 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Tue, 17 Feb 2026 01:11:01 +0300 Subject: [PATCH] Add Prometheus /metrics HTTP endpoint Wire up unused metrics_port/metrics_whitelist config into working HTTP server exposing proxy stats in Prometheus text format. --- Cargo.toml | 3 + Dockerfile | 1 + docker-compose.yml | 1 + src/main.rs | 9 +++ src/metrics.rs | 197 +++++++++++++++++++++++++++++++++++++++++++++ src/stats/mod.rs | 6 ++ 6 files changed, 217 insertions(+) create mode 100644 src/metrics.rs diff --git a/Cargo.toml b/Cargo.toml index 4e5a82d..b685175 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,9 @@ num-traits = "0.2" # HTTP reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false } +hyper = { version = "1", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio", "server-auto"] } +http-body-util = "0.1" [dev-dependencies] tokio-test = "0.4" diff --git a/Dockerfile b/Dockerfile index 9e600c2..662ec22 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,6 +37,7 @@ RUN chown -R telemt:telemt /app USER telemt EXPOSE 443 +EXPOSE 9090 ENTRYPOINT ["/app/telemt"] CMD ["config.toml"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index f72b66d..8386caf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: restart: unless-stopped ports: - "443:443" + - "9090:9090" # Allow caching 'proxy-secret' in read-only container working_dir: /run/telemt volumes: diff --git a/src/main.rs b/src/main.rs index ba00879..b820785 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ mod config; mod crypto; mod error; mod ip_tracker; +mod metrics; mod protocol; mod proxy; mod stats; @@ -707,6 +708,14 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai .reload(runtime_filter) .expect("Failed to switch log filter"); + if let Some(port) = config.server.metrics_port { + let stats = stats.clone(); + let whitelist = config.server.metrics_whitelist.clone(); + tokio::spawn(async move { + metrics::serve(port, stats, whitelist).await; + }); + } + for listener in listeners { let config = config.clone(); let stats = stats.clone(); diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..24acf30 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,197 @@ +use std::convert::Infallible; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; + +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use tokio::net::TcpListener; +use tracing::{info, warn, debug}; + +use crate::stats::Stats; + +pub async fn serve(port: u16, stats: Arc, whitelist: Vec) { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + warn!(error = %e, "Failed to bind metrics on {}", addr); + return; + } + }; + info!("Metrics endpoint: http://{}/metrics", addr); + + loop { + let (stream, peer) = match listener.accept().await { + Ok(v) => v, + Err(e) => { + warn!(error = %e, "Metrics accept error"); + continue; + } + }; + + if !whitelist.is_empty() && !whitelist.contains(&peer.ip()) { + debug!(peer = %peer, "Metrics request denied by whitelist"); + continue; + } + + let stats = stats.clone(); + tokio::spawn(async move { + let svc = service_fn(move |req| { + let stats = stats.clone(); + async move { handle(req, &stats) } + }); + if let Err(e) = http1::Builder::new() + .serve_connection(hyper_util::rt::TokioIo::new(stream), svc) + .await + { + debug!(error = %e, "Metrics connection error"); + } + }); + } +} + +fn handle(req: Request, stats: &Stats) -> Result>, Infallible> { + if req.uri().path() != "/metrics" { + let resp = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found\n"))) + .unwrap(); + return Ok(resp); + } + + let body = render_metrics(stats); + let resp = Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/plain; version=0.0.4; charset=utf-8") + .body(Full::new(Bytes::from(body))) + .unwrap(); + Ok(resp) +} + +fn render_metrics(stats: &Stats) -> String { + use std::fmt::Write; + let mut out = String::with_capacity(4096); + + let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime"); + let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge"); + let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs()); + + let _ = writeln!(out, "# HELP telemt_connections_total Total accepted connections"); + let _ = writeln!(out, "# TYPE telemt_connections_total counter"); + let _ = writeln!(out, "telemt_connections_total {}", stats.get_connects_all()); + + let _ = writeln!(out, "# HELP telemt_connections_bad_total Bad/rejected connections"); + let _ = writeln!(out, "# TYPE telemt_connections_bad_total counter"); + let _ = writeln!(out, "telemt_connections_bad_total {}", stats.get_connects_bad()); + + let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts"); + let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); + let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); + + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); + let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); + let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); + let _ = writeln!(out, "# TYPE telemt_user_connections_current gauge"); + let _ = writeln!(out, "# HELP telemt_user_octets_from_client Per-user bytes received"); + let _ = writeln!(out, "# TYPE telemt_user_octets_from_client counter"); + let _ = writeln!(out, "# HELP telemt_user_octets_to_client Per-user bytes sent"); + let _ = writeln!(out, "# TYPE telemt_user_octets_to_client counter"); + let _ = writeln!(out, "# HELP telemt_user_msgs_from_client Per-user messages received"); + let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter"); + let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent"); + let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter"); + + for entry in stats.iter_user_stats() { + let user = entry.key(); + let s = entry.value(); + let _ = writeln!(out, "telemt_user_connections_total{{user=\"{}\"}} {}", user, s.connects.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_connections_current{{user=\"{}\"}} {}", user, s.curr_connects.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_octets_from_client{{user=\"{}\"}} {}", user, s.octets_from_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_octets_to_client{{user=\"{}\"}} {}", user, s.octets_to_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_msgs_from_client{{user=\"{}\"}} {}", user, s.msgs_from_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_msgs_to_client{{user=\"{}\"}} {}", user, s.msgs_to_client.load(std::sync::atomic::Ordering::Relaxed)); + } + + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_render_metrics_format() { + let stats = Arc::new(Stats::new()); + stats.increment_connects_all(); + stats.increment_connects_all(); + stats.increment_connects_bad(); + stats.increment_handshake_timeouts(); + stats.increment_user_connects("alice"); + stats.increment_user_curr_connects("alice"); + stats.add_user_octets_from("alice", 1024); + stats.add_user_octets_to("alice", 2048); + stats.increment_user_msgs_from("alice"); + stats.increment_user_msgs_to("alice"); + stats.increment_user_msgs_to("alice"); + + let output = render_metrics(&stats); + + assert!(output.contains("telemt_connections_total 2")); + assert!(output.contains("telemt_connections_bad_total 1")); + assert!(output.contains("telemt_handshake_timeouts_total 1")); + assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1")); + assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1")); + assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024")); + assert!(output.contains("telemt_user_octets_to_client{user=\"alice\"} 2048")); + assert!(output.contains("telemt_user_msgs_from_client{user=\"alice\"} 1")); + assert!(output.contains("telemt_user_msgs_to_client{user=\"alice\"} 2")); + } + + #[test] + fn test_render_empty_stats() { + let stats = Stats::new(); + let output = render_metrics(&stats); + assert!(output.contains("telemt_connections_total 0")); + assert!(output.contains("telemt_connections_bad_total 0")); + assert!(output.contains("telemt_handshake_timeouts_total 0")); + assert!(!output.contains("user=")); + } + + #[test] + fn test_render_has_type_annotations() { + let stats = Stats::new(); + let output = render_metrics(&stats); + assert!(output.contains("# TYPE telemt_uptime_seconds gauge")); + assert!(output.contains("# TYPE telemt_connections_total counter")); + assert!(output.contains("# TYPE telemt_connections_bad_total counter")); + assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); + } + + #[tokio::test] + async fn test_endpoint_integration() { + let stats = Arc::new(Stats::new()); + stats.increment_connects_all(); + stats.increment_connects_all(); + stats.increment_connects_all(); + + let port = 19091u16; + let s = stats.clone(); + tokio::spawn(async move { + serve(port, s, vec![]).await; + }); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let resp = reqwest::get(format!("http://127.0.0.1:{}/metrics", port)) + .await.unwrap(); + assert_eq!(resp.status(), 200); + let body = resp.text().await.unwrap(); + assert!(body.contains("telemt_connections_total 3")); + + let resp404 = reqwest::get(format!("http://127.0.0.1:{}/other", port)) + .await.unwrap(); + assert_eq!(resp404.status(), 404); + } +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 5c3a084..2cdcdf9 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -97,6 +97,12 @@ impl Stats { .unwrap_or(0) } + pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) } + + pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, UserStats> { + self.user_stats.iter() + } + pub fn uptime_secs(&self) -> f64 { self.start_time.read() .map(|t| t.elapsed().as_secs_f64())