From c7bd1c98e7b71a948a0b99de22d975570073b16a Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Feb 2026 19:50:16 +0300 Subject: [PATCH] Autofallback on ME-Init --- src/config/defaults.rs | 7 ++ src/config/types.rs | 10 +++ src/main.rs | 2 + src/transport/middle_proxy/health.rs | 100 ++++++++++++++++++++++++--- src/transport/middle_proxy/pool.rs | 25 +++++-- 5 files changed, 131 insertions(+), 13 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 9df2813..3f8254c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -46,6 +46,13 @@ pub(crate) fn default_keepalive() -> u64 { pub(crate) fn default_ack_timeout() -> u64 { 300 } +pub(crate) fn default_me_one_retry() -> u8 { + 3 +} + +pub(crate) fn default_me_one_timeout() -> u64 { + 1500 +} pub(crate) fn default_listen_addr() -> String { "0.0.0.0".to_string() diff --git a/src/config/types.rs b/src/config/types.rs index c961808..ef8fef7 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -264,6 +264,14 @@ pub struct TimeoutsConfig { #[serde(default = "default_ack_timeout")] pub client_ack: u64, + + /// Number of quick ME reconnect attempts for single-address DC. + #[serde(default = "default_me_one_retry")] + pub me_one_retry: u8, + + /// Timeout per quick attempt in milliseconds for single-address DC. + #[serde(default = "default_me_one_timeout")] + pub me_one_timeout_ms: u64, } impl Default for TimeoutsConfig { @@ -273,6 +281,8 @@ impl Default for TimeoutsConfig { tg_connect: default_connect_timeout(), client_keepalive: default_keepalive(), client_ack: default_ack_timeout(), + me_one_retry: default_me_one_retry(), + me_one_timeout_ms: default_me_one_timeout(), } } } diff --git a/src/main.rs b/src/main.rs index 46d73c7..2660213 100644 --- a/src/main.rs +++ b/src/main.rs @@ -318,6 +318,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_stun.clone(), probe.detected_ipv6, + config.timeouts.me_one_retry, + config.timeouts.me_one_timeout_ms, cfg_v4.map.clone(), cfg_v6.map.clone(), cfg_v4.default_dc.or(cfg_v6.default_dc), diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 348e1d7..7141f94 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -14,10 +14,27 @@ use super::MePool; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); loop { tokio::time::sleep(Duration::from_secs(30)).await; - check_family(IpFamily::V4, &pool, &rng, &mut backoff, &mut last_attempt).await; - check_family(IpFamily::V6, &pool, &rng, &mut backoff, &mut last_attempt).await; + check_family( + IpFamily::V4, + &pool, + &rng, + &mut backoff, + &mut last_attempt, + &mut inflight_single, + ) + .await; + check_family( + IpFamily::V6, + &pool, + &rng, + &mut backoff, + &mut last_attempt, + &mut inflight_single, + ) + .await; } } @@ -27,6 +44,7 @@ async fn check_family( rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, last_attempt: &mut HashMap<(i32, IpFamily), Instant>, + inflight_single: &mut HashSet<(i32, IpFamily)>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -48,16 +66,24 @@ async fn check_family( .map(|w| w.addr) .collect(); - for (dc, addrs) in map.iter() { - let dc_addrs: Vec = addrs - .iter() - .map(|(ip, port)| SocketAddr::new(*ip, *port)) - .collect(); + let entries: Vec<(i32, Vec)> = map + .iter() + .map(|(dc, addrs)| { + let list = addrs + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect::>(); + (*dc, list) + }) + .collect(); + + for (dc, dc_addrs) in entries { let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); if has_coverage { + inflight_single.remove(&(dc, family)); continue; } - let key = (*dc, family); + let key = (dc, family); let delay = *backoff.get(&key).unwrap_or(&30); let now = Instant::now(); if let Some(last) = last_attempt.get(&key) { @@ -65,6 +91,64 @@ async fn check_family( continue; } } + if dc_addrs.len() == 1 { + // Single ME address: fast retries then slower background retries. + if inflight_single.contains(&key) { + continue; + } + inflight_single.insert(key); + let addr = dc_addrs[0]; + let dc_id = dc; + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let timeout = pool.me_one_timeout; + let quick_attempts = pool.me_one_retry.max(1); + tokio::spawn(async move { + let mut success = false; + for _ in 0..quick_attempts { + let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; + match res { + Ok(Ok(())) => { + info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); + success = true; + break; + } + Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), + Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), + } + tokio::time::sleep(Duration::from_millis(1000)).await; + } + if success { + return; + } + let timeout_ms = timeout.as_millis(); + warn!( + dc = %dc_id, + ?family, + attempts = quick_attempts, + timeout_ms, + "DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...", + dc_id, + quick_attempts, + timeout_ms + ); + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; + match res { + Ok(Ok(())) => { + info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); + break; + } + Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), + Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), + } + } + // will drop inflight flag in outer loop when coverage detected + }); + continue; + } + warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 888cb23..00041fd 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -50,6 +50,8 @@ pub struct MePool { pub(super) detected_ipv6: Option, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, + pub(super) me_one_retry: u8, + pub(super) me_one_timeout: Duration, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -74,6 +76,8 @@ impl MePool { nat_probe: bool, nat_stun: Option, detected_ipv6: Option, + me_one_retry: u8, + me_one_timeout_ms: u64, proxy_map_v4: HashMap>, proxy_map_v6: HashMap>, default_dc: Option, @@ -95,6 +99,8 @@ impl MePool { detected_ipv6, nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), + me_one_retry, + me_one_timeout: Duration::from_millis(me_one_timeout_ms), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -251,6 +257,7 @@ impl MePool { // Ensure at least one connection per DC; run DCs in parallel. let mut join = tokio::task::JoinSet::new(); + let mut dc_failures = 0usize; for (dc, addrs) in dc_addrs.iter().cloned() { if addrs.is_empty() { continue; @@ -258,10 +265,17 @@ impl MePool { let pool = Arc::clone(self); let rng_clone = Arc::clone(rng); join.spawn(async move { - pool.connect_primary_for_dc(dc, addrs, rng_clone).await; + pool.connect_primary_for_dc(dc, addrs, rng_clone).await }); } - while let Some(_res) = join.join_next().await {} + while let Some(res) = join.join_next().await { + if let Ok(false) = res { + dc_failures += 1; + } + } + if dc_failures > 2 { + return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into())); + } // Additional connections up to pool_size total (round-robin across DCs) for (dc, addrs) in dc_addrs.iter() { @@ -397,9 +411,9 @@ impl MePool { dc: i32, mut addrs: Vec<(IpAddr, u16)>, rng: Arc, - ) { + ) -> bool { if addrs.is_empty() { - return; + return false; } addrs.shuffle(&mut rand::rng()); for (ip, port) in addrs { @@ -407,12 +421,13 @@ impl MePool { match self.connect_one(addr, rng.as_ref()).await { Ok(()) => { info!(%addr, dc = %dc, "ME connected"); - return; + return true; } Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"), } } warn!(dc = %dc, "All ME servers for DC failed at init"); + false } pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) {