From a8c3128c500811d2bd552151ad5828d2440bac24 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 14 Feb 2026 01:51:10 +0300 Subject: [PATCH] Middle Proxy Magics Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/mod.rs | 1 + src/transport/middle_proxy/pool.rs | 91 ++++++---------------- src/transport/middle_proxy/registry.rs | 4 +- src/transport/middle_proxy/send.rs | 102 +++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 68 deletions(-) create mode 100644 src/transport/middle_proxy/send.rs diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 8aad640..6cb38cb 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -5,6 +5,7 @@ mod health; mod pool; mod reader; mod registry; +mod send; mod secret; mod wire; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index f256aab..c915886 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::AtomicU64; use std::time::Duration; use bytes::BytesMut; @@ -20,13 +20,15 @@ use super::codec::{ cbc_encrypt_padded, parse_nonce_payload, read_rpc_frame_plaintext, }; use super::reader::reader_loop; -use super::wire::{IpMaterial, build_proxy_req_payload, extract_ip_material}; +use super::wire::{IpMaterial, extract_ip_material}; + +const ME_ACTIVE_PING_SECS: u64 = 25; pub struct MePool { - registry: Arc, - writers: Arc>>>>, - rr: AtomicU64, - proxy_tag: Option>, + pub(super) registry: Arc, + pub(super) writers: Arc>>>>, + pub(super) rr: AtomicU64, + pub(super) proxy_tag: Option>, proxy_secret: Vec, nat_ip: Option, pool_size: usize, @@ -351,6 +353,8 @@ impl MePool { let reg = self.registry.clone(); let w_pong = rpc_w.clone(); let w_pool = self.writers_arc(); + let w_ping = rpc_w.clone(); + let w_pool_ping = self.writers_arc(); tokio::spawn(async move { if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf, w_pong.clone()).await @@ -361,71 +365,24 @@ impl MePool { ws.retain(|w| !Arc::ptr_eq(w, &w_pong)); info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); - - Ok(()) - } - - pub async fn send_proxy_req( - &self, - conn_id: u64, - client_addr: SocketAddr, - our_addr: SocketAddr, - data: &[u8], - proto_flags: u32, - ) -> Result<()> { - let payload = build_proxy_req_payload( - conn_id, - client_addr, - our_addr, - data, - self.proxy_tag.as_deref(), - proto_flags, - ); - - loop { - let ws = self.writers.read().await; - if ws.is_empty() { - return Err(ProxyError::Proxy("All ME connections dead".into())); - } - - let idx = self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len(); - let w = ws[idx].clone(); - drop(ws); - - match w.lock().await.send(&payload).await { - Ok(()) => return Ok(()), - Err(e) => { - warn!(error = %e, "ME write failed, removing dead conn"); - let mut ws = self.writers.write().await; - ws.retain(|o| !Arc::ptr_eq(o, &w)); - if ws.is_empty() { - return Err(ProxyError::Proxy("All ME connections dead".into())); - } + tokio::spawn(async move { + let mut ping_id: i64 = rand::random::(); + loop { + tokio::time::sleep(Duration::from_secs(ME_ACTIVE_PING_SECS)).await; + let mut p = Vec::with_capacity(12); + p.extend_from_slice(&RPC_PING_U32.to_le_bytes()); + p.extend_from_slice(&ping_id.to_le_bytes()); + ping_id = ping_id.wrapping_add(1); + if let Err(e) = w_ping.lock().await.send(&p).await { + debug!(error = %e, "Active ME ping failed, removing dead writer"); + let mut ws = w_pool_ping.write().await; + ws.retain(|w| !Arc::ptr_eq(w, &w_ping)); + break; } } - } - } + }); - pub async fn send_close(&self, conn_id: u64) -> Result<()> { - let ws = self.writers.read().await; - if !ws.is_empty() { - let w = ws[0].clone(); - drop(ws); - let mut p = Vec::with_capacity(12); - p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); - p.extend_from_slice(&conn_id.to_le_bytes()); - if let Err(e) = w.lock().await.send(&p).await { - debug!(error = %e, "ME close write failed"); - let mut ws = self.writers.write().await; - ws.retain(|o| !Arc::ptr_eq(o, &w)); - } - } - - self.registry.unregister(conn_id).await; Ok(()) } - pub fn connection_count(&self) -> usize { - self.writers.try_read().map(|w| w.len()).unwrap_or(0) - } } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 40c2803..1f13025 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -12,9 +12,11 @@ pub struct ConnRegistry { impl ConnRegistry { pub fn new() -> Self { + // Avoid fully predictable conn_id sequence from 1. + let start = rand::random::() | 1; Self { map: RwLock::new(HashMap::new()), - next_id: AtomicU64::new(1), + next_id: AtomicU64::new(start), } } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs new file mode 100644 index 0000000..12082e5 --- /dev/null +++ b/src/transport/middle_proxy/send.rs @@ -0,0 +1,102 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::Ordering; + +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +use crate::error::{ProxyError, Result}; +use crate::protocol::constants::RPC_CLOSE_EXT_U32; + +use super::MePool; +use super::codec::RpcWriter; +use super::wire::build_proxy_req_payload; + +impl MePool { + pub async fn send_proxy_req( + &self, + conn_id: u64, + client_addr: SocketAddr, + our_addr: SocketAddr, + data: &[u8], + proto_flags: u32, + ) -> Result<()> { + let payload = build_proxy_req_payload( + conn_id, + client_addr, + our_addr, + data, + self.proxy_tag.as_deref(), + proto_flags, + ); + + loop { + let ws = self.writers.read().await; + if ws.is_empty() { + return Err(ProxyError::Proxy("All ME connections dead".into())); + } + let writers: Vec>> = ws.iter().cloned().collect(); + let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % writers.len(); + drop(ws); + + // Prefer immediately available writer to avoid waiting on stalled connection. + for offset in 0..writers.len() { + let idx = (start + offset) % writers.len(); + let w = writers[idx].clone(); + if let Ok(mut guard) = w.try_lock() { + let send_res = guard.send(&payload).await; + drop(guard); + match send_res { + Ok(()) => return Ok(()), + Err(e) => { + warn!(error = %e, "ME write failed, removing dead conn"); + let mut ws = self.writers.write().await; + ws.retain(|o| !Arc::ptr_eq(o, &w)); + if ws.is_empty() { + return Err(ProxyError::Proxy("All ME connections dead".into())); + } + continue; + } + } + } + } + + // All writers are currently busy, wait for the selected one. + let w = writers[start].clone(); + match w.lock().await.send(&payload).await { + Ok(()) => return Ok(()), + Err(e) => { + warn!(error = %e, "ME write failed, removing dead conn"); + let mut ws = self.writers.write().await; + ws.retain(|o| !Arc::ptr_eq(o, &w)); + if ws.is_empty() { + return Err(ProxyError::Proxy("All ME connections dead".into())); + } + } + } + } + } + + pub async fn send_close(&self, conn_id: u64) -> Result<()> { + let ws = self.writers.read().await; + if !ws.is_empty() { + let w = ws[0].clone(); + drop(ws); + let mut p = Vec::with_capacity(12); + p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); + p.extend_from_slice(&conn_id.to_le_bytes()); + if let Err(e) = w.lock().await.send(&p).await { + debug!(error = %e, "ME close write failed"); + let mut ws = self.writers.write().await; + ws.retain(|o| !Arc::ptr_eq(o, &w)); + } + } + + self.registry.unregister(conn_id).await; + Ok(()) + } + + pub fn connection_count(&self) -> usize { + self.writers.try_read().map(|w| w.len()).unwrap_or(0) + } +}