From a494dfa9eb4e03cc976b33be55fc23a226ce5f8f Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 13 Feb 2026 03:51:36 +0300 Subject: [PATCH] Middle Proxy Drafts Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- config.toml | 4 +- src/crypto/mod.rs | 2 +- src/protocol/constants.rs | 49 ++++- src/proxy/client.rs | 1 + src/proxy/handshake.rs | 7 +- src/transport/middle_proxy.rs | 371 ++++++++++++++++++++++++++++++++++ src/transport/mod.rs | 3 +- 7 files changed, 430 insertions(+), 7 deletions(-) create mode 100644 src/transport/middle_proxy.rs diff --git a/config.toml b/config.toml index aa31105..7344dc5 100644 --- a/config.toml +++ b/config.toml @@ -44,7 +44,7 @@ client_ack = 300 # === Anti-Censorship & Masking === [censorship] -tls_domain = "google.ru" +tls_domain = "petrovich.ru" mask = true mask_port = 443 # mask_host = "petrovich.ru" # Defaults to tls_domain if not set @@ -75,6 +75,6 @@ weight = 10 # [[upstreams]] # type = "socks5" -# address = "127.0.0.1:9050" +# address = "127.0.0.1:1080" # enabled = false # weight = 1 \ No newline at end of file diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index dfc2be6..ee97b27 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -5,5 +5,5 @@ pub mod hash; pub mod random; pub use aes::{AesCtr, AesCbc}; -pub use hash::{sha256, sha256_hmac, sha1, md5, crc32}; +pub use hash::{sha256, sha256_hmac, sha1, md5, crc32, derive_middleproxy_keys}; pub use random::SecureRandom; \ No newline at end of file diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index 7451c83..c309a0c 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -228,7 +228,54 @@ pub mod rpc_flags { pub const FLAG_QUICKACK: u32 = 0x80000000; } -#[cfg(test)] + + // ============= Middle-End Proxy Servers ============= + pub const ME_PROXY_PORT: u16 = 8888; + + pub static TG_MIDDLE_PROXIES_FLAT_V4: LazyLock> = LazyLock::new(|| { + vec![ + (IpAddr::V4(Ipv4Addr::new(149, 154, 175, 50)), 8888), + (IpAddr::V4(Ipv4Addr::new(149, 154, 161, 144)), 8888), + (IpAddr::V4(Ipv4Addr::new(149, 154, 175, 100)), 8888), + (IpAddr::V4(Ipv4Addr::new(91, 108, 4, 136)), 8888), + (IpAddr::V4(Ipv4Addr::new(91, 108, 56, 183)), 8888), + ] + }); + + // ============= RPC Constants (u32 native endian) ============= + // From mtproto-common.h + net-tcp-rpc-common.h + mtproto-proxy.c + + pub const RPC_NONCE_U32: u32 = 0x7acb87aa; + pub const RPC_HANDSHAKE_U32: u32 = 0x7682eef5; + pub const RPC_HANDSHAKE_ERROR_U32: u32 = 0x6a27beda; + pub const TL_PROXY_TAG_U32: u32 = 0xdb1e26ae; // mtproto-proxy.c:121 + + // mtproto-common.h + pub const RPC_PROXY_REQ_U32: u32 = 0x36cef1ee; + pub const RPC_PROXY_ANS_U32: u32 = 0x4403da0d; + pub const RPC_CLOSE_CONN_U32: u32 = 0x1fcf425d; + pub const RPC_CLOSE_EXT_U32: u32 = 0x5eb634a2; + pub const RPC_SIMPLE_ACK_U32: u32 = 0x3bac409b; + + pub const RPC_CRYPTO_NONE_U32: u32 = 0; + pub const RPC_CRYPTO_AES_U32: u32 = 1; + + pub mod proxy_flags { + pub const FLAG_HAS_AD_TAG: u32 = 1; + pub const FLAG_NOT_ENCRYPTED: u32 = 0x2; + pub const FLAG_HAS_AD_TAG2: u32 = 0x8; + pub const FLAG_MAGIC: u32 = 0x1000; + pub const FLAG_EXTMODE2: u32 = 0x20000; + pub const FLAG_PAD: u32 = 0x8000000; + pub const FLAG_INTERMEDIATE: u32 = 0x20000000; + pub const FLAG_ABRIDGED: u32 = 0x40000000; + pub const FLAG_QUICKACK: u32 = 0x80000000; + } + + pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5; + pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10; + + #[cfg(test)] mod tests { use super::*; diff --git a/src/proxy/client.rs b/src/proxy/client.rs index adcb25b..78db056 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -372,6 +372,7 @@ impl RunningClientHandler { ) -> Result<(CryptoReader, CryptoWriter)> { let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce( success.proto_tag, + success.dc_idx, &success.dec_key, success.dec_iv, rng, diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 81814e2..ad52775 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -256,6 +256,7 @@ where /// Generate nonce for Telegram connection pub fn generate_tg_nonce( proto_tag: ProtoTag, + dc_idx: i16, client_dec_key: &[u8; 32], client_dec_iv: u128, rng: &SecureRandom, @@ -274,6 +275,8 @@ pub fn generate_tg_nonce( if RESERVED_NONCE_CONTINUES.contains(&continue_four) { continue; } nonce[PROTO_TAG_POS..PROTO_TAG_POS + 4].copy_from_slice(&proto_tag.to_bytes()); + // CRITICAL: write dc_idx so upstream DC knows where to route + nonce[DC_IDX_POS..DC_IDX_POS + 2].copy_from_slice(&dc_idx.to_le_bytes()); if fast_mode { nonce[SKIP_LEN..SKIP_LEN + KEY_LEN].copy_from_slice(client_dec_key); @@ -320,7 +323,7 @@ mod tests { let rng = SecureRandom::new(); let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) = - generate_tg_nonce(ProtoTag::Secure, &client_dec_key, client_dec_iv, &rng, false); + generate_tg_nonce(ProtoTag::Secure, 2, &client_dec_key, client_dec_iv, &rng, false); assert_eq!(nonce.len(), HANDSHAKE_LEN); @@ -335,7 +338,7 @@ mod tests { let rng = SecureRandom::new(); let (nonce, _, _, _, _) = - generate_tg_nonce(ProtoTag::Secure, &client_dec_key, client_dec_iv, &rng, false); + generate_tg_nonce(ProtoTag::Secure, 2, &client_dec_key, client_dec_iv, &rng, false); let encrypted = encrypt_tg_nonce(&nonce); diff --git a/src/transport/middle_proxy.rs b/src/transport/middle_proxy.rs new file mode 100644 index 0000000..68603bf --- /dev/null +++ b/src/transport/middle_proxy.rs @@ -0,0 +1,371 @@ +//! Middle Proxy RPC Transport + //! + //! Implements Telegram Middle-End RPC protocol for routing to ALL DCs (including CDN). + //! Uses existing crypto primitives from crate::crypto. + + use std::collections::HashMap; + use std::net::{IpAddr, SocketAddr}; + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::Duration; + use bytes::{Bytes, BytesMut}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; + use tokio::sync::{mpsc, Mutex, RwLock}; + use tokio::time::{timeout, Instant}; + use tracing::{debug, info, trace, warn}; + + use crate::crypto::{sha1, crc32, derive_middleproxy_keys, AesCbc, SecureRandom}; + use crate::error::{ProxyError, Result}; + use crate::protocol::constants::*; + + // ========== RPC Nonce (32 bytes) ========== + + fn build_nonce_packet(key_selector: u32, crypto_ts: u32, nonce: &[u8; 16]) -> [u8; 32] { + let mut p = [0u8; 32]; + p[0..4].copy_from_slice(&RPC_NONCE_U32.to_le_bytes()); + p[4..8].copy_from_slice(&key_selector.to_le_bytes()); + p[8..12].copy_from_slice(&RPC_CRYPTO_AES_U32.to_le_bytes()); + p[12..16].copy_from_slice(&crypto_ts.to_le_bytes()); + p[16..32].copy_from_slice(nonce); + p + } + + fn parse_nonce_response(d: &[u8; 32]) -> Result<(u32, u32, [u8; 16])> { + let t = u32::from_le_bytes([d[0], d[1], d[2], d[3]]); + if t != RPC_NONCE_U32 { + return Err(ProxyError::InvalidHandshake(format!("Expected RPC_NONCE, got 0x{:08x}", t))); + } + let schema = u32::from_le_bytes([d[8], d[9], d[10], d[11]]); + let ts = u32::from_le_bytes([d[12], d[13], d[14], d[15]]); + let mut nonce = [0u8; 16]; + nonce.copy_from_slice(&d[16..32]); + Ok((schema, ts, nonce)) + } + + // ========== RPC Handshake (32 bytes) ========== + + fn build_handshake_packet() -> [u8; 32] { + let mut p = [0u8; 32]; + p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes()); + // flags=0, sender_pid with our PID + let pid = (std::process::id() & 0xFFFF) as u16; + p[14..16].copy_from_slice(&pid.to_le_bytes()); + let utime = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default().as_secs() as u32; + p[16..20].copy_from_slice(&utime.to_le_bytes()); + p + } + + // ========== CRC32 RPC Frame ========== + + fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec { + let total = (4 + 4 + payload.len() + 4) as u32; + let mut f = Vec::with_capacity(total as usize); + f.extend_from_slice(&total.to_le_bytes()); + f.extend_from_slice(&seq_no.to_le_bytes()); + f.extend_from_slice(payload); + let c = crc32(&f); + f.extend_from_slice(&c.to_le_bytes()); + f + } + + // ========== RPC_PROXY_REQ ========== + + fn build_proxy_req_payload( + conn_id: u64, + client_addr: SocketAddr, + our_addr: SocketAddr, + data: &[u8], + proxy_tag: Option<&[u8]>, + ) -> Vec { + let mut flags: u32 = proxy_flags::FLAG_HAS_AD_TAG2 | proxy_flags::FLAG_EXTMODE2; + if proxy_tag.is_some() { + flags |= proxy_flags::FLAG_HAS_AD_TAG; + } + + let extra_words: u32 = if let Some(tag) = proxy_tag { + let tl_len = 1 + tag.len(); + let padded = (tl_len + 3) / 4; + (1 + padded) as u32 + } else { 0 }; + + let mut b = Vec::with_capacity(64 + data.len()); + b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); + b.extend_from_slice(&flags.to_le_bytes()); + b.extend_from_slice(&conn_id.to_le_bytes()); + + // Client IP + match client_addr.ip() { + IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()), + IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()), + } + b.extend_from_slice(&(client_addr.port() as u32).to_le_bytes()); + // Our IP + match our_addr.ip() { + IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()), + IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()), + } + b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes()); + b.extend_from_slice(&extra_words.to_le_bytes()); + + if let Some(tag) = proxy_tag { + b.extend_from_slice(&TL_PROXY_TAG_U32.to_le_bytes()); + b.push(tag.len() as u8); + b.extend_from_slice(tag); + let pad = (4 - ((1 + tag.len()) % 4)) % 4; + b.extend(std::iter::repeat(0u8).take(pad)); + } + + b.extend_from_slice(data); + b + } + + // ========== ME Response ========== + + #[derive(Debug)] + pub enum MeResponse { + Data(Bytes), + Ack(u32), + Close, + } + + // ========== Connection Registry ========== + + pub struct ConnRegistry { + map: RwLock>>, + next_id: AtomicU64, + } + + impl ConnRegistry { + pub fn new() -> Self { + Self { map: RwLock::new(HashMap::new()), next_id: AtomicU64::new(1) } + } + pub async fn register(&self) -> (u64, mpsc::Receiver) { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = mpsc::channel(256); + self.map.write().await.insert(id, tx); + (id, rx) + } + pub async fn unregister(&self, id: u64) { + self.map.write().await.remove(&id); + } + pub async fn route(&self, id: u64, resp: MeResponse) -> bool { + let m = self.map.read().await; + if let Some(tx) = m.get(&id) { tx.send(resp).await.is_ok() } else { false } + } + } + + // ========== RPC Writer (streaming CBC) ========== + + struct RpcWriter { + writer: tokio::io::WriteHalf, + key: [u8; 32], + iv: [u8; 16], + seq_no: i32, + } + + impl RpcWriter { + async fn send(&mut self, payload: &[u8]) -> Result<()> { + let frame = build_rpc_frame(self.seq_no, payload); + self.seq_no += 1; + let pad = (16 - (frame.len() % 16)) % 16; + let mut buf = frame; + buf.extend(std::iter::repeat(0u8).take(pad)); + + let cipher = AesCbc::new(self.key, self.iv); + cipher.encrypt_in_place(&mut buf) + .map_err(|e| ProxyError::Crypto(format!("{}", e)))?; + if buf.len() >= 16 { + self.iv.copy_from_slice(&buf[buf.len() - 16..]); + } + self.writer.write_all(&buf).await.map_err(ProxyError::Io) + } + } + + // ========== ME Pool ========== + + pub struct MePool { + registry: Arc, + writers: RwLock>>>, + rr: AtomicU64, + proxy_tag: Option>, + } + + impl MePool { + pub fn new(proxy_tag: Option>) -> Arc { + Arc::new(Self { + registry: Arc::new(ConnRegistry::new()), + writers: RwLock::new(Vec::new()), + rr: AtomicU64::new(0), + proxy_tag, + }) + } + pub fn registry(&self) -> &Arc { &self.registry } + + pub async fn init(self: &Arc, pool_size: usize, secret: &[u8], rng: &SecureRandom) -> Result<()> { + let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4; + info!(me_servers = addrs.len(), pool_size, "Initializing ME pool"); + for &(ip, port) in addrs.iter().take(3) { + for i in 0..pool_size { + let addr = SocketAddr::new(ip, port); + match self.connect(addr, secret, rng).await { + Ok(()) => info!(%addr, idx = i, "ME connected"), + Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"), + } + } + } + if self.writers.read().await.is_empty() { + return Err(ProxyError::Proxy("No ME connections".into())); + } + Ok(()) + } + + async fn connect(self: &Arc, addr: SocketAddr, secret: &[u8], rng: &SecureRandom) -> Result<()> { + let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), TcpStream::connect(addr)) + .await.map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })? + .map_err(ProxyError::Io)?; + stream.set_nodelay(true).ok(); + let (mut rd, mut wr) = tokio::io::split(stream); + + // Nonce exchange + let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap(); + let crypto_ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs() as u32; + let sh = sha1(secret); + let ks = u32::from_le_bytes([sh[0], sh[1], sh[2], sh[3]]); + wr.write_all(&build_nonce_packet(ks, crypto_ts, &my_nonce)).await.map_err(ProxyError::Io)?; + + let mut resp = [0u8; 32]; + timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut resp)) + .await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?; + let (schema, _srv_ts, srv_nonce) = parse_nonce_response(&resp)?; + if schema != RPC_CRYPTO_AES_U32 { + return Err(ProxyError::InvalidHandshake(format!("Unsupported crypto: {}", schema))); + } + + // Key derivation via existing derive_middleproxy_keys + let ts_bytes = crypto_ts.to_le_bytes(); + let (wk, wi) = derive_middleproxy_keys( + &srv_nonce, &my_nonce, &ts_bytes, + None, &0u16.to_le_bytes(), b"CLIENT", None, &0u16.to_le_bytes(), secret, None, None, + ); + let (rk, ri) = derive_middleproxy_keys( + &srv_nonce, &my_nonce, &ts_bytes, + None, &0u16.to_le_bytes(), b"SERVER", None, &0u16.to_le_bytes(), secret, None, None, + ); + + // Handshake + wr.write_all(&build_handshake_packet()).await.map_err(ProxyError::Io)?; + let mut hs = [0u8; 32]; + timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut hs)) + .await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?; + let ht = u32::from_le_bytes([hs[0], hs[1], hs[2], hs[3]]); + if ht == RPC_HANDSHAKE_ERROR_U32 { + return Err(ProxyError::InvalidHandshake("ME rejected handshake".into())); + } + if ht != RPC_HANDSHAKE_U32 { + return Err(ProxyError::InvalidHandshake(format!("Got 0x{:08x}", ht))); + } + + info!(%addr, "RPC handshake OK"); + + self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter { writer: wr, key: wk, iv: wi, seq_no: 0 }))); + + let reg = self.registry.clone(); + tokio::spawn(async move { + if let Err(e) = reader_loop(rd, rk, ri, reg).await { + warn!(error = %e, "ME reader ended"); + } + }); + Ok(()) + } + + pub async fn send_proxy_req(&self, conn_id: u64, client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8]) -> Result<()> { + let ws = self.writers.read().await; + if ws.is_empty() { return Err(ProxyError::Proxy("No ME connections".into())); } + let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone(); + drop(ws); + let payload = build_proxy_req_payload(conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref()); + w.lock().await.send(&payload).await + } + + pub async fn send_close(&self, conn_id: u64) -> Result<()> { + let ws = self.writers.read().await; + if !ws.is_empty() { + let w = ws[0].clone(); + drop(ws); + let mut p = Vec::with_capacity(12); + p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); + p.extend_from_slice(&conn_id.to_le_bytes()); + let _ = w.lock().await.send(&p).await; + } + self.registry.unregister(conn_id).await; + Ok(()) + } + } + + // ========== Reader Loop ========== + + async fn reader_loop( + mut rd: tokio::io::ReadHalf, + dk: [u8; 32], mut div: [u8; 16], + reg: Arc, + ) -> Result<()> { + let mut raw = BytesMut::with_capacity(65536); + let mut dec = BytesMut::new(); + loop { + let mut tmp = [0u8; 16384]; + let n = rd.read(&mut tmp).await.map_err(ProxyError::Io)?; + if n == 0 { return Ok(()); } + raw.extend_from_slice(&tmp[..n]); + + let blocks = raw.len() / 16 * 16; + if blocks > 0 { + let mut new_iv = [0u8; 16]; + new_iv.copy_from_slice(&raw[blocks - 16..blocks]); + let mut chunk = vec![0u8; blocks]; + chunk.copy_from_slice(&raw[..blocks]); + AesCbc::new(dk, div).decrypt_in_place(&mut chunk) + .map_err(|e| ProxyError::Crypto(format!("{}", e)))?; + div = new_iv; + dec.extend_from_slice(&chunk); + let _ = raw.split_to(blocks); + } + + while dec.len() >= 12 { + let fl = u32::from_le_bytes([dec[0], dec[1], dec[2], dec[3]]) as usize; + if fl == 4 { let _ = dec.split_to(4); continue; } + if fl < 12 || fl > (1 << 24) { dec.clear(); break; } + if dec.len() < fl { break; } + let frame = dec.split_to(fl); + // CRC32 check + let pe = fl - 4; + let ec = u32::from_le_bytes([frame[pe], frame[pe+1], frame[pe+2], frame[pe+3]]); + if crc32(&frame[..pe]) != ec { warn!("CRC mismatch"); continue; } + let payload = &frame[8..pe]; + if payload.len() < 4 { continue; } + let pt = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]); + let body = &payload[4..]; + + if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 { + let flags = u32::from_le_bytes(body[0..4].try_into().unwrap()); + let cid = u64::from_le_bytes(body[4..12].try_into().unwrap()); + let data = Bytes::copy_from_slice(&body[12..]); + trace!(cid, len = data.len(), flags, "ANS"); + reg.route(cid, MeResponse::Data(data)).await; + } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { + let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); + let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap()); + trace!(cid, cfm, "ACK"); + reg.route(cid, MeResponse::Ack(cfm)).await; + } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { + let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); + debug!(cid, "CLOSE_EXT"); + reg.route(cid, MeResponse::Close).await; + reg.unregister(cid).await; + } + } + } + } + \ No newline at end of file diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 2b507d5..10becae 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -10,4 +10,5 @@ pub use pool::ConnectionPool; pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol}; pub use socket::*; pub use socks::*; -pub use upstream::{UpstreamManager, StartupPingResult, DcPingResult}; \ No newline at end of file +pub use upstream::{UpstreamManager, StartupPingResult, DcPingResult}; +pub mod middle_proxy;