From 67bae1cf2a57a75b5bc07d1138d6121685ce5d5e Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Feb 2026 06:02:24 +0300 Subject: [PATCH] [network] in upstream Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/upstream.rs | 264 ++++++++++++++++++++++---------------- 1 file changed, 155 insertions(+), 109 deletions(-) diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index db0d366..bf73408 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -355,6 +355,8 @@ impl UpstreamManager { &self, prefer_ipv6: bool, dc_overrides: &HashMap>, + ipv4_enabled: bool, + ipv6_enabled: bool, ) -> Vec { let upstreams: Vec<(usize, UpstreamConfig)> = { let guard = self.upstreams.read().await; @@ -374,85 +376,106 @@ impl UpstreamManager { UpstreamType::Socks5 { address, .. } => format!("socks5://{}", address), }; - let mut v6_results = Vec::new(); - let mut v4_results = Vec::new(); + let mut v6_results = Vec::with_capacity(NUM_DCS); + if ipv6_enabled { + for dc_zero_idx in 0..NUM_DCS { + let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; + let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); - // === Ping IPv6 first === - for dc_zero_idx in 0..NUM_DCS { - let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; - let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); + let result = tokio::time::timeout( + Duration::from_secs(DC_PING_TIMEOUT_SECS), + self.ping_single_dc(&upstream_config, addr_v6) + ).await; - let result = tokio::time::timeout( - Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, addr_v6) - ).await; - - let ping_result = match result { - Ok(Ok(rtt_ms)) => { - let mut guard = self.upstreams.write().await; - if let Some(u) = guard.get_mut(*upstream_idx) { - u.dc_latency[dc_zero_idx].update(rtt_ms); + let ping_result = match result { + Ok(Ok(rtt_ms)) => { + let mut guard = self.upstreams.write().await; + if let Some(u) = guard.get_mut(*upstream_idx) { + u.dc_latency[dc_zero_idx].update(rtt_ms); + } + DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v6, + rtt_ms: Some(rtt_ms), + error: None, + } } - DcPingResult { + Ok(Err(e)) => DcPingResult { dc_idx: dc_zero_idx + 1, dc_addr: addr_v6, - rtt_ms: Some(rtt_ms), - error: None, - } - } - Ok(Err(e)) => DcPingResult { + rtt_ms: None, + error: Some(e.to_string()), + }, + Err(_) => DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v6, + rtt_ms: None, + error: Some("timeout".to_string()), + }, + }; + v6_results.push(ping_result); + } + } else { + for dc_zero_idx in 0..NUM_DCS { + let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; + v6_results.push(DcPingResult { dc_idx: dc_zero_idx + 1, - dc_addr: addr_v6, + dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT), rtt_ms: None, - error: Some(e.to_string()), - }, - Err(_) => DcPingResult { - dc_idx: dc_zero_idx + 1, - dc_addr: addr_v6, - rtt_ms: None, - error: Some("timeout".to_string()), - }, - }; - v6_results.push(ping_result); + error: Some("ipv6 disabled".to_string()), + }); + } } - // === Then ping IPv4 === - for dc_zero_idx in 0..NUM_DCS { - let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; - let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT); + let mut v4_results = Vec::with_capacity(NUM_DCS); + if ipv4_enabled { + for dc_zero_idx in 0..NUM_DCS { + let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; + let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT); - let result = tokio::time::timeout( - Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, addr_v4) - ).await; + let result = tokio::time::timeout( + Duration::from_secs(DC_PING_TIMEOUT_SECS), + self.ping_single_dc(&upstream_config, addr_v4) + ).await; - let ping_result = match result { - Ok(Ok(rtt_ms)) => { - let mut guard = self.upstreams.write().await; - if let Some(u) = guard.get_mut(*upstream_idx) { - u.dc_latency[dc_zero_idx].update(rtt_ms); + let ping_result = match result { + Ok(Ok(rtt_ms)) => { + let mut guard = self.upstreams.write().await; + if let Some(u) = guard.get_mut(*upstream_idx) { + u.dc_latency[dc_zero_idx].update(rtt_ms); + } + DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v4, + rtt_ms: Some(rtt_ms), + error: None, + } } - DcPingResult { + Ok(Err(e)) => DcPingResult { dc_idx: dc_zero_idx + 1, dc_addr: addr_v4, - rtt_ms: Some(rtt_ms), - error: None, - } - } - Ok(Err(e)) => DcPingResult { + rtt_ms: None, + error: Some(e.to_string()), + }, + Err(_) => DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v4, + rtt_ms: None, + error: Some("timeout".to_string()), + }, + }; + v4_results.push(ping_result); + } + } else { + for dc_zero_idx in 0..NUM_DCS { + let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; + v4_results.push(DcPingResult { dc_idx: dc_zero_idx + 1, - dc_addr: addr_v4, + dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT), rtt_ms: None, - error: Some(e.to_string()), - }, - Err(_) => DcPingResult { - dc_idx: dc_zero_idx + 1, - dc_addr: addr_v4, - rtt_ms: None, - error: Some("timeout".to_string()), - }, - }; - v4_results.push(ping_result); + error: Some("ipv4 disabled".to_string()), + }); + } } // === Ping DC overrides (v4/v6) === @@ -470,6 +493,9 @@ impl UpstreamManager { match addr_str.parse::() { Ok(addr) => { let is_v6 = addr.is_ipv6(); + if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) { + continue; + } let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), self.ping_single_dc(&upstream_config, addr) @@ -551,7 +577,7 @@ impl UpstreamManager { /// Background health check: rotates through DCs, 30s interval. /// Uses preferred IP version based on config. - pub async fn run_health_checks(&self, prefer_ipv6: bool) { + pub async fn run_health_checks(&self, prefer_ipv6: bool, ipv4_enabled: bool, ipv6_enabled: bool) { let mut dc_rotation = 0usize; loop { @@ -560,16 +586,24 @@ impl UpstreamManager { let dc_zero_idx = dc_rotation % NUM_DCS; dc_rotation += 1; - let dc_addr = if prefer_ipv6 { - SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) + let primary_v6 = SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT); + let primary_v4 = SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT); + let dc_addr = if prefer_ipv6 && ipv6_enabled { + primary_v6 + } else if ipv4_enabled { + primary_v4 + } else if ipv6_enabled { + primary_v6 } else { - SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) + continue; }; - let fallback_addr = if prefer_ipv6 { - SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) + let fallback_addr = if dc_addr.is_ipv6() && ipv4_enabled { + Some(primary_v4) + } else if dc_addr.is_ipv4() && ipv6_enabled { + Some(primary_v6) } else { - SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) + None }; let count = self.upstreams.read().await.len(); @@ -608,48 +642,60 @@ impl UpstreamManager { // Try fallback debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback"); - let start2 = Instant::now(); - let result2 = tokio::time::timeout( - Duration::from_secs(10), - self.connect_via_upstream(&config, fallback_addr) - ).await; + if let Some(fallback_addr) = fallback_addr { + let start2 = Instant::now(); + let result2 = tokio::time::timeout( + Duration::from_secs(10), + self.connect_via_upstream(&config, fallback_addr) + ).await; + + let mut guard = self.upstreams.write().await; + let u = &mut guard[i]; + + match result2 { + Ok(Ok(_stream)) => { + let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0; + u.dc_latency[dc_zero_idx].update(rtt_ms); + + if !u.healthy { + info!( + rtt = format!("{:.0} ms", rtt_ms), + dc = dc_zero_idx + 1, + "Upstream recovered (fallback)" + ); + } + u.healthy = true; + u.fails = 0; + } + Ok(Err(e)) => { + u.fails += 1; + debug!(dc = dc_zero_idx + 1, fails = u.fails, + "Health check failed (both): {}", e); + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (fails)"); + } + } + Err(_) => { + u.fails += 1; + debug!(dc = dc_zero_idx + 1, fails = u.fails, + "Health check timeout (both)"); + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (timeout)"); + } + } + } + u.last_check = std::time::Instant::now(); + continue; + } let mut guard = self.upstreams.write().await; let u = &mut guard[i]; - - match result2 { - Ok(Ok(_stream)) => { - let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0; - u.dc_latency[dc_zero_idx].update(rtt_ms); - - if !u.healthy { - info!( - rtt = format!("{:.0} ms", rtt_ms), - dc = dc_zero_idx + 1, - "Upstream recovered (fallback)" - ); - } - u.healthy = true; - u.fails = 0; - } - Ok(Err(e)) => { - u.fails += 1; - debug!(dc = dc_zero_idx + 1, fails = u.fails, - "Health check failed (both): {}", e); - if u.fails > 3 { - u.healthy = false; - warn!("Upstream unhealthy (fails)"); - } - } - Err(_) => { - u.fails += 1; - debug!(dc = dc_zero_idx + 1, fails = u.fails, - "Health check timeout (both)"); - if u.fails > 3 { - u.healthy = false; - warn!("Upstream unhealthy (timeout)"); - } - } + u.fails += 1; + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (no fallback family)"); } u.last_check = std::time::Instant::now(); }