From d405756b9475f38ac83f902f36db8fcdee6434d8 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 14 Feb 2026 01:52:49 +0300 Subject: [PATCH] HOL Minimized + Random conn_id + Target DC Magics Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/proxy/middle_relay.rs | 9 +++- src/transport/middle_proxy/pool.rs | 11 ++--- src/transport/middle_proxy/send.rs | 66 +++++++++++++++++++++++++----- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index fe15d32..0882d0e 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -61,7 +61,14 @@ where Ok(Some(payload)) => { trace!(conn_id, bytes = payload.len(), "C->ME frame"); stats.add_user_octets_from(&user, payload.len() as u64); - me_pool.send_proxy_req(conn_id, peer, translated_local_addr, &payload, proto_flags).await?; + me_pool.send_proxy_req( + conn_id, + success.dc_idx, + peer, + translated_local_addr, + &payload, + proto_flags, + ).await?; } Ok(None) => { debug!(conn_id, "Client EOF"); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index c915886..c59df36 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -26,7 +26,7 @@ const ME_ACTIVE_PING_SECS: u64 = 25; pub struct MePool { pub(super) registry: Arc, - pub(super) writers: Arc>>>>, + pub(super) writers: Arc>)>>> , pub(super) rr: AtomicU64, pub(super) proxy_tag: Option>, proxy_secret: Vec, @@ -82,7 +82,8 @@ impl MePool { } } - fn writers_arc(&self) -> Arc>>>> { + fn writers_arc(&self) -> Arc>)>>> + { self.writers.clone() } @@ -348,7 +349,7 @@ impl MePool { iv: write_iv, seq_no: 0, })); - self.writers.write().await.push(rpc_w.clone()); + self.writers.write().await.push((addr, rpc_w.clone())); let reg = self.registry.clone(); let w_pong = rpc_w.clone(); @@ -362,7 +363,7 @@ impl MePool { warn!(error = %e, "ME reader ended"); } let mut ws = w_pool.write().await; - ws.retain(|w| !Arc::ptr_eq(w, &w_pong)); + ws.retain(|(_, w)| !Arc::ptr_eq(w, &w_pong)); info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); tokio::spawn(async move { @@ -376,7 +377,7 @@ impl MePool { if let Err(e) = w_ping.lock().await.send(&p).await { debug!(error = %e, "Active ME ping failed, removing dead writer"); let mut ws = w_pool_ping.write().await; - ws.retain(|w| !Arc::ptr_eq(w, &w_ping)); + ws.retain(|(_, w)| !Arc::ptr_eq(w, &w_ping)); break; } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 12082e5..192a560 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -6,7 +6,7 @@ use tokio::sync::Mutex; use tracing::{debug, warn}; use crate::error::{ProxyError, Result}; -use crate::protocol::constants::RPC_CLOSE_EXT_U32; +use crate::protocol::constants::{RPC_CLOSE_EXT_U32, TG_MIDDLE_PROXIES_V4}; use super::MePool; use super::codec::RpcWriter; @@ -16,6 +16,7 @@ impl MePool { pub async fn send_proxy_req( &self, conn_id: u64, + target_dc: i16, client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8], @@ -35,14 +36,20 @@ impl MePool { if ws.is_empty() { return Err(ProxyError::Proxy("All ME connections dead".into())); } - let writers: Vec>> = ws.iter().cloned().collect(); - let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % writers.len(); + let writers: Vec<(SocketAddr, Arc>)> = ws.iter().cloned().collect(); drop(ws); + let candidate_indices = candidate_indices_for_dc(&writers, target_dc); + if candidate_indices.is_empty() { + return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + } + let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); + // Prefer immediately available writer to avoid waiting on stalled connection. - for offset in 0..writers.len() { - let idx = (start + offset) % writers.len(); - let w = writers[idx].clone(); + for offset in 0..candidate_indices.len() { + let cidx = (start + offset) % candidate_indices.len(); + let idx = candidate_indices[cidx]; + let w = writers[idx].1.clone(); if let Ok(mut guard) = w.try_lock() { let send_res = guard.send(&payload).await; drop(guard); @@ -51,7 +58,7 @@ impl MePool { Err(e) => { warn!(error = %e, "ME write failed, removing dead conn"); let mut ws = self.writers.write().await; - ws.retain(|o| !Arc::ptr_eq(o, &w)); + ws.retain(|(_, o)| !Arc::ptr_eq(o, &w)); if ws.is_empty() { return Err(ProxyError::Proxy("All ME connections dead".into())); } @@ -62,13 +69,13 @@ impl MePool { } // All writers are currently busy, wait for the selected one. - let w = writers[start].clone(); + let w = writers[candidate_indices[start]].1.clone(); match w.lock().await.send(&payload).await { Ok(()) => return Ok(()), Err(e) => { warn!(error = %e, "ME write failed, removing dead conn"); let mut ws = self.writers.write().await; - ws.retain(|o| !Arc::ptr_eq(o, &w)); + ws.retain(|(_, o)| !Arc::ptr_eq(o, &w)); if ws.is_empty() { return Err(ProxyError::Proxy("All ME connections dead".into())); } @@ -80,7 +87,7 @@ impl MePool { pub async fn send_close(&self, conn_id: u64) -> Result<()> { let ws = self.writers.read().await; if !ws.is_empty() { - let w = ws[0].clone(); + let w = ws[0].1.clone(); drop(ws); let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); @@ -88,7 +95,7 @@ impl MePool { if let Err(e) = w.lock().await.send(&p).await { debug!(error = %e, "ME close write failed"); let mut ws = self.writers.write().await; - ws.retain(|o| !Arc::ptr_eq(o, &w)); + ws.retain(|(_, o)| !Arc::ptr_eq(o, &w)); } } @@ -100,3 +107,40 @@ impl MePool { self.writers.try_read().map(|w| w.len()).unwrap_or(0) } } + +fn candidate_indices_for_dc( + writers: &[(SocketAddr, Arc>)], + target_dc: i16, +) -> Vec { + let mut preferred = Vec::::new(); + let key = target_dc as i32; + if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&key) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + if preferred.is_empty() { + let abs = key.abs(); + if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&abs) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + } + if preferred.is_empty() { + let abs = key.abs(); + if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&-abs) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + } + if preferred.is_empty() { + return (0..writers.len()).collect(); + } + + let mut out = Vec::new(); + for (idx, (addr, _)) in writers.iter().enumerate() { + if preferred.iter().any(|p| p == addr) { + out.push(idx); + } + } + if out.is_empty() { + return (0..writers.len()).collect(); + } + out +}