Autofallback on ME-Init

This commit is contained in:
Alexey
2026-02-18 19:50:16 +03:00
parent d3302d77d2
commit c7bd1c98e7
5 changed files with 131 additions and 13 deletions

View File

@@ -46,6 +46,13 @@ pub(crate) fn default_keepalive() -> u64 {
pub(crate) fn default_ack_timeout() -> u64 { pub(crate) fn default_ack_timeout() -> u64 {
300 300
} }
pub(crate) fn default_me_one_retry() -> u8 {
3
}
pub(crate) fn default_me_one_timeout() -> u64 {
1500
}
pub(crate) fn default_listen_addr() -> String { pub(crate) fn default_listen_addr() -> String {
"0.0.0.0".to_string() "0.0.0.0".to_string()

View File

@@ -264,6 +264,14 @@ pub struct TimeoutsConfig {
#[serde(default = "default_ack_timeout")] #[serde(default = "default_ack_timeout")]
pub client_ack: u64, pub client_ack: u64,
/// Number of quick ME reconnect attempts for single-address DC.
#[serde(default = "default_me_one_retry")]
pub me_one_retry: u8,
/// Timeout per quick attempt in milliseconds for single-address DC.
#[serde(default = "default_me_one_timeout")]
pub me_one_timeout_ms: u64,
} }
impl Default for TimeoutsConfig { impl Default for TimeoutsConfig {
@@ -273,6 +281,8 @@ impl Default for TimeoutsConfig {
tg_connect: default_connect_timeout(), tg_connect: default_connect_timeout(),
client_keepalive: default_keepalive(), client_keepalive: default_keepalive(),
client_ack: default_ack_timeout(), client_ack: default_ack_timeout(),
me_one_retry: default_me_one_retry(),
me_one_timeout_ms: default_me_one_timeout(),
} }
} }
} }

View File

@@ -318,6 +318,8 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_probe,
config.general.middle_proxy_nat_stun.clone(), config.general.middle_proxy_nat_stun.clone(),
probe.detected_ipv6, probe.detected_ipv6,
config.timeouts.me_one_retry,
config.timeouts.me_one_timeout_ms,
cfg_v4.map.clone(), cfg_v4.map.clone(),
cfg_v6.map.clone(), cfg_v6.map.clone(),
cfg_v4.default_dc.or(cfg_v6.default_dc), cfg_v4.default_dc.or(cfg_v6.default_dc),

View File

@@ -14,10 +14,27 @@ use super::MePool;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
loop { loop {
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
check_family(IpFamily::V4, &pool, &rng, &mut backoff, &mut last_attempt).await; check_family(
check_family(IpFamily::V6, &pool, &rng, &mut backoff, &mut last_attempt).await; IpFamily::V4,
&pool,
&rng,
&mut backoff,
&mut last_attempt,
&mut inflight_single,
)
.await;
check_family(
IpFamily::V6,
&pool,
&rng,
&mut backoff,
&mut last_attempt,
&mut inflight_single,
)
.await;
} }
} }
@@ -27,6 +44,7 @@ async fn check_family(
rng: &Arc<SecureRandom>, rng: &Arc<SecureRandom>,
backoff: &mut HashMap<(i32, IpFamily), u64>, backoff: &mut HashMap<(i32, IpFamily), u64>,
last_attempt: &mut HashMap<(i32, IpFamily), Instant>, last_attempt: &mut HashMap<(i32, IpFamily), Instant>,
inflight_single: &mut HashSet<(i32, IpFamily)>,
) { ) {
let enabled = match family { let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me, IpFamily::V4 => pool.decision.ipv4_me,
@@ -48,16 +66,24 @@ async fn check_family(
.map(|w| w.addr) .map(|w| w.addr)
.collect(); .collect();
for (dc, addrs) in map.iter() { let entries: Vec<(i32, Vec<SocketAddr>)> = map
let dc_addrs: Vec<SocketAddr> = addrs .iter()
.map(|(dc, addrs)| {
let list = addrs
.iter() .iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port)) .map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect::<Vec<_>>();
(*dc, list)
})
.collect(); .collect();
for (dc, dc_addrs) in entries {
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
if has_coverage { if has_coverage {
inflight_single.remove(&(dc, family));
continue; continue;
} }
let key = (*dc, family); let key = (dc, family);
let delay = *backoff.get(&key).unwrap_or(&30); let delay = *backoff.get(&key).unwrap_or(&30);
let now = Instant::now(); let now = Instant::now();
if let Some(last) = last_attempt.get(&key) { if let Some(last) = last_attempt.get(&key) {
@@ -65,6 +91,64 @@ async fn check_family(
continue; continue;
} }
} }
if dc_addrs.len() == 1 {
// Single ME address: fast retries then slower background retries.
if inflight_single.contains(&key) {
continue;
}
inflight_single.insert(key);
let addr = dc_addrs[0];
let dc_id = dc;
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let timeout = pool.me_one_timeout;
let quick_attempts = pool.me_one_retry.max(1);
tokio::spawn(async move {
let mut success = false;
for _ in 0..quick_attempts {
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
success = true;
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
if success {
return;
}
let timeout_ms = timeout.as_millis();
warn!(
dc = %dc_id,
?family,
attempts = quick_attempts,
timeout_ms,
"DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...",
dc_id,
quick_attempts,
timeout_ms
);
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
match res {
Ok(Ok(())) => {
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
break;
}
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
}
}
// will drop inflight flag in outer loop when coverage detected
});
continue;
}
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting...");
let mut shuffled = dc_addrs.clone(); let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng()); shuffled.shuffle(&mut rand::rng());

View File

@@ -50,6 +50,8 @@ pub struct MePool {
pub(super) detected_ipv6: Option<Ipv6Addr>, pub(super) detected_ipv6: Option<Ipv6Addr>,
pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8,
pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool,
pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>, pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32, pub(super) default_dc: AtomicI32,
@@ -74,6 +76,8 @@ impl MePool {
nat_probe: bool, nat_probe: bool,
nat_stun: Option<String>, nat_stun: Option<String>,
detected_ipv6: Option<Ipv6Addr>, detected_ipv6: Option<Ipv6Addr>,
me_one_retry: u8,
me_one_timeout_ms: u64,
proxy_map_v4: HashMap<i32, Vec<(IpAddr, u16)>>, proxy_map_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
proxy_map_v6: HashMap<i32, Vec<(IpAddr, u16)>>, proxy_map_v6: HashMap<i32, Vec<(IpAddr, u16)>>,
default_dc: Option<i32>, default_dc: Option<i32>,
@@ -95,6 +99,8 @@ impl MePool {
detected_ipv6, detected_ipv6,
nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), nat_probe_attempts: std::sync::atomic::AtomicU8::new(0),
nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), nat_probe_disabled: std::sync::atomic::AtomicBool::new(false),
me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
pool_size: 2, pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@@ -251,6 +257,7 @@ impl MePool {
// Ensure at least one connection per DC; run DCs in parallel. // Ensure at least one connection per DC; run DCs in parallel.
let mut join = tokio::task::JoinSet::new(); let mut join = tokio::task::JoinSet::new();
let mut dc_failures = 0usize;
for (dc, addrs) in dc_addrs.iter().cloned() { for (dc, addrs) in dc_addrs.iter().cloned() {
if addrs.is_empty() { if addrs.is_empty() {
continue; continue;
@@ -258,10 +265,17 @@ impl MePool {
let pool = Arc::clone(self); let pool = Arc::clone(self);
let rng_clone = Arc::clone(rng); let rng_clone = Arc::clone(rng);
join.spawn(async move { join.spawn(async move {
pool.connect_primary_for_dc(dc, addrs, rng_clone).await; pool.connect_primary_for_dc(dc, addrs, rng_clone).await
}); });
} }
while let Some(_res) = join.join_next().await {} while let Some(res) = join.join_next().await {
if let Ok(false) = res {
dc_failures += 1;
}
}
if dc_failures > 2 {
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
}
// Additional connections up to pool_size total (round-robin across DCs) // Additional connections up to pool_size total (round-robin across DCs)
for (dc, addrs) in dc_addrs.iter() { for (dc, addrs) in dc_addrs.iter() {
@@ -397,9 +411,9 @@ impl MePool {
dc: i32, dc: i32,
mut addrs: Vec<(IpAddr, u16)>, mut addrs: Vec<(IpAddr, u16)>,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
) { ) -> bool {
if addrs.is_empty() { if addrs.is_empty() {
return; return false;
} }
addrs.shuffle(&mut rand::rng()); addrs.shuffle(&mut rand::rng());
for (ip, port) in addrs { for (ip, port) in addrs {
@@ -407,12 +421,13 @@ impl MePool {
match self.connect_one(addr, rng.as_ref()).await { match self.connect_one(addr, rng.as_ref()).await {
Ok(()) => { Ok(()) => {
info!(%addr, dc = %dc, "ME connected"); info!(%addr, dc = %dc, "ME connected");
return; return true;
} }
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"), Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"),
} }
} }
warn!(dc = %dc, "All ME servers for DC failed at init"); warn!(dc = %dc, "All ME servers for DC failed at init");
false
} }
pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(&self, writer_id: u64) {