diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index c309a0c..5ea7c66 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -201,35 +201,55 @@ pub static RESERVED_NONCE_CONTINUES: &[[u8; 4]] = &[ // ============= RPC Constants (for Middle Proxy) ============= -/// RPC Proxy Request -pub const RPC_PROXY_REQ: [u8; 4] = [0xee, 0xf1, 0xce, 0x36]; -/// RPC Proxy Answer -pub const RPC_PROXY_ANS: [u8; 4] = [0x0d, 0xda, 0x03, 0x44]; -/// RPC Close Extended -pub const RPC_CLOSE_EXT: [u8; 4] = [0xa2, 0x34, 0xb6, 0x5e]; -/// RPC Simple ACK -pub const RPC_SIMPLE_ACK: [u8; 4] = [0x9b, 0x40, 0xac, 0x3b]; -/// RPC Unknown -pub const RPC_UNKNOWN: [u8; 4] = [0xdf, 0xa2, 0x30, 0x57]; -/// RPC Handshake -pub const RPC_HANDSHAKE: [u8; 4] = [0xf5, 0xee, 0x82, 0x76]; -/// RPC Nonce -pub const RPC_NONCE: [u8; 4] = [0xaa, 0x87, 0xcb, 0x7a]; +/// RPC Proxy Request bytes (LE for 0x36cef1ee) +pub const RPC_PROXY_REQ_BYTES: [u8; 4] = [0xee, 0xf1, 0xce, 0x36]; +/// RPC Proxy Answer bytes (LE for 0x4403da0d) +pub const RPC_PROXY_ANS_BYTES: [u8; 4] = [0x0d, 0xda, 0x03, 0x44]; +/// RPC Close Extended bytes (LE for 0x5eb634a2) +pub const RPC_CLOSE_EXT_BYTES: [u8; 4] = [0xa2, 0x34, 0xb6, 0x5e]; +/// RPC Simple ACK bytes (LE for 0x3bac409b) +pub const RPC_SIMPLE_ACK_BYTES: [u8; 4] = [0x9b, 0x40, 0xac, 0x3b]; +/// RPC Unknown bytes (LE for 0x5730a2df) +pub const RPC_UNKNOWN_BYTES: [u8; 4] = [0xdf, 0xa2, 0x30, 0x57]; +/// RPC Handshake bytes (LE for 0x7682eef5) +pub const RPC_HANDSHAKE_BYTES: [u8; 4] = [0xf5, 0xee, 0x82, 0x76]; +/// RPC Nonce bytes (LE for 0x7acb87aa) +pub const RPC_NONCE_BYTES: [u8; 4] = [0xaa, 0x87, 0xcb, 0x7a]; -/// RPC Flags +// === RPC Proxy Flags (from Erlang mtp_rpc.erl) === +pub const RPC_FLAG_NOT_ENCRYPTED: u32 = 0x2; +pub const RPC_FLAG_HAS_AD_TAG: u32 = 0x8; +pub const RPC_FLAG_MAGIC: u32 = 0x1000; +pub const RPC_FLAG_EXTMODE2: u32 = 0x20000; +pub const RPC_FLAG_PAD: u32 = 0x8000000; +pub const RPC_FLAG_INTERMEDIATE: u32 = 0x20000000; +pub const RPC_FLAG_ABRIDGED: u32 = 0x40000000; +pub const RPC_FLAG_QUICKACK: u32 = 0x80000000; + +pub const RPC_PROXY_REQ: u32 = 0x36cef1ee; +pub const RPC_PROXY_ANS: u32 = 0x4403da0d; +pub const RPC_CLOSE_CONN: u32 = 0x1fcf425d; +pub const RPC_CLOSE_EXT: u32 = 0x5eb634a2; +pub const RPC_SIMPLE_ACK: u32 = 0x3bac409b; +pub const TL_PROXY_TAG: u32 = 0xdb1e26ae; + +pub const RPC_PING_U32: u32 = 0x5730a2df; +pub const RPC_PONG_U32: u32 = 0x8430eaa7; + +/// Backward-compatible names for legacy call sites. pub mod rpc_flags { - pub const FLAG_NOT_ENCRYPTED: u32 = 0x2; - pub const FLAG_HAS_AD_TAG: u32 = 0x8; - pub const FLAG_MAGIC: u32 = 0x1000; - pub const FLAG_EXTMODE2: u32 = 0x20000; - pub const FLAG_PAD: u32 = 0x8000000; - pub const FLAG_INTERMEDIATE: u32 = 0x20000000; - pub const FLAG_ABRIDGED: u32 = 0x40000000; - pub const FLAG_QUICKACK: u32 = 0x80000000; + use super::*; + pub const FLAG_NOT_ENCRYPTED: u32 = RPC_FLAG_NOT_ENCRYPTED; + pub const FLAG_HAS_AD_TAG: u32 = RPC_FLAG_HAS_AD_TAG; + pub const FLAG_MAGIC: u32 = RPC_FLAG_MAGIC; + pub const FLAG_EXTMODE2: u32 = RPC_FLAG_EXTMODE2; + pub const FLAG_PAD: u32 = RPC_FLAG_PAD; + pub const FLAG_INTERMEDIATE: u32 = RPC_FLAG_INTERMEDIATE; + pub const FLAG_ABRIDGED: u32 = RPC_FLAG_ABRIDGED; + pub const FLAG_QUICKACK: u32 = RPC_FLAG_QUICKACK; } - - // ============= Middle-End Proxy Servers ============= +// ============= Middle-End Proxy Servers ============= pub const ME_PROXY_PORT: u16 = 8888; pub static TG_MIDDLE_PROXIES_FLAT_V4: LazyLock> = LazyLock::new(|| { @@ -242,40 +262,41 @@ pub mod rpc_flags { ] }); - // ============= RPC Constants (u32 native endian) ============= - // From mtproto-common.h + net-tcp-rpc-common.h + mtproto-proxy.c +// ============= RPC Constants (u32 native endian) ============= +// From mtproto-common.h + net-tcp-rpc-common.h + mtproto-proxy.c + +pub const RPC_NONCE_U32: u32 = 0x7acb87aa; +pub const RPC_HANDSHAKE_U32: u32 = 0x7682eef5; +pub const RPC_HANDSHAKE_ERROR_U32: u32 = 0x6a27beda; +pub const TL_PROXY_TAG_U32: u32 = TL_PROXY_TAG; // mtproto-proxy.c:121 + +// mtproto-common.h +pub const RPC_PROXY_REQ_U32: u32 = RPC_PROXY_REQ; +pub const RPC_PROXY_ANS_U32: u32 = RPC_PROXY_ANS; +pub const RPC_CLOSE_CONN_U32: u32 = RPC_CLOSE_CONN; +pub const RPC_CLOSE_EXT_U32: u32 = RPC_CLOSE_EXT; +pub const RPC_SIMPLE_ACK_U32: u32 = RPC_SIMPLE_ACK; - pub const RPC_NONCE_U32: u32 = 0x7acb87aa; - pub const RPC_HANDSHAKE_U32: u32 = 0x7682eef5; - pub const RPC_HANDSHAKE_ERROR_U32: u32 = 0x6a27beda; - pub const TL_PROXY_TAG_U32: u32 = 0xdb1e26ae; // mtproto-proxy.c:121 - - // mtproto-common.h - pub const RPC_PROXY_REQ_U32: u32 = 0x36cef1ee; - pub const RPC_PROXY_ANS_U32: u32 = 0x4403da0d; - pub const RPC_CLOSE_CONN_U32: u32 = 0x1fcf425d; - pub const RPC_CLOSE_EXT_U32: u32 = 0x5eb634a2; - pub const RPC_SIMPLE_ACK_U32: u32 = 0x3bac409b; - - pub const RPC_CRYPTO_NONE_U32: u32 = 0; - pub const RPC_CRYPTO_AES_U32: u32 = 1; - - pub mod proxy_flags { - pub const FLAG_HAS_AD_TAG: u32 = 1; - pub const FLAG_NOT_ENCRYPTED: u32 = 0x2; - pub const FLAG_HAS_AD_TAG2: u32 = 0x8; - pub const FLAG_MAGIC: u32 = 0x1000; - pub const FLAG_EXTMODE2: u32 = 0x20000; - pub const FLAG_PAD: u32 = 0x8000000; - pub const FLAG_INTERMEDIATE: u32 = 0x20000000; - pub const FLAG_ABRIDGED: u32 = 0x40000000; - pub const FLAG_QUICKACK: u32 = 0x80000000; - } - - pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5; - pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10; - - #[cfg(test)] +pub const RPC_CRYPTO_NONE_U32: u32 = 0; +pub const RPC_CRYPTO_AES_U32: u32 = 1; + +pub mod proxy_flags { + use super::*; + pub const FLAG_HAS_AD_TAG: u32 = 1; + pub const FLAG_NOT_ENCRYPTED: u32 = RPC_FLAG_NOT_ENCRYPTED; + pub const FLAG_HAS_AD_TAG2: u32 = RPC_FLAG_HAS_AD_TAG; + pub const FLAG_MAGIC: u32 = RPC_FLAG_MAGIC; + pub const FLAG_EXTMODE2: u32 = RPC_FLAG_EXTMODE2; + pub const FLAG_PAD: u32 = RPC_FLAG_PAD; + pub const FLAG_INTERMEDIATE: u32 = RPC_FLAG_INTERMEDIATE; + pub const FLAG_ABRIDGED: u32 = RPC_FLAG_ABRIDGED; + pub const FLAG_QUICKACK: u32 = RPC_FLAG_QUICKACK; +} + +pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5; +pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10; + +#[cfg(test)] mod tests { use super::*; @@ -306,4 +327,4 @@ mod tests { assert_eq!(TG_DATACENTERS_V4.len(), 5); assert_eq!(TG_DATACENTERS_V6.len(), 5); } -} \ No newline at end of file +} diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 4a8d7ac..b90d447 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -14,7 +14,7 @@ use crate::protocol::tls; use crate::stats::{Stats, ReplayChecker}; use crate::transport::{configure_client_socket, UpstreamManager}; - use crate::transport::middle_proxy::{MePool, MeResponse}; + use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool}; use crate::crypto::{AesCtr, SecureRandom}; @@ -373,42 +373,58 @@ stats.increment_user_connects(&user); stats.increment_user_curr_connects(&user); - debug!(user = %user, conn_id, "ME relay started"); + let proto_flags = proto_flags_for_tag(Some(success.proto_tag as u32)); + let is_secure = matches!(success.proto_tag, ProtoTag::Secure); + debug!(user = %user, conn_id, proto_flags = format_args!("0x{:08x}", proto_flags), "ME relay started"); // Bidirectional relay loop: client ↔ ME pool // - // C→S direction: read raw bytes from client_reader, wrap in RPC_PROXY_REQ, send via ME - // S→C direction: receive MeResponse::Data from registry channel, write to client_writer + // C→S direction: read intermediate frame from client, send payload via RPC_PROXY_REQ + // S→C direction: receive raw payload from ME, add client intermediate framing // // We use tokio::select! to handle both directions concurrently. // Unlike direct mode (copy_bidirectional on two TCP streams), // here one side is a channel (mpsc::Receiver), not a stream. - let mut client_buf = vec![0u8; 64 * 1024]; let mut client_closed = false; let mut server_closed = false; let result: Result<()> = loop { tokio::select! { - // C→S: client sends data, we forward to ME - read_result = client_reader.read(&mut client_buf), if !client_closed => { + // C→S: client sends one intermediate frame, we forward payload to ME + read_result = async { + let mut len_buf = [0u8; 4]; + client_reader.read_exact(&mut len_buf).await?; + let raw_len = u32::from_le_bytes(len_buf); + let payload_len = (raw_len & 0x7fff_ffff) as usize; + if payload_len > MAX_MSG_LEN { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("client frame too large: {}", payload_len), + )); + } + + let mut payload = vec![0u8; payload_len]; + client_reader.read_exact(&mut payload).await?; + Ok::, std::io::Error>(payload) + }, if !client_closed => { match read_result { - Ok(0) => { + Ok(payload) => { + trace!(conn_id, bytes = payload.len(), "C frame -> ME payload"); + stats.add_user_octets_from(&user, payload.len() as u64); + if let Err(e) = me_pool.send_proxy_req( + conn_id, peer, our_addr, &payload, proto_flags + ).await { + break Err(e); + } + } + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { debug!(conn_id, "Client EOF"); client_closed = true; if server_closed { break Ok(()); } // Signal ME to close this connection let _ = me_pool.send_close(conn_id).await; } - Ok(n) => { - trace!(conn_id, bytes = n, "C→ME"); - stats.add_user_octets_from(&user, n as u64); - if let Err(e) = me_pool.send_proxy_req( - conn_id, peer, our_addr, &client_buf[..n] - ).await { - break Err(e); - } - } Err(e) => { debug!(conn_id, error = %e, "Client read error"); break Err(ProxyError::Io(e)); @@ -420,12 +436,30 @@ me_msg = me_rx.recv(), if !server_closed => { match me_msg { Some(MeResponse::Data(data)) => { - trace!(conn_id, bytes = data.len(), "ME→C"); - stats.add_user_octets_to(&user, data.len() as u64); + let mut frame_len = data.len() as u32; + let mut secure_padding = [0u8; 1]; + if is_secure && data.len() % 4 == 0 { + frame_len += 1; + secure_padding[0] = 0; + } + + trace!(conn_id, bytes = data.len(), frame_len, "ME payload -> C frame"); + stats.add_user_octets_to(&user, frame_len as u64 + 4); + + if let Err(e) = client_writer.write_all(&frame_len.to_le_bytes()).await { + debug!(conn_id, error = %e, "Client write header error"); + break Err(ProxyError::Io(e)); + } if let Err(e) = client_writer.write_all(&data).await { debug!(conn_id, error = %e, "Client write error"); break Err(ProxyError::Io(e)); } + if frame_len as usize > data.len() { + if let Err(e) = client_writer.write_all(&secure_padding).await { + debug!(conn_id, error = %e, "Client write padding error"); + break Err(ProxyError::Io(e)); + } + } if let Err(e) = client_writer.flush().await { break Err(ProxyError::Io(e)); } @@ -443,7 +477,6 @@ None => { // Channel closed — ME connection died debug!(conn_id, "ME channel closed"); - server_closed = true; if client_closed { break Ok(()); } break Err(ProxyError::Proxy("ME connection lost".into())); } @@ -580,4 +613,4 @@ )) } } - \ No newline at end of file + diff --git a/src/transport/middle_proxy.rs b/src/transport/middle_proxy.rs index b2eb872..6759bb7 100644 --- a/src/transport/middle_proxy.rs +++ b/src/transport/middle_proxy.rs @@ -249,12 +249,8 @@ fn ipv4_to_mapped_v6(ip: Ipv4Addr) -> [u8; 16] { fn addr_to_ip_u32(addr: &SocketAddr) -> u32 { match addr.ip() { - IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()), - IpAddr::V6(v6) => { - if let Some(v4) = v6.to_ipv4_mapped() { - u32::from_le_bytes(v4.octets()) - } else { 0 } - } + IpAddr::V4(v4) => u32::from_be_bytes(v4.octets()), + IpAddr::V6(_) => 0, } } @@ -338,12 +334,10 @@ fn build_proxy_req_payload( our_addr: SocketAddr, data: &[u8], proxy_tag: Option<&[u8]>, + proto_flags: u32, ) -> Vec { - let mut flags: u32 = 0; - let has_tag = proxy_tag.is_some(); - if has_tag { - flags |= 8; // C: `flags |= 8` when proxy_tag_set - } + // proto_flags are provided by caller and must already match C/Erlang transport flags. + let flags: u32 = proto_flags; let mut b = Vec::with_capacity(128 + data.len()); b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); @@ -365,7 +359,7 @@ fn build_proxy_req_payload( b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes()); // Extra section (proxy_tag) - if flags & 12 != 0 { + if flags & (RPC_FLAG_HAS_AD_TAG | 0x4) != 0 { let extra_start = b.len(); b.extend_from_slice(&0u32.to_le_bytes()); // placeholder @@ -400,7 +394,7 @@ fn build_proxy_req_payload( pub struct MePool { registry: Arc, - writers: RwLock>>>, + writers: Arc>>>>, rr: AtomicU64, proxy_tag: Option>, /// Telegram proxy-secret (binary, 32-512 bytes) @@ -412,7 +406,7 @@ impl MePool { pub fn new(proxy_tag: Option>, proxy_secret: Vec) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), - writers: RwLock::new(Vec::new()), + writers: Arc::new(RwLock::new(Vec::new())), rr: AtomicU64::new(0), proxy_tag, proxy_secret, @@ -424,6 +418,10 @@ impl MePool { &self.registry } + fn writers_arc(&self) -> Arc>>>> { + self.writers.clone() + } + /// key_selector = first 4 bytes of proxy-secret as LE u32 /// C: main_secret.key_signature via union { char secret[]; int key_signature; } fn key_selector(&self) -> u32 { @@ -692,18 +690,25 @@ impl MePool { info!(%addr, "RPC handshake OK"); // ===== 6. Setup writer + reader ===== - self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter { + let rpc_w = Arc::new(Mutex::new(RpcWriter { writer: wr, key: wk, iv: write_iv, seq_no: 0, - }))); + })); + self.writers.write().await.push(rpc_w.clone()); let reg = self.registry.clone(); + let w_pong = rpc_w.clone(); + let w_pool = self.writers_arc(); tokio::spawn(async move { - if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf).await { + if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf, w_pong.clone()).await { warn!(error = %e, "ME reader ended"); } + // Remove dead writer from pool + let mut ws = w_pool.write().await; + ws.retain(|w| !Arc::ptr_eq(w, &w_pong)); + info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); Ok(()) @@ -715,17 +720,32 @@ impl MePool { client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8], + proto_flags: u32, ) -> Result<()> { - let ws = self.writers.read().await; - if ws.is_empty() { - return Err(ProxyError::Proxy("No ME connections".into())); - } - let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone(); - drop(ws); let payload = build_proxy_req_payload( - conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref(), + conn_id, client_addr, our_addr, data, + self.proxy_tag.as_deref(), proto_flags, ); - w.lock().await.send(&payload).await + loop { + let ws = self.writers.read().await; + if ws.is_empty() { + return Err(ProxyError::Proxy("All ME connections dead".into())); + } + let idx = self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len(); + let w = ws[idx].clone(); + drop(ws); + match w.lock().await.send(&payload).await { + Ok(()) => return Ok(()), + Err(e) => { + warn!(error = %e, "ME write failed, removing dead conn"); + let mut ws = self.writers.write().await; + ws.retain(|o| !Arc::ptr_eq(o, &w)); + if ws.is_empty() { + return Err(ProxyError::Proxy("All ME connections dead".into())); + } + } + } + } } pub async fn send_close(&self, conn_id: u64) -> Result<()> { @@ -736,7 +756,11 @@ impl MePool { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); - let _ = w.lock().await.send(&p).await; + if let Err(e) = w.lock().await.send(&p).await { + debug!(error = %e, "ME close write failed"); + let mut ws = self.writers.write().await; + ws.retain(|o| !Arc::ptr_eq(o, &w)); + } } self.registry.unregister(conn_id).await; Ok(()) @@ -756,6 +780,7 @@ async fn reader_loop( reg: Arc, mut enc_leftover: BytesMut, mut dec: BytesMut, + writer: Arc>, ) -> Result<()> { let mut raw = enc_leftover; loop { @@ -824,6 +849,16 @@ async fn reader_loop( debug!(cid, "CLOSE_CONN from ME"); reg.route(cid, MeResponse::Close).await; reg.unregister(cid).await; + } else if pt == RPC_PING_U32 && body.len() >= 8 { + let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); + trace!(ping_id, "RPC_PING -> PONG"); + let mut pong = Vec::with_capacity(12); + pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); + pong.extend_from_slice(&ping_id.to_le_bytes()); + if let Err(e) = writer.lock().await.send(&pong).await { + warn!(error = %e, "PONG send failed"); + break; + } } else { debug!(rpc_type = format_args!("0x{:08x}", pt), len = body.len(), "Unknown RPC"); } @@ -831,6 +866,47 @@ async fn reader_loop( } } +// ========== Proto flags ========== + +/// Map ProtoTag to C-compatible RPC_PROXY_REQ transport flags. +/// Returned value already includes ad_tag/magic/extmode2. +pub fn proto_flags_for_tag(proto_tag: Option) -> u32 { + let mut flags = RPC_FLAG_HAS_AD_TAG | RPC_FLAG_MAGIC | RPC_FLAG_EXTMODE2; + match proto_tag { + Some(0xdddddddd) => { + // Secure + flags |= RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE; + } + Some(0xeeeeeeee) => { + // Intermediate + flags |= RPC_FLAG_INTERMEDIATE; + } + Some(0xefefefef) => { + // Abridged + flags |= RPC_FLAG_ABRIDGED; + } + _ => { + flags |= RPC_FLAG_INTERMEDIATE; + } + } + flags +} + +#[cfg(test)] +mod tests { + use super::proto_flags_for_tag; + + #[test] + fn proto_flags_secure_matches_reference() { + assert_eq!(proto_flags_for_tag(Some(0xdddddddd)), 0x28021008); + } + + #[test] + fn proto_flags_intermediate_matches_reference() { + assert_eq!(proto_flags_for_tag(Some(0xeeeeeeee)), 0x20021008); + } +} + // ========== Health Monitor (Phase 4) ========== pub async fn me_health_monitor(