[network] in upstream

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-18 06:02:24 +03:00
parent eb9ac7fae4
commit 67bae1cf2a

View File

@@ -355,6 +355,8 @@ impl UpstreamManager {
&self, &self,
prefer_ipv6: bool, prefer_ipv6: bool,
dc_overrides: &HashMap<String, Vec<String>>, dc_overrides: &HashMap<String, Vec<String>>,
ipv4_enabled: bool,
ipv6_enabled: bool,
) -> Vec<StartupPingResult> { ) -> Vec<StartupPingResult> {
let upstreams: Vec<(usize, UpstreamConfig)> = { let upstreams: Vec<(usize, UpstreamConfig)> = {
let guard = self.upstreams.read().await; let guard = self.upstreams.read().await;
@@ -374,85 +376,106 @@ impl UpstreamManager {
UpstreamType::Socks5 { address, .. } => format!("socks5://{}", address), UpstreamType::Socks5 { address, .. } => format!("socks5://{}", address),
}; };
let mut v6_results = Vec::new(); let mut v6_results = Vec::with_capacity(NUM_DCS);
let mut v4_results = Vec::new(); if ipv6_enabled {
for dc_zero_idx in 0..NUM_DCS {
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT);
// === Ping IPv6 first === let result = tokio::time::timeout(
for dc_zero_idx in 0..NUM_DCS { Duration::from_secs(DC_PING_TIMEOUT_SECS),
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; self.ping_single_dc(&upstream_config, addr_v6)
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); ).await;
let result = tokio::time::timeout( let ping_result = match result {
Duration::from_secs(DC_PING_TIMEOUT_SECS), Ok(Ok(rtt_ms)) => {
self.ping_single_dc(&upstream_config, addr_v6) let mut guard = self.upstreams.write().await;
).await; if let Some(u) = guard.get_mut(*upstream_idx) {
u.dc_latency[dc_zero_idx].update(rtt_ms);
let ping_result = match result { }
Ok(Ok(rtt_ms)) => { DcPingResult {
let mut guard = self.upstreams.write().await; dc_idx: dc_zero_idx + 1,
if let Some(u) = guard.get_mut(*upstream_idx) { dc_addr: addr_v6,
u.dc_latency[dc_zero_idx].update(rtt_ms); rtt_ms: Some(rtt_ms),
error: None,
}
} }
DcPingResult { Ok(Err(e)) => DcPingResult {
dc_idx: dc_zero_idx + 1, dc_idx: dc_zero_idx + 1,
dc_addr: addr_v6, dc_addr: addr_v6,
rtt_ms: Some(rtt_ms), rtt_ms: None,
error: None, error: Some(e.to_string()),
} },
} Err(_) => DcPingResult {
Ok(Err(e)) => DcPingResult { dc_idx: dc_zero_idx + 1,
dc_addr: addr_v6,
rtt_ms: None,
error: Some("timeout".to_string()),
},
};
v6_results.push(ping_result);
}
} else {
for dc_zero_idx in 0..NUM_DCS {
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
v6_results.push(DcPingResult {
dc_idx: dc_zero_idx + 1, dc_idx: dc_zero_idx + 1,
dc_addr: addr_v6, dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT),
rtt_ms: None, rtt_ms: None,
error: Some(e.to_string()), error: Some("ipv6 disabled".to_string()),
}, });
Err(_) => DcPingResult { }
dc_idx: dc_zero_idx + 1,
dc_addr: addr_v6,
rtt_ms: None,
error: Some("timeout".to_string()),
},
};
v6_results.push(ping_result);
} }
// === Then ping IPv4 === let mut v4_results = Vec::with_capacity(NUM_DCS);
for dc_zero_idx in 0..NUM_DCS { if ipv4_enabled {
let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; for dc_zero_idx in 0..NUM_DCS {
let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT); let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx];
let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT);
let result = tokio::time::timeout( let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS), Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, addr_v4) self.ping_single_dc(&upstream_config, addr_v4)
).await; ).await;
let ping_result = match result { let ping_result = match result {
Ok(Ok(rtt_ms)) => { Ok(Ok(rtt_ms)) => {
let mut guard = self.upstreams.write().await; let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(*upstream_idx) { if let Some(u) = guard.get_mut(*upstream_idx) {
u.dc_latency[dc_zero_idx].update(rtt_ms); u.dc_latency[dc_zero_idx].update(rtt_ms);
}
DcPingResult {
dc_idx: dc_zero_idx + 1,
dc_addr: addr_v4,
rtt_ms: Some(rtt_ms),
error: None,
}
} }
DcPingResult { Ok(Err(e)) => DcPingResult {
dc_idx: dc_zero_idx + 1, dc_idx: dc_zero_idx + 1,
dc_addr: addr_v4, dc_addr: addr_v4,
rtt_ms: Some(rtt_ms), rtt_ms: None,
error: None, error: Some(e.to_string()),
} },
} Err(_) => DcPingResult {
Ok(Err(e)) => DcPingResult { dc_idx: dc_zero_idx + 1,
dc_addr: addr_v4,
rtt_ms: None,
error: Some("timeout".to_string()),
},
};
v4_results.push(ping_result);
}
} else {
for dc_zero_idx in 0..NUM_DCS {
let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx];
v4_results.push(DcPingResult {
dc_idx: dc_zero_idx + 1, dc_idx: dc_zero_idx + 1,
dc_addr: addr_v4, dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT),
rtt_ms: None, rtt_ms: None,
error: Some(e.to_string()), error: Some("ipv4 disabled".to_string()),
}, });
Err(_) => DcPingResult { }
dc_idx: dc_zero_idx + 1,
dc_addr: addr_v4,
rtt_ms: None,
error: Some("timeout".to_string()),
},
};
v4_results.push(ping_result);
} }
// === Ping DC overrides (v4/v6) === // === Ping DC overrides (v4/v6) ===
@@ -470,6 +493,9 @@ impl UpstreamManager {
match addr_str.parse::<SocketAddr>() { match addr_str.parse::<SocketAddr>() {
Ok(addr) => { Ok(addr) => {
let is_v6 = addr.is_ipv6(); let is_v6 = addr.is_ipv6();
if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) {
continue;
}
let result = tokio::time::timeout( let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS), Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, addr) self.ping_single_dc(&upstream_config, addr)
@@ -551,7 +577,7 @@ impl UpstreamManager {
/// Background health check: rotates through DCs, 30s interval. /// Background health check: rotates through DCs, 30s interval.
/// Uses preferred IP version based on config. /// Uses preferred IP version based on config.
pub async fn run_health_checks(&self, prefer_ipv6: bool) { pub async fn run_health_checks(&self, prefer_ipv6: bool, ipv4_enabled: bool, ipv6_enabled: bool) {
let mut dc_rotation = 0usize; let mut dc_rotation = 0usize;
loop { loop {
@@ -560,16 +586,24 @@ impl UpstreamManager {
let dc_zero_idx = dc_rotation % NUM_DCS; let dc_zero_idx = dc_rotation % NUM_DCS;
dc_rotation += 1; dc_rotation += 1;
let dc_addr = if prefer_ipv6 { let primary_v6 = SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT);
SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) let primary_v4 = SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT);
let dc_addr = if prefer_ipv6 && ipv6_enabled {
primary_v6
} else if ipv4_enabled {
primary_v4
} else if ipv6_enabled {
primary_v6
} else { } else {
SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) continue;
}; };
let fallback_addr = if prefer_ipv6 { let fallback_addr = if dc_addr.is_ipv6() && ipv4_enabled {
SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) Some(primary_v4)
} else if dc_addr.is_ipv4() && ipv6_enabled {
Some(primary_v6)
} else { } else {
SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) None
}; };
let count = self.upstreams.read().await.len(); let count = self.upstreams.read().await.len();
@@ -608,48 +642,60 @@ impl UpstreamManager {
// Try fallback // Try fallback
debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback"); debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback");
let start2 = Instant::now(); if let Some(fallback_addr) = fallback_addr {
let result2 = tokio::time::timeout( let start2 = Instant::now();
Duration::from_secs(10), let result2 = tokio::time::timeout(
self.connect_via_upstream(&config, fallback_addr) Duration::from_secs(10),
).await; self.connect_via_upstream(&config, fallback_addr)
).await;
let mut guard = self.upstreams.write().await;
let u = &mut guard[i];
match result2 {
Ok(Ok(_stream)) => {
let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0;
u.dc_latency[dc_zero_idx].update(rtt_ms);
if !u.healthy {
info!(
rtt = format!("{:.0} ms", rtt_ms),
dc = dc_zero_idx + 1,
"Upstream recovered (fallback)"
);
}
u.healthy = true;
u.fails = 0;
}
Ok(Err(e)) => {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check failed (both): {}", e);
if u.fails > 3 {
u.healthy = false;
warn!("Upstream unhealthy (fails)");
}
}
Err(_) => {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check timeout (both)");
if u.fails > 3 {
u.healthy = false;
warn!("Upstream unhealthy (timeout)");
}
}
}
u.last_check = std::time::Instant::now();
continue;
}
let mut guard = self.upstreams.write().await; let mut guard = self.upstreams.write().await;
let u = &mut guard[i]; let u = &mut guard[i];
u.fails += 1;
match result2 { if u.fails > 3 {
Ok(Ok(_stream)) => { u.healthy = false;
let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0; warn!("Upstream unhealthy (no fallback family)");
u.dc_latency[dc_zero_idx].update(rtt_ms);
if !u.healthy {
info!(
rtt = format!("{:.0} ms", rtt_ms),
dc = dc_zero_idx + 1,
"Upstream recovered (fallback)"
);
}
u.healthy = true;
u.fails = 0;
}
Ok(Err(e)) => {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check failed (both): {}", e);
if u.fails > 3 {
u.healthy = false;
warn!("Upstream unhealthy (fails)");
}
}
Err(_) => {
u.fails += 1;
debug!(dc = dc_zero_idx + 1, fails = u.fails,
"Health check timeout (both)");
if u.fails > 3 {
u.healthy = false;
warn!("Upstream unhealthy (timeout)");
}
}
} }
u.last_check = std::time::Instant::now(); u.last_check = std::time::Instant::now();
} }