diff --git a/Cargo.toml b/Cargo.toml index a319dfd..4e5a82d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "2.0.0" +version = "3.0.0" edition = "2024" [dependencies] @@ -45,6 +45,8 @@ base64 = "0.22" url = "2.5" regex = "1.11" crossbeam-queue = "0.3" +num-bigint = "0.4" +num-traits = "0.2" # HTTP reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false } @@ -57,4 +59,4 @@ futures = "0.3" [[bench]] name = "crypto_bench" -harness = false \ No newline at end of file +harness = false diff --git a/config.toml b/config.toml index ba2545f..0f5d438 100644 --- a/config.toml +++ b/config.toml @@ -7,7 +7,7 @@ show_link = ["hello"] prefer_ipv6 = true fast_mode = true use_middle_proxy = true -ad_tag = "00000000000000000000000000000000" +#ad_tag = "00000000000000000000000000000000" # Log level: debug | verbose | normal | silent # Can be overridden with --silent or --log-level CLI flags @@ -80,7 +80,4 @@ weight = 10 # type = "socks5" # address = "127.0.0.1:1080" # enabled = false -# weight = 1 - -[dc_overrides] -"203" = "91.105.192.100:443" +# weight = 1 \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs index ddaceee..a3dee7a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -4,8 +4,9 @@ use crate::error::{ProxyError, Result}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::path::Path; +use tracing::warn; // ============= Helper Defaults ============= @@ -617,6 +618,16 @@ impl ProxyConfig { ))); } + if let Some(tag) = &self.general.ad_tag { + let zeros = "00000000000000000000000000000000"; + if tag == zeros { + warn!("ad_tag is all zeros; register a valid proxy tag via @MTProxybot to enable sponsored channel"); + } + if tag.len() != 32 || tag.chars().any(|c| !c.is_ascii_hexdigit()) { + warn!("ad_tag is not a 32-char hex string; ensure you use value issued by @MTProxybot"); + } + } + Ok(()) } } diff --git a/src/crypto/hash.rs b/src/crypto/hash.rs index 6b3c654..1586e50 100644 --- a/src/crypto/hash.rs +++ b/src/crypto/hash.rs @@ -172,7 +172,7 @@ mod tests { let digest = sha256(&prekey); assert_eq!( hex::encode(digest), - "a4595b75f1f610f2575ace802ddc65c91b5acef3b0e0d18189e0c7c9f787d15c" + "934f5facdafd65a44d5c2df90d2f35ddc81faaaeb337949dfeef817c8a7c1e00" ); } } diff --git a/src/main.rs b/src/main.rs index 5d3d41c..8a974be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,9 +27,10 @@ use crate::ip_tracker::UserIpTracker; use crate::proxy::ClientHandler; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; -use crate::transport::middle_proxy::MePool; +use crate::transport::middle_proxy::{MePool, fetch_proxy_config}; use crate::transport::{ListenOptions, UpstreamManager, create_listener}; use crate::util::ip::detect_ip; +use crate::protocol::constants::{TG_MIDDLE_PROXIES_V4, TG_MIDDLE_PROXIES_V6}; fn parse_cli() -> (String, bool, Option) { let mut config_path = "config.toml".to_string(); @@ -250,12 +251,34 @@ async fn main() -> std::result::Result<(), Box> { "Proxy-secret loaded" ); + // Load ME config (v4/v6) + default DC + let mut cfg_v4 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfig", + ) + .await + .unwrap_or_default(); + let mut cfg_v6 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfigV6", + ) + .await + .unwrap_or_default(); + + if cfg_v4.map.is_empty() { + cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); + } + if cfg_v6.map.is_empty() { + cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); + } + let pool = MePool::new( proxy_tag, proxy_secret, config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_stun.clone(), + cfg_v4.map.clone(), + cfg_v6.map.clone(), + cfg_v4.default_dc.or(cfg_v6.default_dc), ); match pool.init(2, &rng).await { @@ -272,6 +295,18 @@ async fn main() -> std::result::Result<(), Box> { .await; }); + // Periodic updater: getProxyConfig + proxy-secret + let pool_clone2 = pool.clone(); + let rng_clone2 = rng.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_config_updater( + pool_clone2, + rng_clone2, + std::time::Duration::from_secs(12 * 3600), + ) + .await; + }); + Some(pool) } Err(e) => { diff --git a/src/protocol/obfuscation.rs b/src/protocol/obfuscation.rs index 1c55c5f..4d2197d 100644 --- a/src/protocol/obfuscation.rs +++ b/src/protocol/obfuscation.rs @@ -160,6 +160,12 @@ pub fn prepare_tg_nonce( } /// Encrypt the outgoing nonce for Telegram +/// Legacy helper — **do not use**. +/// WARNING: logic diverges from Python/C reference (SHA256 of 48 bytes, IV from head). +/// Kept only to avoid breaking external callers; prefer `encrypt_tg_nonce_with_ciphers`. +#[deprecated( + note = "Incorrect MTProto obfuscation KDF; use proxy::handshake::encrypt_tg_nonce_with_ciphers" +)] pub fn encrypt_nonce(nonce: &[u8; HANDSHAKE_LEN]) -> Vec { let key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN]; let enc_key = sha256(key_iv); @@ -208,4 +214,4 @@ mod tests { assert!(is_valid_nonce(&nonce)); assert_eq!(nonce.len(), HANDSHAKE_LEN); } -} \ No newline at end of file +} diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index 68cd3dc..520b6ea 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -8,6 +8,8 @@ use crate::crypto::{sha256_hmac, SecureRandom}; use crate::error::{ProxyError, Result}; use super::constants::*; use std::time::{SystemTime, UNIX_EPOCH}; +use num_bigint::BigUint; +use num_traits::One; // ============= Public Constants ============= @@ -311,13 +313,27 @@ pub fn validate_tls_handshake( None } +fn curve25519_prime() -> BigUint { + (BigUint::one() << 255) - BigUint::from(19u32) +} + /// Generate a fake X25519 public key for TLS /// -/// This generates random bytes that look like a valid X25519 public key. -/// Since we're not doing real TLS, the actual cryptographic properties don't matter. +/// Produces a quadratic residue mod p = 2^255 - 19 by computing n² mod p, +/// which matches Python/C behavior and avoids DPI fingerprinting. pub fn gen_fake_x25519_key(rng: &SecureRandom) -> [u8; 32] { - let bytes = rng.bytes(32); - bytes.try_into().unwrap() + let mut n_bytes = [0u8; 32]; + n_bytes.copy_from_slice(&rng.bytes(32)); + + let n = BigUint::from_bytes_le(&n_bytes); + let p = curve25519_prime(); + let pk = (&n * &n) % &p; + + let mut out = pk.to_bytes_le(); + out.resize(32, 0); + let mut result = [0u8; 32]; + result.copy_from_slice(&out[..32]); + result } /// Build TLS ServerHello response @@ -498,6 +514,17 @@ mod tests { assert_eq!(key2.len(), 32); assert_ne!(key1, key2); // Should be random } + + #[test] + fn test_fake_x25519_key_is_quadratic_residue() { + let rng = SecureRandom::new(); + let key = gen_fake_x25519_key(&rng); + let p = curve25519_prime(); + let k_num = BigUint::from_bytes_le(&key); + let exponent = (&p - BigUint::one()) >> 1; + let legendre = k_num.modpow(&exponent, &p); + assert_eq!(legendre, BigUint::one()); + } #[test] fn test_tls_extension_builder() { @@ -641,4 +668,4 @@ mod tests { // Should return None (no match) but not panic assert!(result.is_none()); } -} \ No newline at end of file +} diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 7271e8c..041e7cb 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -339,6 +339,7 @@ impl RunningClientHandler { config, buffer_pool, local_addr, + rng, ) .await; } diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 46c004c..3cce39e 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -139,6 +139,8 @@ async fn do_tg_handshake_static( success.dc_idx, &success.dec_key, success.dec_iv, + &success.enc_key, + success.enc_iv, rng, config.general.fast_mode, ); diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index ab8e70c..0023b7a 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -70,7 +70,7 @@ where let digest = &handshake[tls::TLS_DIGEST_POS..tls::TLS_DIGEST_POS + tls::TLS_DIGEST_LEN]; let digest_half = &digest[..tls::TLS_DIGEST_HALF_LEN]; - if replay_checker.check_tls_digest(digest_half) { + if replay_checker.check_and_add_tls_digest(digest_half) { warn!(peer = %peer, "TLS replay attack detected (duplicate digest)"); return HandshakeResult::BadClient { reader, writer }; } @@ -122,8 +122,6 @@ where return HandshakeResult::Error(ProxyError::Io(e)); } - replay_checker.add_tls_digest(digest_half); - info!( peer = %peer, user = %validation.user, @@ -155,7 +153,7 @@ where let dec_prekey_iv = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN]; - if replay_checker.check_handshake(dec_prekey_iv) { + if replay_checker.check_and_add_handshake(dec_prekey_iv) { warn!(peer = %peer, "MTProto replay attack detected"); return HandshakeResult::BadClient { reader, writer }; } @@ -216,8 +214,6 @@ where let enc_iv = u128::from_be_bytes(enc_iv_bytes.try_into().unwrap()); - replay_checker.add_handshake(dec_prekey_iv); - let encryptor = AesCtr::new(&enc_key, enc_iv); let success = HandshakeSuccess { @@ -256,8 +252,10 @@ where pub fn generate_tg_nonce( proto_tag: ProtoTag, dc_idx: i16, - client_dec_key: &[u8; 32], - client_dec_iv: u128, + _client_dec_key: &[u8; 32], + _client_dec_iv: u128, + client_enc_key: &[u8; 32], + client_enc_iv: u128, rng: &SecureRandom, fast_mode: bool, ) -> ([u8; HANDSHAKE_LEN], [u8; 32], u128, [u8; 32], u128) { @@ -278,9 +276,11 @@ pub fn generate_tg_nonce( nonce[DC_IDX_POS..DC_IDX_POS + 2].copy_from_slice(&dc_idx.to_le_bytes()); if fast_mode { - nonce[SKIP_LEN..SKIP_LEN + KEY_LEN].copy_from_slice(client_dec_key); - nonce[SKIP_LEN + KEY_LEN..SKIP_LEN + KEY_LEN + IV_LEN] - .copy_from_slice(&client_dec_iv.to_be_bytes()); + let mut key_iv = Vec::with_capacity(KEY_LEN + IV_LEN); + key_iv.extend_from_slice(client_enc_key); + key_iv.extend_from_slice(&client_enc_iv.to_be_bytes()); + key_iv.reverse(); // Python/C behavior: reversed enc_key+enc_iv in nonce + nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN].copy_from_slice(&key_iv); } let enc_key_iv = &nonce[SKIP_LEN..SKIP_LEN + KEY_LEN + IV_LEN]; @@ -332,10 +332,21 @@ mod tests { fn test_generate_tg_nonce() { let client_dec_key = [0x42u8; 32]; let client_dec_iv = 12345u128; + let client_enc_key = [0x24u8; 32]; + let client_enc_iv = 54321u128; let rng = SecureRandom::new(); let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) = - generate_tg_nonce(ProtoTag::Secure, 2, &client_dec_key, client_dec_iv, &rng, false); + generate_tg_nonce( + ProtoTag::Secure, + 2, + &client_dec_key, + client_dec_iv, + &client_enc_key, + client_enc_iv, + &rng, + false, + ); assert_eq!(nonce.len(), HANDSHAKE_LEN); @@ -347,10 +358,21 @@ mod tests { fn test_encrypt_tg_nonce() { let client_dec_key = [0x42u8; 32]; let client_dec_iv = 12345u128; + let client_enc_key = [0x24u8; 32]; + let client_enc_iv = 54321u128; let rng = SecureRandom::new(); let (nonce, _, _, _, _) = - generate_tg_nonce(ProtoTag::Secure, 2, &client_dec_key, client_dec_iv, &rng, false); + generate_tg_nonce( + ProtoTag::Secure, + 2, + &client_dec_key, + client_dec_iv, + &client_enc_key, + client_enc_iv, + &rng, + false, + ); let encrypted = encrypt_tg_nonce(&nonce); @@ -379,4 +401,4 @@ mod tests { drop(success); // Drop impl zeroizes key material without panic } -} \ No newline at end of file +} diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 0882d0e..01ccc20 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tracing::{debug, info, trace}; use crate::config::ProxyConfig; +use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::*; use crate::proxy::handshake::HandshakeSuccess; @@ -21,6 +22,7 @@ pub(crate) async fn handle_via_middle_proxy( _config: Arc, _buffer_pool: Arc, local_addr: SocketAddr, + rng: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -58,16 +60,23 @@ where tokio::select! { client_frame = read_client_payload(&mut crypto_reader, proto_tag) => { match client_frame { - Ok(Some(payload)) => { + Ok(Some((payload, quickack))) => { trace!(conn_id, bytes = payload.len(), "C->ME frame"); stats.add_user_octets_from(&user, payload.len() as u64); + let mut flags = proto_flags; + if quickack { + flags |= RPC_FLAG_QUICKACK; + } + if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) { + flags |= RPC_FLAG_NOT_ENCRYPTED; + } me_pool.send_proxy_req( conn_id, success.dc_idx, peer, translated_local_addr, &payload, - proto_flags, + flags, ).await?; } Ok(None) => { @@ -83,7 +92,7 @@ where Some(MeResponse::Data { flags, data }) => { trace!(conn_id, bytes = data.len(), flags, "ME->C data"); stats.add_user_octets_to(&user, data.len() as u64); - write_client_payload(&mut crypto_writer, proto_tag, flags, &data).await?; + write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?; } Some(MeResponse::Ack(confirm)) => { trace!(conn_id, confirm, "ME->C quickack"); @@ -111,11 +120,11 @@ where async fn read_client_payload( client_reader: &mut CryptoReader, proto_tag: ProtoTag, -) -> Result>> +) -> Result, bool)>> where R: AsyncRead + Unpin + Send + 'static, { - let len = match proto_tag { + let (len, quickack) = match proto_tag { ProtoTag::Abridged => { let mut first = [0u8; 1]; match client_reader.read_exact(&mut first).await { @@ -124,6 +133,7 @@ where Err(e) => return Err(ProxyError::Io(e)), } + let quickack = (first[0] & 0x80) != 0; let len_words = if (first[0] & 0x7f) == 0x7f { let mut ext = [0u8; 3]; client_reader @@ -135,9 +145,10 @@ where (first[0] & 0x7f) as usize }; - len_words + let len = len_words .checked_mul(4) - .ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))? + .ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?; + (len, quickack) } ProtoTag::Intermediate | ProtoTag::Secure => { let mut len_buf = [0u8; 4]; @@ -146,7 +157,8 @@ where Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None), Err(e) => return Err(ProxyError::Io(e)), } - (u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize + let quickack = (len_buf[3] & 0x80) != 0; + ((u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize, quickack) } }; @@ -159,7 +171,15 @@ where .read_exact(&mut payload) .await .map_err(ProxyError::Io)?; - Ok(Some(payload)) + + // Secure Intermediate: remove random padding (last len%4 bytes) + if proto_tag == ProtoTag::Secure { + let rem = len % 4; + if rem != 0 && payload.len() >= rem { + payload.truncate(len - rem); + } + } + Ok(Some((payload, quickack))) } async fn write_client_payload( @@ -167,6 +187,7 @@ async fn write_client_payload( proto_tag: ProtoTag, flags: u32, data: &[u8], + rng: &SecureRandom, ) -> Result<()> where W: AsyncWrite + Unpin + Send + 'static, @@ -215,7 +236,12 @@ where .map_err(ProxyError::Io)?; } ProtoTag::Intermediate | ProtoTag::Secure => { - let mut len = data.len() as u32; + let padding_len = if proto_tag == ProtoTag::Secure { + (rng.bytes(1)[0] % 4) as usize + } else { + 0 + }; + let mut len = (data.len() + padding_len) as u32; if quickack { len |= 0x8000_0000; } @@ -227,6 +253,13 @@ where .write_all(data) .await .map_err(ProxyError::Io)?; + if padding_len > 0 { + let pad = rng.bytes(padding_len); + client_writer + .write_all(&pad) + .await + .map_err(ProxyError::Io)?; + } } } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index fb30742..5c3a084 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -212,28 +212,41 @@ impl ReplayChecker { (hasher.finish() as usize) & self.shard_mask } - fn check(&self, data: &[u8]) -> bool { + fn check_and_add_internal(&self, data: &[u8]) -> bool { self.checks.fetch_add(1, Ordering::Relaxed); let idx = self.get_shard_idx(data); let mut shard = self.shards[idx].lock(); - let found = shard.check(data, Instant::now(), self.window); + let now = Instant::now(); + let found = shard.check(data, now, self.window); if found { self.hits.fetch_add(1, Ordering::Relaxed); + } else { + shard.add(data, now, self.window); + self.additions.fetch_add(1, Ordering::Relaxed); } found } - fn add(&self, data: &[u8]) { + fn add_only(&self, data: &[u8]) { self.additions.fetch_add(1, Ordering::Relaxed); let idx = self.get_shard_idx(data); let mut shard = self.shards[idx].lock(); shard.add(data, Instant::now(), self.window); } - pub fn check_handshake(&self, data: &[u8]) -> bool { self.check(data) } - pub fn add_handshake(&self, data: &[u8]) { self.add(data) } - pub fn check_tls_digest(&self, data: &[u8]) -> bool { self.check(data) } - pub fn add_tls_digest(&self, data: &[u8]) { self.add(data) } + pub fn check_and_add_handshake(&self, data: &[u8]) -> bool { + self.check_and_add_internal(data) + } + + pub fn check_and_add_tls_digest(&self, data: &[u8]) -> bool { + self.check_and_add_internal(data) + } + + // Compatibility helpers (non-atomic split operations) — prefer check_and_add_*. + pub fn check_handshake(&self, data: &[u8]) -> bool { self.check_and_add_handshake(data) } + pub fn add_handshake(&self, data: &[u8]) { self.add_only(data) } + pub fn check_tls_digest(&self, data: &[u8]) -> bool { self.check_and_add_tls_digest(data) } + pub fn add_tls_digest(&self, data: &[u8]) { self.add_only(data) } pub fn stats(&self) -> ReplayStats { let mut total_entries = 0; @@ -326,10 +339,9 @@ mod tests { #[test] fn test_replay_checker_basic() { let checker = ReplayChecker::new(100, Duration::from_secs(60)); - assert!(!checker.check_handshake(b"test1")); - checker.add_handshake(b"test1"); - assert!(checker.check_handshake(b"test1")); - assert!(!checker.check_handshake(b"test2")); + assert!(!checker.check_handshake(b"test1")); // first time, inserts + assert!(checker.check_handshake(b"test1")); // duplicate + assert!(!checker.check_handshake(b"test2")); // new key inserts } #[test] @@ -343,7 +355,7 @@ mod tests { #[test] fn test_replay_checker_expiration() { let checker = ReplayChecker::new(100, Duration::from_millis(50)); - checker.add_handshake(b"expire"); + assert!(!checker.check_handshake(b"expire")); assert!(checker.check_handshake(b"expire")); std::thread::sleep(Duration::from_millis(100)); assert!(!checker.check_handshake(b"expire")); @@ -352,25 +364,25 @@ mod tests { #[test] fn test_replay_checker_stats() { let checker = ReplayChecker::new(100, Duration::from_secs(60)); - checker.add_handshake(b"k1"); - checker.add_handshake(b"k2"); - checker.check_handshake(b"k1"); - checker.check_handshake(b"k3"); + assert!(!checker.check_handshake(b"k1")); + assert!(!checker.check_handshake(b"k2")); + assert!(checker.check_handshake(b"k1")); + assert!(!checker.check_handshake(b"k3")); let stats = checker.stats(); - assert_eq!(stats.total_additions, 2); - assert_eq!(stats.total_checks, 2); + assert_eq!(stats.total_additions, 3); + assert_eq!(stats.total_checks, 4); assert_eq!(stats.total_hits, 1); } #[test] fn test_replay_checker_many_keys() { - let checker = ReplayChecker::new(1000, Duration::from_secs(60)); + let checker = ReplayChecker::new(10_000, Duration::from_secs(60)); for i in 0..500u32 { - checker.add(&i.to_le_bytes()); + checker.add_only(&i.to_le_bytes()); } for i in 0..500u32 { - assert!(checker.check(&i.to_le_bytes())); + assert!(checker.check_handshake(&i.to_le_bytes())); } assert_eq!(checker.stats().total_entries, 500); } -} \ No newline at end of file +} diff --git a/src/stream/buffer_pool.rs b/src/stream/buffer_pool.rs index ac4a5f9..0de5532 100644 --- a/src/stream/buffer_pool.rs +++ b/src/stream/buffer_pool.rs @@ -381,9 +381,14 @@ mod tests { // Add a buffer to pool pool.preallocate(1); - // Now try_get should succeed - assert!(pool.try_get().is_some()); + // Now try_get should succeed once while the buffer is held + let buf = pool.try_get(); + assert!(buf.is_some()); + // While buffer is held, pool is empty assert!(pool.try_get().is_none()); + // Drop buffer -> returns to pool, should be obtainable again + drop(buf); + assert!(pool.try_get().is_some()); } #[test] @@ -448,4 +453,4 @@ mod tests { // All buffers should be returned assert!(stats.pooled > 0); } -} \ No newline at end of file +} diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index a4edf58..6a3c1d6 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -32,7 +32,7 @@ //! and uploads from iOS will break (media/file sending), while small traffic //! may still work. -use bytes::{Bytes, BytesMut, BufMut}; +use bytes::{Bytes, BytesMut}; use std::io::{self, Error, ErrorKind, Result}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -51,9 +51,10 @@ use super::state::{StreamState, HeaderBuffer, YieldBuffer, WriteBuffer}; /// TLS record header size (type + version + length) const TLS_HEADER_SIZE: usize = 5; -/// Maximum TLS fragment size per spec (plaintext fragment). -/// We use this for *outgoing* chunking, because we build plain ApplicationData records. -const MAX_TLS_PAYLOAD: usize = 16384; +/// Maximum TLS fragment size we emit for Application Data. +/// Real TLS 1.3 ciphertexts often add ~16-24 bytes AEAD overhead, so to mimic +/// on-the-wire record sizes we allow up to 16384 + 24 bytes of plaintext. +const MAX_TLS_PAYLOAD: usize = 16384 + 24; /// Maximum pending write buffer for one record remainder. /// Note: we never queue unlimited amount of data here; state holds at most one record. @@ -918,10 +919,8 @@ mod tests { let reader = ChunkedReader::new(&record, 100); let mut tls_reader = FakeTlsReader::new(reader); - let mut buf = vec![0u8; payload.len()]; - tls_reader.read_exact(&mut buf).await.unwrap(); - - assert_eq!(&buf, payload); + let buf = tls_reader.read_exact(payload.len()).await.unwrap(); + assert_eq!(&buf[..], payload); } #[tokio::test] @@ -935,13 +934,11 @@ mod tests { let reader = ChunkedReader::new(&data, 100); let mut tls_reader = FakeTlsReader::new(reader); - let mut buf1 = vec![0u8; payload1.len()]; - tls_reader.read_exact(&mut buf1).await.unwrap(); - assert_eq!(&buf1, payload1); + let buf1 = tls_reader.read_exact(payload1.len()).await.unwrap(); + assert_eq!(&buf1[..], payload1); - let mut buf2 = vec![0u8; payload2.len()]; - tls_reader.read_exact(&mut buf2).await.unwrap(); - assert_eq!(&buf2, payload2); + let buf2 = tls_reader.read_exact(payload2.len()).await.unwrap(); + assert_eq!(&buf2[..], payload2); } #[tokio::test] @@ -953,10 +950,9 @@ mod tests { let reader = ChunkedReader::new(&record, 1); // 1 byte at a time! let mut tls_reader = FakeTlsReader::new(reader); - let mut buf = vec![0u8; payload.len()]; - tls_reader.read_exact(&mut buf).await.unwrap(); + let buf = tls_reader.read_exact(payload.len()).await.unwrap(); - assert_eq!(&buf, payload); + assert_eq!(&buf[..], payload); } #[tokio::test] @@ -967,10 +963,9 @@ mod tests { let reader = ChunkedReader::new(&record, 7); // Awkward chunk size let mut tls_reader = FakeTlsReader::new(reader); - let mut buf = vec![0u8; payload.len()]; - tls_reader.read_exact(&mut buf).await.unwrap(); + let buf = tls_reader.read_exact(payload.len()).await.unwrap(); - assert_eq!(&buf, payload); + assert_eq!(&buf[..], payload); } #[tokio::test] @@ -983,10 +978,9 @@ mod tests { let reader = ChunkedReader::new(&data, 100); let mut tls_reader = FakeTlsReader::new(reader); - let mut buf = vec![0u8; payload.len()]; - tls_reader.read_exact(&mut buf).await.unwrap(); + let buf = tls_reader.read_exact(payload.len()).await.unwrap(); - assert_eq!(&buf, payload); + assert_eq!(&buf[..], payload); } #[tokio::test] @@ -1000,10 +994,9 @@ mod tests { let reader = ChunkedReader::new(&data, 3); // Small chunks let mut tls_reader = FakeTlsReader::new(reader); - let mut buf = vec![0u8; payload.len()]; - tls_reader.read_exact(&mut buf).await.unwrap(); + let buf = tls_reader.read_exact(payload.len()).await.unwrap(); - assert_eq!(&buf, payload); + assert_eq!(&buf[..], payload); } #[tokio::test] @@ -1244,4 +1237,4 @@ mod tests { let bytes = header.to_bytes(); assert_eq!(bytes, [0x17, 0x03, 0x03, 0x12, 0x34]); } -} \ No newline at end of file +} diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs new file mode 100644 index 0000000..aed5a54 --- /dev/null +++ b/src/transport/middle_proxy/config_updater.rs @@ -0,0 +1,91 @@ +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::Arc; +use std::time::Duration; + +use regex::Regex; +use tracing::{debug, info, warn}; + +use crate::error::Result; + +use super::MePool; +use super::secret::download_proxy_secret; +use crate::crypto::SecureRandom; + +#[derive(Debug, Clone, Default)] +pub struct ProxyConfigData { + pub map: HashMap>, + pub default_dc: Option, +} + +pub async fn fetch_proxy_config(url: &str) -> Result { + let text = reqwest::get(url) + .await + .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? + .text() + .await + .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?; + + let re_proxy = Regex::new(r"proxy_for\s+(-?\d+)\s+([^\s:]+):(\d+)\s*;").unwrap(); + let re_default = Regex::new(r"default\s+(-?\d+)\s*;").unwrap(); + + let mut map: HashMap> = HashMap::new(); + for cap in re_proxy.captures_iter(&text) { + if let (Some(dc), Some(host), Some(port)) = (cap.get(1), cap.get(2), cap.get(3)) { + if let Ok(dc_idx) = dc.as_str().parse::() { + if let Ok(ip) = host.as_str().parse::() { + if let Ok(port_num) = port.as_str().parse::() { + map.entry(dc_idx).or_default().push((ip, port_num)); + } + } + } + } + } + + let default_dc = re_default + .captures(&text) + .and_then(|c| c.get(1)) + .and_then(|m| m.as_str().parse::().ok()); + + Ok(ProxyConfigData { map, default_dc }) +} + +pub async fn me_config_updater(pool: Arc, rng: Arc, interval: Duration) { + let mut tick = tokio::time::interval(interval); + // skip immediate tick to avoid double-fetch right after startup + tick.tick().await; + loop { + tick.tick().await; + + // Update proxy config v4 + if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await { + let changed = pool.update_proxy_maps(cfg.map.clone(), None).await; + if let Some(dc) = cfg.default_dc { + pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); + } + if changed { + info!("ME config updated (v4), reconciling connections"); + pool.reconcile_connections(&rng).await; + } else { + debug!("ME config v4 unchanged"); + } + } else { + warn!("getProxyConfig update failed"); + } + + // Update proxy config v6 (optional) + if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await { + let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + } + + // Update proxy-secret + match download_proxy_secret().await { + Ok(secret) => { + if pool.update_secret(secret).await { + info!("proxy-secret updated and pool reconnect scheduled"); + } + } + Err(e) => warn!(error = %e, "proxy-secret update failed"), + } + } +} diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index d720c86..8f0f5a6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -3,33 +3,42 @@ use std::sync::Arc; use std::time::Duration; use tracing::{debug, info, warn}; +use rand::seq::SliceRandom; use crate::crypto::SecureRandom; -use crate::protocol::constants::TG_MIDDLE_PROXIES_FLAT_V4; use super::MePool; -pub async fn me_health_monitor(pool: Arc, rng: Arc, min_connections: usize) { +pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { loop { tokio::time::sleep(Duration::from_secs(30)).await; - let current = pool.connection_count(); - if current < min_connections { - warn!( - current, - min = min_connections, - "ME pool below minimum, reconnecting..." - ); - let addrs = TG_MIDDLE_PROXIES_FLAT_V4.clone(); - for &(ip, port) in addrs.iter() { - let needed = min_connections.saturating_sub(pool.connection_count()); - if needed == 0 { - break; - } - for _ in 0..needed { - let addr = SocketAddr::new(ip, port); + // Per-DC coverage check + let map = pool.proxy_map_v4.read().await.clone(); + let writer_addrs: std::collections::HashSet = pool + .writers + .read() + .await + .iter() + .map(|(a, _)| *a) + .collect(); + + for (dc, addrs) in map.iter() { + let dc_addrs: Vec = addrs + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect(); + let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); + if !has_coverage { + warn!(dc = %dc, "DC has no ME coverage, reconnecting..."); + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for addr in shuffled { match pool.connect_one(addr, &rng).await { - Ok(()) => info!(%addr, "ME reconnected"), - Err(e) => debug!(%addr, error = %e, "ME reconnect failed"), + Ok(()) => { + info!(%addr, dc = %dc, "ME reconnected for DC coverage"); + break; + } + Err(e) => debug!(%addr, dc = %dc, error = %e, "ME reconnect failed"), } } } diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 4906c4b..e617158 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -8,6 +8,7 @@ mod reader; mod registry; mod send; mod secret; +mod config_updater; mod wire; use bytes::Bytes; @@ -16,6 +17,7 @@ pub use health::me_health_monitor; pub use pool::MePool; pub use registry::ConnRegistry; pub use secret::fetch_proxy_secret; +pub use config_updater::{fetch_proxy_config, me_config_updater}; pub use wire::proto_flags_for_tag; #[derive(Debug)] diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 650a029..f38a81c 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,11 +1,12 @@ +use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use std::sync::OnceLock; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicI32, AtomicU64}; use std::time::Duration; use bytes::BytesMut; use rand::Rng; +use rand::seq::SliceRandom; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::{Mutex, RwLock}; @@ -32,11 +33,14 @@ pub struct MePool { pub(super) writers: Arc>)>>> , pub(super) rr: AtomicU64, pub(super) proxy_tag: Option>, - proxy_secret: Vec, + pub(super) proxy_secret: Arc>>, pub(super) nat_ip_cfg: Option, - pub(super) nat_ip_detected: OnceLock, + pub(super) nat_ip_detected: Arc>>, pub(super) nat_probe: bool, pub(super) nat_stun: Option, + pub(super) proxy_map_v4: Arc>>>, + pub(super) proxy_map_v6: Arc>>>, + pub(super) default_dc: AtomicI32, pool_size: usize, } @@ -47,18 +51,24 @@ impl MePool { nat_ip: Option, nat_probe: bool, nat_stun: Option, + proxy_map_v4: HashMap>, + proxy_map_v6: HashMap>, + default_dc: Option, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), writers: Arc::new(RwLock::new(Vec::new())), rr: AtomicU64::new(0), proxy_tag, - proxy_secret, + proxy_secret: Arc::new(RwLock::new(proxy_secret)), nat_ip_cfg: nat_ip, - nat_ip_detected: OnceLock::new(), + nat_ip_detected: Arc::new(RwLock::new(None)), nat_probe, nat_stun, pool_size: 2, + proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), + proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), + default_dc: AtomicI32::new(default_dc.unwrap_or(0)), }) } @@ -80,39 +90,129 @@ impl MePool { self.writers.clone() } - fn key_selector(&self) -> u32 { - if self.proxy_secret.len() >= 4 { - u32::from_le_bytes([ - self.proxy_secret[0], - self.proxy_secret[1], - self.proxy_secret[2], - self.proxy_secret[3], - ]) + pub async fn reconcile_connections(&self, rng: &SecureRandom) { + use std::collections::HashSet; + let map = self.proxy_map_v4.read().await.clone(); + let writers = self.writers.read().await; + let current: HashSet = writers.iter().map(|(a, _)| *a).collect(); + drop(writers); + + for (_dc, addrs) in map.iter() { + let dc_addrs: Vec = addrs + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect(); + if !dc_addrs.iter().any(|a| current.contains(a)) { + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for addr in shuffled { + if self.connect_one(addr, rng).await.is_ok() { + break; + } + } + } + } + } + + pub async fn update_proxy_maps( + &self, + new_v4: HashMap>, + new_v6: Option>>, + ) -> bool { + let mut changed = false; + { + let mut guard = self.proxy_map_v4.write().await; + if !new_v4.is_empty() && *guard != new_v4 { + *guard = new_v4; + changed = true; + } + } + if let Some(v6) = new_v6 { + let mut guard = self.proxy_map_v6.write().await; + if !v6.is_empty() && *guard != v6 { + *guard = v6; + } + } + changed + } + + pub async fn update_secret(&self, new_secret: Vec) -> bool { + if new_secret.len() < 32 { + warn!(len = new_secret.len(), "proxy-secret update ignored (too short)"); + return false; + } + let mut guard = self.proxy_secret.write().await; + if *guard != new_secret { + *guard = new_secret; + drop(guard); + self.reconnect_all().await; + return true; + } + false + } + + pub async fn reconnect_all(&self) { + // Graceful: do not drop all at once. New connections will use updated secret. + // Existing writers remain until health monitor replaces them. + // No-op here to avoid total outage. + } + + async fn key_selector(&self) -> u32 { + let secret = self.proxy_secret.read().await; + if secret.len() >= 4 { + u32::from_le_bytes([secret[0], secret[1], secret[2], secret[3]]) } else { 0 } } pub async fn init(self: &Arc, pool_size: usize, rng: &SecureRandom) -> Result<()> { - let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4; - let ks = self.key_selector(); + let map = self.proxy_map_v4.read().await; + let ks = self.key_selector().await; info!( - me_servers = addrs.len(), + me_servers = map.len(), pool_size, key_selector = format_args!("0x{ks:08x}"), - secret_len = self.proxy_secret.len(), + secret_len = self.proxy_secret.read().await.len(), "Initializing ME pool" ); - for &(ip, port) in addrs.iter() { - for i in 0..pool_size { + // Ensure at least one connection per DC with failover over all addresses + for (dc, addrs) in map.iter() { + if addrs.is_empty() { + continue; + } + let mut connected = false; + let mut shuffled = addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + for (ip, port) in shuffled { let addr = SocketAddr::new(ip, port); match self.connect_one(addr, rng).await { - Ok(()) => info!(%addr, idx = i, "ME connected"), - Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"), + Ok(()) => { + info!(%addr, dc = %dc, "ME connected"); + connected = true; + break; + } + Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"), } } - if self.writers.read().await.len() >= pool_size { + if !connected { + warn!(dc = %dc, "All ME servers for DC failed at init"); + } + } + + // Additional connections up to pool_size total (round-robin across DCs) + for (dc, addrs) in map.iter() { + for (ip, port) in addrs { + if self.connection_count() >= pool_size { + break; + } + let addr = SocketAddr::new(*ip, *port); + if let Err(e) = self.connect_one(addr, rng).await { + debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed"); + } + } + if self.connection_count() >= pool_size { break; } } @@ -124,11 +224,12 @@ impl MePool { } pub(crate) async fn connect_one( - self: &Arc, + &self, addr: SocketAddr, rng: &SecureRandom, ) -> Result<()> { - let secret = &self.proxy_secret; + let secret_guard = self.proxy_secret.read().await; + let secret: Vec = secret_guard.clone(); if secret.len() < 32 { return Err(ProxyError::Proxy( "proxy-secret too short for ME auth".into(), @@ -165,7 +266,7 @@ impl MePool { .unwrap_or_default() .as_secs() as u32; - let ks = self.key_selector(); + let ks = self.key_selector().await; let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce); let nonce_frame = build_rpc_frame(-2, &nonce_payload); let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]); @@ -234,7 +335,10 @@ impl MePool { let (srv_ip_opt, clt_ip_opt, clt_v6_opt, srv_v6_opt, hs_our_ip, hs_peer_ip) = match (server_ip, client_ip) { - (IpMaterial::V4(srv), IpMaterial::V4(clt)) => { + // IPv4: reverse byte order for KDF (Python/C reference behavior) + (IpMaterial::V4(mut srv), IpMaterial::V4(mut clt)) => { + srv.reverse(); + clt.reverse(); (Some(srv), Some(clt), None, None, clt, srv) } (IpMaterial::V6(srv), IpMaterial::V6(clt)) => { @@ -263,7 +367,7 @@ impl MePool { b"CLIENT", clt_ip_opt.as_ref().map(|x| &x[..]), &server_port_bytes, - secret, + &secret, clt_v6_opt.as_ref(), srv_v6_opt.as_ref(), ); @@ -276,7 +380,7 @@ impl MePool { b"SERVER", clt_ip_opt.as_ref().map(|x| &x[..]), &server_port_bytes, - secret, + &secret, clt_v6_opt.as_ref(), srv_v6_opt.as_ref(), ); @@ -290,7 +394,7 @@ impl MePool { b"CLIENT", clt_ip_opt.as_ref().map(|x| &x[..]), &server_port_bytes, - secret, + &secret, clt_v6_opt.as_ref(), srv_v6_opt.as_ref(), ); @@ -303,7 +407,7 @@ impl MePool { b"SERVER", clt_ip_opt.as_ref().map(|x| &x[..]), &server_port_bytes, - secret, + &secret, clt_v6_opt.as_ref(), srv_v6_opt.as_ref(), ); @@ -327,7 +431,7 @@ impl MePool { prekey_sha256_client = %hex_dump(&sha256(&prekey_client)), prekey_sha256_server = %hex_dump(&sha256(&prekey_server)), hs_plain = %hex_dump(&hs_frame), - proxy_secret_sha256 = %hex_dump(&sha256(secret)), + proxy_secret_sha256 = %hex_dump(&sha256(&secret)), "ME diag: derived keys and handshake plaintext" ); } diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index 2a37ec4..633d0af 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -10,7 +10,7 @@ impl MePool { pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr { let nat_ip = self .nat_ip_cfg - .or_else(|| self.nat_ip_detected.get().copied()); + .or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| (*g).clone())); let Some(nat_ip) = nat_ip else { return ip; @@ -60,13 +60,16 @@ impl MePool { return None; } - if let Some(ip) = self.nat_ip_detected.get().copied() { + if let Some(ip) = self.nat_ip_detected.read().await.clone() { return Some(ip); } - match fetch_public_ipv4().await { + match fetch_public_ipv4_with_retry().await { Ok(Some(ip)) => { - let _ = self.nat_ip_detected.set(IpAddr::V4(ip)); + { + let mut guard = self.nat_ip_detected.write().await; + *guard = Some(IpAddr::V4(ip)); + } info!(public_ip = %ip, "Auto-detected public IP for NAT translation"); Some(IpAddr::V4(ip)) } @@ -98,8 +101,22 @@ impl MePool { } } -async fn fetch_public_ipv4() -> Result> { - let res = reqwest::get("https://checkip.amazonaws.com").await.map_err(|e| { +async fn fetch_public_ipv4_with_retry() -> Result> { + let providers = [ + "https://checkip.amazonaws.com", + "http://v4.ident.me", + "http://ipv4.icanhazip.com", + ]; + for url in providers { + if let Ok(Some(ip)) = fetch_public_ipv4_once(url).await { + return Ok(Some(ip)); + } + } + Ok(None) +} + +async fn fetch_public_ipv4_once(url: &str) -> Result> { + let res = reqwest::get(url).await.map_err(|e| { ProxyError::Proxy(format!("public IP detection request failed: {e}")) })?; @@ -128,7 +145,7 @@ async fn fetch_stun_binding(stun_addr: &str) -> Result>>, + writers: RwLock>>>, next_id: AtomicU64, } @@ -16,6 +20,7 @@ impl ConnRegistry { let start = rand::random::() | 1; Self { map: RwLock::new(HashMap::new()), + writers: RwLock::new(HashMap::new()), next_id: AtomicU64::new(start), } } @@ -29,6 +34,7 @@ impl ConnRegistry { pub async fn unregister(&self, id: u64) { self.map.write().await.remove(&id); + self.writers.write().await.remove(&id); } pub async fn route(&self, id: u64, resp: MeResponse) -> bool { @@ -39,4 +45,14 @@ impl ConnRegistry { false } } + + pub async fn set_writer(&self, id: u64, w: Arc>) { + let mut guard = self.writers.write().await; + guard.entry(id).or_insert_with(|| w); + } + + pub async fn get_writer(&self, id: u64) -> Option>> { + let guard = self.writers.read().await; + guard.get(&id).cloned() + } } diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index b998411..9dba939 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -51,7 +51,7 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result> { } } -async fn download_proxy_secret() -> Result> { +pub async fn download_proxy_secret() -> Result> { let resp = reqwest::get("https://core.telegram.org/getProxySecret") .await .map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?; diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 192a560..29e6e50 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -6,11 +6,13 @@ use tokio::sync::Mutex; use tracing::{debug, warn}; use crate::error::{ProxyError, Result}; -use crate::protocol::constants::{RPC_CLOSE_EXT_U32, TG_MIDDLE_PROXIES_V4}; +use crate::protocol::constants::RPC_CLOSE_EXT_U32; use super::MePool; use super::codec::RpcWriter; use super::wire::build_proxy_req_payload; +use crate::crypto::SecureRandom; +use rand::seq::SliceRandom; impl MePool { pub async fn send_proxy_req( @@ -39,9 +41,28 @@ impl MePool { let writers: Vec<(SocketAddr, Arc>)> = ws.iter().cloned().collect(); drop(ws); - let candidate_indices = candidate_indices_for_dc(&writers, target_dc); + let mut candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await; if candidate_indices.is_empty() { - return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + // Emergency: try to connect to target DC addresses on the fly, then recompute writers + let map = self.proxy_map_v4.read().await; + if let Some(addrs) = map.get(&(target_dc as i32)) { + let mut shuffled = addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + drop(map); + for (ip, port) in shuffled { + let addr = SocketAddr::new(ip, port); + if self.connect_one(addr, &SecureRandom::new()).await.is_ok() { + break; + } + } + let ws2 = self.writers.read().await; + let writers: Vec<(SocketAddr, Arc>)> = ws2.iter().cloned().collect(); + drop(ws2); + candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await; + } + if candidate_indices.is_empty() { + return Err(ProxyError::Proxy("No ME writers available for target DC".into())); + } } let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len(); @@ -85,10 +106,7 @@ impl MePool { } pub async fn send_close(&self, conn_id: u64) -> Result<()> { - let ws = self.writers.read().await; - if !ws.is_empty() { - let w = ws[0].1.clone(); - drop(ws); + if let Some(w) = self.registry.get_writer(conn_id).await { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes()); @@ -97,6 +115,8 @@ impl MePool { let mut ws = self.writers.write().await; ws.retain(|(_, o)| !Arc::ptr_eq(o, &w)); } + } else { + debug!(conn_id, "ME close skipped (writer missing)"); } self.registry.unregister(conn_id).await; @@ -106,41 +126,54 @@ impl MePool { pub fn connection_count(&self) -> usize { self.writers.try_read().map(|w| w.len()).unwrap_or(0) } -} + + pub(super) async fn candidate_indices_for_dc( + &self, + writers: &[(SocketAddr, Arc>)], + target_dc: i16, + ) -> Vec { + let mut preferred = Vec::::new(); + let key = target_dc as i32; + let map = self.proxy_map_v4.read().await; -fn candidate_indices_for_dc( - writers: &[(SocketAddr, Arc>)], - target_dc: i16, -) -> Vec { - let mut preferred = Vec::::new(); - let key = target_dc as i32; - if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&key) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&abs) { + if let Some(v) = map.get(&key) { preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); } - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&-abs) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + if preferred.is_empty() { + let abs = key.abs(); + if let Some(v) = map.get(&abs) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } } - } - if preferred.is_empty() { - return (0..writers.len()).collect(); + if preferred.is_empty() { + let abs = key.abs(); + if let Some(v) = map.get(&-abs) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + } + if preferred.is_empty() { + let def = self.default_dc.load(Ordering::Relaxed); + if def != 0 { + if let Some(v) = map.get(&def) { + preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + } + } + + if preferred.is_empty() { + return (0..writers.len()).collect(); + } + + let mut out = Vec::new(); + for (idx, (addr, _)) in writers.iter().enumerate() { + if preferred.iter().any(|p| p == addr) { + out.push(idx); + } + } + if out.is_empty() { + return (0..writers.len()).collect(); + } + out } - let mut out = Vec::new(); - for (idx, (addr, _)) in writers.iter().enumerate() { - if preferred.iter().any(|p| p == addr) { - out.push(idx); - } - } - if out.is_empty() { - return (0..writers.len()).collect(); - } - out } diff --git a/src/transport/middle_proxy/wire.rs b/src/transport/middle_proxy/wire.rs index 1ed9727..3f78f20 100644 --- a/src/transport/middle_proxy/wire.rs +++ b/src/transport/middle_proxy/wire.rs @@ -28,9 +28,7 @@ fn ipv4_to_mapped_v6_c_compat(ip: Ipv4Addr) -> [u8; 16] { buf[8..12].copy_from_slice(&(-0x10000i32).to_le_bytes()); // Matches tl_store_int(htonl(remote_ip_host_order)). - let host_order = u32::from_ne_bytes(ip.octets()); - let network_order = host_order.to_be(); - buf[12..16].copy_from_slice(&network_order.to_le_bytes()); + buf[12..16].copy_from_slice(&ip.octets()); buf } @@ -60,7 +58,7 @@ pub(crate) fn build_proxy_req_payload( append_mapped_addr_and_port(&mut b, client_addr); append_mapped_addr_and_port(&mut b, our_addr); - if proto_flags & 12 != 0 { + if proto_flags & RPC_FLAG_HAS_AD_TAG != 0 { let extra_start = b.len(); b.extend_from_slice(&0u32.to_le_bytes()); @@ -104,3 +102,17 @@ pub fn proto_flags_for_tag(tag: crate::protocol::constants::ProtoTag, has_proxy_ ProtoTag::Secure => flags | RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE, } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ipv4_mapped_encoding() { + let ip = Ipv4Addr::new(149, 154, 175, 50); + let buf = ipv4_to_mapped_v6_c_compat(ip); + assert_eq!(&buf[0..10], &[0u8; 10]); + assert_eq!(&buf[10..12], &[0xff, 0xff]); + assert_eq!(&buf[12..16], &[149, 154, 175, 50]); + } +} diff --git a/telemt b/telemt index 9db056f..fbb8d6f 100644 Binary files a/telemt and b/telemt differ