From da108b2d8c14af6708d222d9dcf7b9de1fc4ccb3 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 15 Feb 2026 14:02:00 +0300 Subject: [PATCH] =?UTF-8?q?Middle=20Proxy=20l=C3=A4uft=20wie=20auf=20Schie?= =?UTF-8?q?nen...?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/mod.rs | 13 +++- src/main.rs | 2 + src/transport/middle_proxy/config_updater.rs | 8 ++- src/transport/middle_proxy/health.rs | 46 ++++++++------ src/transport/middle_proxy/pool.rs | 65 +++++++++++++++----- src/transport/middle_proxy/pool_nat.rs | 2 +- src/transport/middle_proxy/registry.rs | 16 +++++ src/transport/middle_proxy/send.rs | 34 +++++++--- src/transport/middle_proxy/wire.rs | 2 +- 9 files changed, 143 insertions(+), 45 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index ddaceee..a3dee7a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -4,8 +4,9 @@ use crate::error::{ProxyError, Result}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::path::Path; +use tracing::warn; // ============= Helper Defaults ============= @@ -617,6 +618,16 @@ impl ProxyConfig { ))); } + if let Some(tag) = &self.general.ad_tag { + let zeros = "00000000000000000000000000000000"; + if tag == zeros { + warn!("ad_tag is all zeros; register a valid proxy tag via @MTProxybot to enable sponsored channel"); + } + if tag.len() != 32 || tag.chars().any(|c| !c.is_ascii_hexdigit()) { + warn!("ad_tag is not a 32-char hex string; ensure you use value issued by @MTProxybot"); + } + } + Ok(()) } } diff --git a/src/main.rs b/src/main.rs index 7cb23a0..8a974be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -297,9 +297,11 @@ async fn main() -> std::result::Result<(), Box> { // Periodic updater: getProxyConfig + proxy-secret let pool_clone2 = pool.clone(); + let rng_clone2 = rng.clone(); tokio::spawn(async move { crate::transport::middle_proxy::me_config_updater( pool_clone2, + rng_clone2, std::time::Duration::from_secs(12 * 3600), ) .await; diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 3c16d2f..aed5a54 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -10,6 +10,7 @@ use crate::error::Result; use super::MePool; use super::secret::download_proxy_secret; +use crate::crypto::SecureRandom; #[derive(Debug, Clone, Default)] pub struct ProxyConfigData { @@ -49,8 +50,10 @@ pub async fn fetch_proxy_config(url: &str) -> Result { Ok(ProxyConfigData { map, default_dc }) } -pub async fn me_config_updater(pool: Arc, interval: Duration) { +pub async fn me_config_updater(pool: Arc, rng: Arc, interval: Duration) { let mut tick = tokio::time::interval(interval); + // skip immediate tick to avoid double-fetch right after startup + tick.tick().await; loop { tick.tick().await; @@ -61,7 +64,8 @@ pub async fn me_config_updater(pool: Arc, interval: Duration) { pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); } if changed { - info!("ME config updated (v4)"); + info!("ME config updated (v4), reconciling connections"); + pool.reconcile_connections(&rng).await; } else { debug!("ME config v4 unchanged"); } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index bebd0b1..8f0f5a6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -3,32 +3,42 @@ use std::sync::Arc; use std::time::Duration; use tracing::{debug, info, warn}; +use rand::seq::SliceRandom; use crate::crypto::SecureRandom; use super::MePool; -pub async fn me_health_monitor(pool: Arc, rng: Arc, min_connections: usize) { +pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { loop { tokio::time::sleep(Duration::from_secs(30)).await; - let current = pool.connection_count(); - if current < min_connections { - warn!( - current, - min = min_connections, - "ME pool below minimum, reconnecting..." - ); - let map = pool.proxy_map_v4.read().await.clone(); - for (_dc, addrs) in map.iter() { - for &(ip, port) in addrs { - let needed = min_connections.saturating_sub(pool.connection_count()); - if needed == 0 { - break; - } - let addr = SocketAddr::new(ip, port); + // Per-DC coverage check + let map = pool.proxy_map_v4.read().await.clone(); + let writer_addrs: std::collections::HashSet = pool + .writers + .read() + .await + .iter() + .map(|(a, _)| *a) + .collect(); + + for (dc, addrs) in map.iter() { + let dc_addrs: Vec = addrs + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect(); + let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); + if !has_coverage { + warn!(dc = %dc, "DC has no ME coverage, reconnecting..."); + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for addr in shuffled { match pool.connect_one(addr, &rng).await { - Ok(()) => info!(%addr, "ME reconnected"), - Err(e) => debug!(%addr, error = %e, "ME reconnect failed"), + Ok(()) => { + info!(%addr, dc = %dc, "ME reconnected for DC coverage"); + break; + } + Err(e) => debug!(%addr, dc = %dc, error = %e, "ME reconnect failed"), } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index f095b32..f38a81c 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -6,6 +6,7 @@ use std::time::Duration; use bytes::BytesMut; use rand::Rng; +use rand::seq::SliceRandom; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::{Mutex, RwLock}; @@ -89,6 +90,30 @@ impl MePool { self.writers.clone() } + pub async fn reconcile_connections(&self, rng: &SecureRandom) { + use std::collections::HashSet; + let map = self.proxy_map_v4.read().await.clone(); + let writers = self.writers.read().await; + let current: HashSet = writers.iter().map(|(a, _)| *a).collect(); + drop(writers); + + for (_dc, addrs) in map.iter() { + let dc_addrs: Vec = addrs + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect(); + if !dc_addrs.iter().any(|a| current.contains(a)) { + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for addr in shuffled { + if self.connect_one(addr, rng).await.is_ok() { + break; + } + } + } + } + } + pub async fn update_proxy_maps( &self, new_v4: HashMap>, @@ -127,8 +152,9 @@ impl MePool { } pub async fn reconnect_all(&self) { - let mut ws = self.writers.write().await; - ws.clear(); + // Graceful: do not drop all at once. New connections will use updated secret. + // Existing writers remain until health monitor replaces them. + // No-op here to avoid total outage. } async fn key_selector(&self) -> u32 { @@ -151,30 +177,39 @@ impl MePool { "Initializing ME pool" ); - // Ensure at least one connection per DC + // Ensure at least one connection per DC with failover over all addresses for (dc, addrs) in map.iter() { if addrs.is_empty() { continue; } - // round-robin first address - let &(ip, port) = addrs.get(0).unwrap(); - let addr = SocketAddr::new(ip, port); - match self.connect_one(addr, rng).await { - Ok(()) => info!(%addr, dc = %dc, "ME connected"), - Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed"), + let mut connected = false; + let mut shuffled = addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for (ip, port) in shuffled { + let addr = SocketAddr::new(ip, port); + match self.connect_one(addr, rng).await { + Ok(()) => { + info!(%addr, dc = %dc, "ME connected"); + connected = true; + break; + } + Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"), + } + } + if !connected { + warn!(dc = %dc, "All ME servers for DC failed at init"); } } - // Additional connections up to pool_size total + // Additional connections up to pool_size total (round-robin across DCs) for (dc, addrs) in map.iter() { - for (i, (ip, port)) in addrs.iter().enumerate().skip(1) { + for (ip, port) in addrs { if self.connection_count() >= pool_size { break; } let addr = SocketAddr::new(*ip, *port); - match self.connect_one(addr, rng).await { - Ok(()) => info!(%addr, dc = %dc, idx = i, "ME connected"), - Err(e) => warn!(%addr, dc = %dc, idx = i, error = %e, "ME connect failed"), + if let Err(e) = self.connect_one(addr, rng).await { + debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed"); } } if self.connection_count() >= pool_size { @@ -189,7 +224,7 @@ impl MePool { } pub(crate) async fn connect_one( - self: &Arc, + &self, addr: SocketAddr, rng: &SecureRandom, ) -> Result<()> { diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index b5848be..633d0af 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -145,7 +145,7 @@ async fn fetch_stun_binding(stun_addr: &str) -> Result>>, + writers: RwLock>>>, next_id: AtomicU64, } @@ -16,6 +20,7 @@ impl ConnRegistry { let start = rand::random::() | 1; Self { map: RwLock::new(HashMap::new()), + writers: RwLock::new(HashMap::new()), next_id: AtomicU64::new(start), } } @@ -29,6 +34,7 @@ impl ConnRegistry { pub async fn unregister(&self, id: u64) { self.map.write().await.remove(&id); + self.writers.write().await.remove(&id); } pub async fn route(&self, id: u64, resp: MeResponse) -> bool { @@ -39,4 +45,14 @@ impl ConnRegistry { false } } + + pub async fn set_writer(&self, id: u64, w: Arc>) { + let mut guard = self.writers.write().await; + guard.entry(id).or_insert_with(|| w); + } + + pub async fn get_writer(&self, id: u64) -> Option>> { + let guard = self.writers.read().await; + guard.get(&id).cloned() + } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 302b187..29e6e50 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -6,11 +6,13 @@ use tokio::sync::Mutex; use tracing::{debug, warn}; use crate::error::{ProxyError, Result}; -use crate::protocol::constants::{RPC_CLOSE_EXT_U32}; +use crate::protocol::constants::RPC_CLOSE_EXT_U32; use super::MePool; use super::codec::RpcWriter; use super::wire::build_proxy_req_payload; +use crate::crypto::SecureRandom; +use rand::seq::SliceRandom; impl MePool { pub async fn send_proxy_req( @@ -39,9 +41,28 @@ impl MePool { let writers: Vec<(SocketAddr, Arc>)> = ws.iter().cloned().collect(); drop(ws); - let candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await; + let mut candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await; if candidate_indices.is_empty() { - return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + // Emergency: try to connect to target DC addresses on the fly, then recompute writers + let map = self.proxy_map_v4.read().await; + if let Some(addrs) = map.get(&(target_dc as i32)) { + let mut shuffled = addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + drop(map); + for (ip, port) in shuffled { + let addr = SocketAddr::new(ip, port); + if self.connect_one(addr, &SecureRandom::new()).await.is_ok() { + break; + } + } + let ws2 = self.writers.read().await; + let writers: Vec<(SocketAddr, Arc>)> = ws2.iter().cloned().collect(); + drop(ws2); + candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await; + } + if candidate_indices.is_empty() { + return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + } } let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); @@ -85,10 +106,7 @@ impl MePool { } pub async fn send_close(&self, conn_id: u64) -> Result<()> { - let ws = self.writers.read().await; - if !ws.is_empty() { - let w = ws[0].1.clone(); - drop(ws); + if let Some(w) = self.registry.get_writer(conn_id).await { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); @@ -97,6 +115,8 @@ impl MePool { let mut ws = self.writers.write().await; ws.retain(|(_, o)| !Arc::ptr_eq(o, &w)); } + } else { + debug!(conn_id, "ME close skipped (writer missing)"); } self.registry.unregister(conn_id).await; diff --git a/src/transport/middle_proxy/wire.rs b/src/transport/middle_proxy/wire.rs index cecfb98..3f78f20 100644 --- a/src/transport/middle_proxy/wire.rs +++ b/src/transport/middle_proxy/wire.rs @@ -58,7 +58,7 @@ pub(crate) fn build_proxy_req_payload( append_mapped_addr_and_port(&mut b, client_addr); append_mapped_addr_and_port(&mut b, our_addr); - if proto_flags & 12 != 0 { + if proto_flags & RPC_FLAG_HAS_AD_TAG != 0 { let extra_start = b.len(); b.extend_from_slice(&0u32.to_le_bytes());