From 32f60f34dbf8a2906672071563f3563f4a7eb0db Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Sat, 7 Feb 2026 20:24:12 +0300
Subject: [PATCH] Fix Stats + UpstreamState + EMA Latency Tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Per-DC latency tracking in UpstreamState (an array of 5 EMA instances, one
per DC):

- Added `dc_latency: [LatencyEma; 5]` - per-DC tracking instead of a single
  global EMA
- `effective_latency(dc_idx)` - returns the DC-specific latency, falling back
  to the average across known DCs if unavailable
- `select_upstream(dc_idx)` - now performs latency-weighted selection:
  effective_weight = config_weight × (1000 / latency_ms)
  Example: two upstreams with equal config weight but latencies of 50ms and
  200ms → selection probabilities become 80% / 20%
- `connect(target, dc_idx)` - extended signature; dc_idx is used for upstream
  selection and per-DC RTT recording
- All ping/health-check operations now record RTT into
  `dc_latency[dc_zero_idx]`

client.rs:

- `upstream_manager.connect(dc_addr)` changed to
  `upstream_manager.connect(dc_addr, Some(success.dc_idx))` - the DC index
  now participates in upstream selection and per-DC RTT recording
- passes dc_idx when connecting to Telegram

Summary: upstream selection now accounts for per-DC latency using the formula
weight × (1000 / ms). With multiple upstreams (e.g., direct + socks5), traffic
automatically flows to the faster route for each specific DC. With a single
upstream, the data is still collected for monitoring but does not affect
routing.

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/proxy/client.rs       | 172 ++++++--------------------
 src/transport/upstream.rs | 192 +++++++++++++++++++++++---------------
 2 files changed, 156 insertions(+), 208 deletions(-)

diff --git a/src/proxy/client.rs b/src/proxy/client.rs
index 2d85a9f..107cb7b 100644
--- a/src/proxy/client.rs
+++ b/src/proxy/client.rs
@@ -17,7 +17,6 @@
 use crate::transport::{configure_client_socket, UpstreamManager};
 use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool};
 use crate::crypto::{AesCtr, SecureRandom};
-// Use absolute paths to avoid confusion
 use crate::proxy::handshake::{
     handle_tls_handshake, handle_mtproto_handshake, HandshakeSuccess,
     generate_tg_nonce, encrypt_tg_nonce,
@@ -25,10 +24,8 @@ use crate::proxy::handshake::{
 use crate::proxy::relay::relay_bidirectional;
 use crate::proxy::masking::handle_bad_client;

-/// Client connection handler (builder struct)
 pub struct ClientHandler;

-/// Running client handler with stream and context
 pub struct RunningClientHandler {
     stream: TcpStream,
     peer: SocketAddr,
@@ -41,7 +38,6 @@
 }

 impl ClientHandler {
-    /// Create new client handler instance
     pub fn new(
         stream: TcpStream,
         peer: SocketAddr,
@@ -53,27 +49,19 @@
         rng: Arc<SecureRandom>,
     ) -> RunningClientHandler {
         RunningClientHandler {
-            stream,
-            peer,
-            config,
-            stats,
-            replay_checker,
-            upstream_manager,
-            buffer_pool,
-            rng,
+            stream, peer, config, stats, replay_checker,
+            upstream_manager, buffer_pool, rng,
         }
     }
 }

 impl RunningClientHandler {
-    /// Run the client handler
     pub async fn run(mut self) -> Result<()> {
         self.stats.increment_connects_all();

         let peer = self.peer;
         debug!(peer = %peer, "New connection");

-        // Configure socket
         if let Err(e) = configure_client_socket(
             &self.stream,
             self.config.timeouts.client_keepalive,
         ) {
             debug!(peer = %peer, error = %e, "Failed to configure client socket");
         }
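For reference, `configure_client_socket` itself is not part of this diff. A
minimal sketch of what such a helper might do, assuming the socket2 crate
(hypothetical; the real helper's knobs and parameter types may differ):

    use std::time::Duration;
    use socket2::{SockRef, TcpKeepalive};
    use tokio::net::TcpStream;

    /// Sketch: disable Nagle and enable TCP keepalive on an accepted client
    /// socket. `keepalive_secs` would come from config.timeouts.
    fn configure_client_socket(stream: &TcpStream, keepalive_secs: u64) -> std::io::Result<()> {
        stream.set_nodelay(true)?;
        let ka = TcpKeepalive::new().with_time(Duration::from_secs(keepalive_secs));
        SockRef::from(stream).set_tcp_keepalive(&ka)?;
        Ok(())
    }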
"Failed to configure client socket"); } - // Perform handshake with timeout let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); - - // Clone stats for error handling block let stats = self.stats.clone(); - let result = timeout( - handshake_timeout, - self.do_handshake() - ).await; + let result = timeout(handshake_timeout, self.do_handshake()).await; match result { Ok(Ok(())) => { @@ -110,16 +92,14 @@ impl RunningClientHandler { } } - /// Perform handshake and relay async fn do_handshake(mut self) -> Result<()> { - // Read first bytes to determine handshake type let mut first_bytes = [0u8; 5]; self.stream.read_exact(&mut first_bytes).await?; let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let peer = self.peer; - debug!(peer = %peer, is_tls = is_tls, first_bytes = %hex::encode(&first_bytes), "Handshake type detected"); + debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); if is_tls { self.handle_tls_client(first_bytes).await @@ -128,14 +108,9 @@ impl RunningClientHandler { } } - /// Handle TLS-wrapped client - async fn handle_tls_client( - mut self, - first_bytes: [u8; 5], - ) -> Result<()> { + async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> { let peer = self.peer; - // Read TLS handshake length let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; debug!(peer = %peer, tls_len = tls_len, "Reading TLS handshake"); @@ -143,35 +118,25 @@ impl RunningClientHandler { if tls_len < 512 { debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); self.stats.increment_connects_bad(); - // FIX: Split stream into reader/writer for handle_bad_client let (reader, writer) = self.stream.into_split(); handle_bad_client(reader, writer, &first_bytes, &self.config).await; return Ok(()); } - // Read full TLS handshake let mut handshake = vec![0u8; 5 + tls_len]; handshake[..5].copy_from_slice(&first_bytes); self.stream.read_exact(&mut handshake[5..]).await?; - // Extract fields before consuming self.stream let config = self.config.clone(); let replay_checker = self.replay_checker.clone(); let stats = self.stats.clone(); let buffer_pool = self.buffer_pool.clone(); - // Split stream for reading/writing let (read_half, write_half) = self.stream.into_split(); - // Handle TLS handshake let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake( - &handshake, - read_half, - write_half, - peer, - &config, - &replay_checker, - &self.rng, + &handshake, read_half, write_half, peer, + &config, &replay_checker, &self.rng, ).await { HandshakeResult::Success(result) => result, HandshakeResult::BadClient { reader, writer } => { @@ -182,84 +147,56 @@ impl RunningClientHandler { HandshakeResult::Error(e) => return Err(e), }; - // Read MTProto handshake through TLS debug!(peer = %peer, "Reading MTProto handshake through TLS"); let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into() .map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?; - // Handle MTProto handshake let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( - &mtproto_handshake, - tls_reader, - tls_writer, - peer, - &config, - &replay_checker, - true, + &mtproto_handshake, tls_reader, tls_writer, peer, + &config, &replay_checker, true, ).await { HandshakeResult::Success(result) => result, - HandshakeResult::BadClient { reader, writer } => { + HandshakeResult::BadClient { reader: _, writer: _ } => { 

-    /// Handle direct (non-TLS) client
-    async fn handle_direct_client(
-        mut self,
-        first_bytes: [u8; 5],
-    ) -> Result<()> {
+    async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> {
         let peer = self.peer;

-        // Check if non-TLS modes are enabled
         if !self.config.general.modes.classic && !self.config.general.modes.secure {
             debug!(peer = %peer, "Non-TLS modes disabled");
             self.stats.increment_connects_bad();
-            // FIX: Split stream into reader/writer for handle_bad_client
             let (reader, writer) = self.stream.into_split();
             handle_bad_client(reader, writer, &first_bytes, &self.config).await;
             return Ok(());
         }

-        // Read rest of handshake
         let mut handshake = [0u8; HANDSHAKE_LEN];
         handshake[..5].copy_from_slice(&first_bytes);
         self.stream.read_exact(&mut handshake[5..]).await?;

-        // Extract fields
         let config = self.config.clone();
         let replay_checker = self.replay_checker.clone();
         let stats = self.stats.clone();
         let buffer_pool = self.buffer_pool.clone();

-        // Split stream
         let (read_half, write_half) = self.stream.into_split();

-        // Handle MTProto handshake
         let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
-            &handshake,
-            read_half,
-            write_half,
-            peer,
-            &config,
-            &replay_checker,
-            false,
+            &handshake, read_half, write_half, peer,
+            &config, &replay_checker, false,
         ).await {
             HandshakeResult::Success(result) => result,
             HandshakeResult::BadClient { reader, writer } => {
@@ -271,18 +208,12 @@
         };

         Self::handle_authenticated_static(
-            crypto_reader,
-            crypto_writer,
-            success,
-            self.upstream_manager,
-            self.stats,
-            self.config,
-            buffer_pool,
-            self.rng
+            crypto_reader, crypto_writer, success,
+            self.upstream_manager, self.stats, self.config,
+            buffer_pool, self.rng,
         ).await
     }

-    /// Static version of handle_authenticated_inner
     async fn handle_authenticated_static(
         client_reader: CryptoReader,
         client_writer: CryptoWriter,
         success: HandshakeSuccess,
@@ -299,13 +230,11 @@
     {
         let user = &success.user;

-        // Check user limits
         if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
             warn!(user = %user, error = %e, "User limit exceeded");
             return Err(e);
         }

-        // Get datacenter address
         let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?;

         info!(
@@ -314,72 +243,54 @@
             dc = success.dc_idx,
             dc_addr = %dc_addr,
             proto = ?success.proto_tag,
-            fast_mode = config.general.fast_mode,
             "Connecting to Telegram"
         );

-        // Connect to Telegram via UpstreamManager
-        let tg_stream = upstream_manager.connect(dc_addr).await?;
+        // Pass dc_idx for latency-based upstream selection
+        let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?;

-        debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected to Telegram, performing handshake");
+        debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake");

-        // Perform Telegram handshake and get crypto streams
         let (tg_reader, tg_writer) =
             Self::do_tg_handshake_static(
-                tg_stream,
-                &success,
-                &config,
-                rng.as_ref(),
+                tg_stream, &success, &config, rng.as_ref(),
             ).await?;

-        debug!(peer = %success.peer, "Telegram handshake complete, starting relay");
+        debug!(peer = %success.peer, "TG handshake complete, starting relay");

-        // Update stats
         stats.increment_user_connects(user);
         stats.increment_user_curr_connects(user);

-        // Relay traffic using buffer pool
         let relay_result = relay_bidirectional(
-            client_reader,
-            client_writer,
-            tg_reader,
-            tg_writer,
-            user,
-            Arc::clone(&stats),
-            buffer_pool,
+            client_reader, client_writer,
+            tg_reader, tg_writer,
+            user, Arc::clone(&stats), buffer_pool,
         ).await;

-        // Update stats
         stats.decrement_user_curr_connects(user);

         match &relay_result {
-            Ok(()) => debug!(user = %user, peer = %success.peer, "Relay completed normally"),
-            Err(e) => debug!(user = %user, peer = %success.peer, error = %e, "Relay ended with error"),
+            Ok(()) => debug!(user = %user, "Relay completed"),
+            Err(e) => debug!(user = %user, error = %e, "Relay ended with error"),
         }

         relay_result
     }

-    /// Check user limits (static version)
     fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> {
-        // Check expiration
         if let Some(expiration) = config.access.user_expirations.get(user) {
             if chrono::Utc::now() > *expiration {
                 return Err(ProxyError::UserExpired { user: user.to_string() });
             }
         }

-        // Check connection limit
         if let Some(limit) = config.access.user_max_tcp_conns.get(user) {
-            let current = stats.get_user_curr_connects(user);
-            if current >= *limit as u64 {
+            if stats.get_user_curr_connects(user) >= *limit as u64 {
                 return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() });
             }
         }

-        // Check data quota
         if let Some(quota) = config.access.user_data_quota.get(user) {
-            let used = stats.get_user_total_octets(user);
-            if used >= *quota {
+            if stats.get_user_total_octets(user) >= *quota {
                 return Err(ProxyError::DataQuotaExceeded { user: user.to_string() });
             }
         }
@@ -387,7 +298,6 @@
         Ok(())
     }

-    /// Get datacenter address by index (static version)
     fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
         let idx = (dc_idx.abs() - 1) as usize;

@@ -404,47 +314,39 @@
         ))
     }
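The index math, traced through once: `dc_idx` from the client hello is
1-based and may be negative; the sign is ignored for addressing.

    let dc_idx: i16 = -4;                  // e.g. from the handshake
    let idx = (dc_idx.abs() - 1) as usize; // -> 3, the address slot for DC4
    assert_eq!(idx, 3);

The upstream.rs side below does the same conversion in `dc_array_idx`, but
with the overflow-safe `unsigned_abs()` plus a bounds check, so dc_idx = 0 or
|dc_idx| > 5 yields None instead of an out-of-range index.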

-    /// Perform handshake with Telegram server (static version)
     async fn do_tg_handshake_static(
         mut stream: TcpStream,
         success: &HandshakeSuccess,
         config: &ProxyConfig,
         rng: &SecureRandom,
     ) -> Result<(CryptoReader, CryptoWriter)> {
-        // Generate nonce with keys for TG
         let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce(
             success.proto_tag,
-            &success.dec_key, // Client's dec key
+            &success.dec_key,
             success.dec_iv,
             rng,
             config.general.fast_mode,
         );

-        // Encrypt nonce
         let encrypted_nonce = encrypt_tg_nonce(&nonce);

         debug!(
             peer = %success.peer,
             nonce_head = %hex::encode(&nonce[..16]),
-            encrypted_head = %hex::encode(&encrypted_nonce[..16]),
             "Sending nonce to Telegram"
         );

-        // Send to Telegram
         stream.write_all(&encrypted_nonce).await?;
         stream.flush().await?;

-        debug!(peer = %success.peer, "Nonce sent to Telegram");
-
-        // Split stream and wrap with crypto
         let (read_half, write_half) = stream.into_split();

         let decryptor = AesCtr::new(&tg_dec_key, tg_dec_iv);
         let encryptor = AesCtr::new(&tg_enc_key, tg_enc_iv);

-        let tg_reader = CryptoReader::new(read_half, decryptor);
-        let tg_writer = CryptoWriter::new(write_half, encryptor);
-
-        Ok((tg_reader, tg_writer))
+        Ok((
+            CryptoReader::new(read_half, decryptor),
+            CryptoWriter::new(write_half, encryptor),
+        ))
     }
 }
\ No newline at end of file
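That is the whole client.rs side of the change. The upstream.rs diff below
supplies the new two-argument `connect`; the resulting call shapes are:

    // DC known (client path): selection is latency-weighted for that DC.
    let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?;

    // DC unknown or not applicable: weighting falls back to the average
    // latency across all DCs with data (see effective_latency below).
    let stream = upstream_manager.connect(target, None).await?;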
diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs
index 242f599..86e6b2a 100644
--- a/src/transport/upstream.rs
+++ b/src/transport/upstream.rs
@@ -1,4 +1,4 @@
-//! Upstream Management with RTT tracking and startup ping
+//! Upstream Management with per-DC latency-weighted selection

 use std::net::{SocketAddr, IpAddr};
 use std::sync::Arc;
@@ -7,7 +7,7 @@
 use tokio::net::TcpStream;
 use tokio::sync::RwLock;
 use tokio::time::Instant;
 use rand::Rng;
-use tracing::{debug, warn, error, info};
+use tracing::{debug, warn, info, trace};

 use crate::config::{UpstreamConfig, UpstreamType};
 use crate::error::{Result, ProxyError};
@@ -15,19 +15,19 @@ use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACE
 use crate::transport::socket::create_outgoing_socket_bound;
 use crate::transport::socks::{connect_socks4, connect_socks5};

+/// Number of Telegram datacenters
+const NUM_DCS: usize = 5;
+
 // ============= RTT Tracking =============

-/// Exponential moving average for latency tracking
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
 struct LatencyEma {
-    /// Current EMA value in milliseconds (None = no data yet)
     value_ms: Option<f64>,
-    /// Smoothing factor (0.0 - 1.0, higher = more weight to recent)
     alpha: f64,
 }

 impl LatencyEma {
-    fn new(alpha: f64) -> Self {
+    const fn new(alpha: f64) -> Self {
         Self { value_ms: None, alpha }
     }

@@ -51,8 +43,43 @@ struct UpstreamState {
     healthy: bool,
     fails: u32,
     last_check: std::time::Instant,
-    /// Latency EMA (alpha=0.3 — moderate smoothing)
-    latency: LatencyEma,
+    /// Per-DC latency EMA (index 0 = DC1, index 4 = DC5)
+    dc_latency: [LatencyEma; NUM_DCS],
+}
+
+impl UpstreamState {
+    fn new(config: UpstreamConfig) -> Self {
+        Self {
+            config,
+            healthy: true,
+            fails: 0,
+            last_check: std::time::Instant::now(),
+            dc_latency: [LatencyEma::new(0.3); NUM_DCS],
+        }
+    }
+
+    /// Convert dc_idx (1-based, may be negative) to array index 0..4
+    fn dc_array_idx(dc_idx: i16) -> Option<usize> {
+        let idx = (dc_idx.unsigned_abs() as usize).checked_sub(1)?;
+        if idx < NUM_DCS { Some(idx) } else { None }
+    }
+
+    /// Get latency for a specific DC, falling back to average across all known DCs
+    fn effective_latency(&self, dc_idx: Option<i16>) -> Option<f64> {
+        // Try DC-specific latency first
+        if let Some(di) = dc_idx.and_then(Self::dc_array_idx) {
+            if let Some(ms) = self.dc_latency[di].get() {
+                return Some(ms);
+            }
+        }
+
+        // Fallback: average of all known DC latencies
+        let (sum, count) = self.dc_latency.iter()
+            .filter_map(|l| l.get())
+            .fold((0.0, 0u32), |(s, c), v| (s + v, c + 1));

+        if count > 0 { Some(sum / count as f64) } else { None }
+    }
 }
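To make the smoothing concrete (the body of `update` is elided from this
hunk; assume the usual EMA rule `new = alpha * sample + (1 - alpha) * old`,
seeded by the first sample):

    let mut ema = LatencyEma::new(0.3);
    ema.update(100.0);   // first sample seeds the EMA -> 100.0
    ema.update(200.0);   // 0.3 * 200 + 0.7 * 100     -> 130.0
    assert!((ema.get().unwrap() - 130.0).abs() < 1e-9);

    // effective_latency, given dc_latency = [50 ms, none, 150 ms, none, none]:
    //   effective_latency(Some(1)) -> Some(50.0)   DC1 has its own data
    //   effective_latency(Some(2)) -> Some(100.0)  fallback: (50 + 150) / 2
    //   effective_latency(None)    -> Some(100.0)  same fallback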

 /// Result of a single DC ping
@@ -64,7 +99,7 @@ pub struct DcPingResult {
     pub error: Option<String>,
 }

-/// Result of startup ping across all DCs
+/// Result of startup ping for one upstream
 #[derive(Debug, Clone)]
 pub struct StartupPingResult {
     pub results: Vec<DcPingResult>,
@@ -82,13 +117,7 @@ impl UpstreamManager {
     pub fn new(configs: Vec<UpstreamConfig>) -> Self {
         let states = configs.into_iter()
             .filter(|c| c.enabled)
-            .map(|c| UpstreamState {
-                config: c,
-                healthy: true,
-                fails: 0,
-                last_check: std::time::Instant::now(),
-                latency: LatencyEma::new(0.3),
-            })
+            .map(UpstreamState::new)
             .collect();

         Self {
@@ -96,46 +125,78 @@
         }
     }

-    /// Select an upstream using weighted selection among healthy upstreams
-    async fn select_upstream(&self) -> Option<usize> {
+    /// Select upstream using latency-weighted random selection.
+    ///
+    /// `effective_weight = config_weight × latency_factor`
+    ///
+    /// where `latency_factor = 1000 / latency_ms` if latency is known,
+    /// or `1.0` if no latency data is available.
+    ///
+    /// This means a 50ms upstream gets factor 20, a 200ms upstream gets
+    /// factor 5 — the faster route is 4× more likely to be chosen
+    /// (all else being equal).
+    async fn select_upstream(&self, dc_idx: Option<i16>) -> Option<usize> {
         let upstreams = self.upstreams.read().await;

         if upstreams.is_empty() {
             return None;
         }

-        let healthy_indices: Vec<usize> = upstreams.iter()
+        let healthy: Vec<usize> = upstreams.iter()
             .enumerate()
             .filter(|(_, u)| u.healthy)
             .map(|(i, _)| i)
             .collect();

-        if healthy_indices.is_empty() {
+        if healthy.is_empty() {
+            // All unhealthy — pick any
             return Some(rand::rng().gen_range(0..upstreams.len()));
         }

-        let total_weight: u32 = healthy_indices.iter()
-            .map(|&i| upstreams[i].config.weight as u32)
-            .sum();
-
-        if total_weight == 0 {
-            return Some(healthy_indices[rand::rng().gen_range(0..healthy_indices.len())]);
+        if healthy.len() == 1 {
+            return Some(healthy[0]);
         }

-        let mut choice = rand::rng().gen_range(0..total_weight);
+        // Calculate latency-weighted scores
+        let weights: Vec<(usize, f64)> = healthy.iter().map(|&i| {
+            let base = upstreams[i].config.weight as f64;
+            let latency_factor = upstreams[i].effective_latency(dc_idx)
+                .map(|ms| if ms > 1.0 { 1000.0 / ms } else { 1000.0 })
+                .unwrap_or(1.0);
+
+            (i, base * latency_factor)
+        }).collect();

-        for &idx in &healthy_indices {
-            let weight = upstreams[idx].config.weight as u32;
+        let total: f64 = weights.iter().map(|(_, w)| w).sum();
+
+        if total <= 0.0 {
+            return Some(healthy[rand::rng().gen_range(0..healthy.len())]);
+        }
+
+        let mut choice: f64 = rand::rng().gen_range(0.0..total);
+
+        for &(idx, weight) in &weights {
             if choice < weight {
+                trace!(
+                    upstream = idx,
+                    dc = ?dc_idx,
+                    weight = format!("{:.2}", weight),
+                    total = format!("{:.2}", total),
+                    "Upstream selected"
+                );
                 return Some(idx);
             }
             choice -= weight;
         }

-        Some(healthy_indices[0])
+        Some(healthy[0])
     }
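The commit message's example, traced through these weights with equal config
weight 10:

    // upstream A: 50 ms  -> factor 1000/50  = 20 -> score 200.0
    // upstream B: 200 ms -> factor 1000/200 =  5 -> score  50.0
    // total 250.0 -> P(A) = 200/250 = 80%, P(B) = 50/250 = 20%
    let score = |weight: f64, ms: f64| weight * (1000.0 / ms);
    let (a, b) = (score(10.0, 50.0), score(10.0, 200.0));
    assert!((a / (a + b) - 0.8).abs() < 1e-12);

An upstream with no latency data keeps factor 1.0, so a freshly added route
still competes at roughly its raw config weight; the sub-1 ms clamp prevents
a single anomalously small RTT sample from dominating the distribution.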

-    pub async fn connect(&self, target: SocketAddr) -> Result<TcpStream> {
-        let idx = self.select_upstream().await
+    /// Connect to target through a selected upstream.
+    ///
+    /// `dc_idx` is used for latency-based upstream selection and RTT tracking.
+    /// Pass `None` if DC index is unknown.
+    pub async fn connect(&self, target: SocketAddr, dc_idx: Option<i16>) -> Result<TcpStream> {
+        let idx = self.select_upstream(dc_idx).await
             .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;

         let upstream = {
@@ -151,11 +212,15 @@
                 let mut guard = self.upstreams.write().await;
                 if let Some(u) = guard.get_mut(idx) {
                     if !u.healthy {
-                        debug!(rtt_ms = rtt_ms, "Upstream recovered: {:?}", u.config);
+                        debug!(rtt_ms = format!("{:.1}", rtt_ms), "Upstream recovered");
                     }
                     u.healthy = true;
                     u.fails = 0;
-                    u.latency.update(rtt_ms);
+
+                    // Store per-DC latency
+                    if let Some(di) = dc_idx.and_then(UpstreamState::dc_array_idx) {
+                        u.dc_latency[di].update(rtt_ms);
+                    }
                 }
                 Ok(stream)
             },
             Err(e) => {
                 let mut guard = self.upstreams.write().await;
                 if let Some(u) = guard.get_mut(idx) {
                     u.fails += 1;
-                    warn!("Upstream {:?} failed: {}. Consecutive fails: {}", u.config, e, u.fails);
+                    warn!(fails = u.fails, "Upstream failed: {}", e);
                     if u.fails > 3 {
                         u.healthy = false;
-                        warn!("Upstream marked unhealthy: {:?}", u.config);
+                        warn!("Upstream marked unhealthy");
                     }
                 }
                 Err(e)
@@ -200,8 +265,6 @@
                 Ok(stream)
             },
             UpstreamType::Socks4 { address, interface, user_id } => {
-                info!("Connecting to {} via SOCKS4 {}", target, address);
-
                 let proxy_addr: SocketAddr = address.parse()
                     .map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;

@@ -229,8 +292,6 @@
                 Ok(stream)
             },
             UpstreamType::Socks5 { address, interface, username, password } => {
-                info!("Connecting to {} via SOCKS5 {}", target, address);
-
                 let proxy_addr: SocketAddr = address.parse()
                     .map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;

@@ -262,9 +323,7 @@

     // ============= Startup Ping =============

-    /// Ping all Telegram DCs through all upstreams and return results.
-    ///
-    /// Used at startup to display connectivity and latency info.
+    /// Ping all Telegram DCs through all upstreams.
     pub async fn ping_all_dcs(&self, prefer_ipv6: bool) -> Vec<StartupPingResult> {
         let upstreams: Vec<(usize, UpstreamConfig)> = {
             let guard = self.upstreams.read().await;
@@ -298,10 +357,10 @@
             let result = match ping_result {
                 Ok(Ok(rtt_ms)) => {
-                    // Update latency EMA
+                    // Store per-DC latency
                     let mut guard = self.upstreams.write().await;
                     if let Some(u) = guard.get_mut(*upstream_idx) {
-                        u.latency.update(rtt_ms);
+                        u.dc_latency[dc_zero_idx].update(rtt_ms);
                     }
                     DcPingResult {
                         dc_idx: dc_zero_idx + 1,
@@ -336,33 +395,26 @@
         all_results
     }

-    /// Ping a single DC: TCP connect, measure RTT, then drop.
     async fn ping_single_dc(&self, config: &UpstreamConfig, target: SocketAddr) -> Result<f64> {
         let start = Instant::now();
         let _stream = self.connect_via_upstream(config, target).await?;
-        let rtt = start.elapsed();
-        Ok(rtt.as_secs_f64() * 1000.0)
+        Ok(start.elapsed().as_secs_f64() * 1000.0)
     }
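A plausible wiring of the two entry points at startup (hypothetical; the real
call sites are outside this diff, and the `rtt_ms` field of DcPingResult is
assumed from the elided part of that struct):

    let manager = Arc::new(UpstreamManager::new(upstream_configs));

    // One visibility sweep before serving traffic...
    for up in manager.ping_all_dcs(prefer_ipv6).await {
        for dc in &up.results {
            match dc.rtt_ms {
                Some(ms) => info!(dc = dc.dc_idx, "DC reachable in {:.0} ms", ms),
                None => warn!(dc = dc.dc_idx, error = ?dc.error, "DC unreachable"),
            }
        }
    }

    // ...then the 30-second background health checks.
    let mgr = Arc::clone(&manager);
    tokio::spawn(async move { mgr.run_health_checks(prefer_ipv6).await });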

     // ============= Health Checks =============

-    /// Background health check task.
-    ///
-    /// Every 30 seconds, pings one representative DC per upstream.
-    /// Measures RTT and updates health status.
+    /// Background health check: rotates through DCs, 30s interval.
     pub async fn run_health_checks(&self, prefer_ipv6: bool) {
         let datacenters = if prefer_ipv6 {
             &*TG_DATACENTERS_V6
         } else {
             &*TG_DATACENTERS_V4
         };
-
-        // Rotate through DCs across check cycles
         let mut dc_rotation = 0usize;

         loop {
             tokio::time::sleep(Duration::from_secs(30)).await;

-            let check_dc_idx = dc_rotation % datacenters.len();
+            let dc_zero_idx = dc_rotation % datacenters.len();
             dc_rotation += 1;
-            let check_target = SocketAddr::new(datacenters[check_dc_idx], TG_DATACENTER_PORT);
+            let check_target = SocketAddr::new(datacenters[dc_zero_idx], TG_DATACENTER_PORT);

             let count = self.upstreams.read().await.len();
             for i in 0..count {
@@ -383,13 +435,13 @@
                 match result {
                     Ok(Ok(_stream)) => {
                         let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
-                        u.latency.update(rtt_ms);
+                        u.dc_latency[dc_zero_idx].update(rtt_ms);

                         if !u.healthy {
                             info!(
-                                rtt_ms = format!("{:.1}", rtt_ms),
-                                dc = check_dc_idx + 1,
-                                "Upstream recovered: {:?}", u.config
+                                rtt = format!("{:.0}ms", rtt_ms),
+                                dc = dc_zero_idx + 1,
+                                "Upstream recovered"
                             );
                         }
                         u.healthy = true;
                         u.fails = 0;
                     }
                     Ok(Err(e)) => {
                         u.fails += 1;
-                        debug!(
-                            dc = check_dc_idx + 1,
-                            fails = u.fails,
-                            "Health check failed for {:?}: {}", u.config, e
-                        );
+                        debug!(dc = dc_zero_idx + 1, fails = u.fails,
+                            "Health check failed: {}", e);
                         if u.fails > 3 {
                             u.healthy = false;
-                            warn!("Upstream unhealthy (health check): {:?}", u.config);
+                            warn!("Upstream unhealthy (fails)");
                         }
                     }
                     Err(_) => {
                         u.fails += 1;
-                        debug!(
-                            dc = check_dc_idx + 1,
-                            fails = u.fails,
-                            "Health check timeout for {:?}", u.config
-                        );
+                        debug!(dc = dc_zero_idx + 1, fails = u.fails,
+                            "Health check timeout");
                         if u.fails > 3 {
                             u.healthy = false;
-                            warn!("Upstream unhealthy (timeout): {:?}", u.config);
+                            warn!("Upstream unhealthy (timeout)");
                         }
                     }
                 }