From f1c1f42de84444b303df795b92894c1e35edb1e0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 13 Feb 2026 12:51:49 +0300 Subject: [PATCH] Key derivation + me_health_monitor + QuickACK Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/mod.rs | 6 + src/main.rs | 670 ++++++++++--------- src/proxy/client.rs | 930 +++++++++++++++----------- src/transport/middle_proxy.rs | 1183 +++++++++++++++++++++++---------- 4 files changed, 1771 insertions(+), 1018 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 14ac6ab..7f74489 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -116,6 +116,11 @@ pub struct GeneralConfig { #[serde(default)] pub ad_tag: Option, + + /// Path to proxy-secret binary file (auto-downloaded if absent). + /// Infrastructure secret from https://core.telegram.org/getProxySecret + #[serde(default)] + pub proxy_secret_path: Option, #[serde(default)] pub log_level: LogLevel, @@ -129,6 +134,7 @@ impl Default for GeneralConfig { fast_mode: true, use_middle_proxy: false, ad_tag: None, + proxy_secret_path: None, log_level: LogLevel::Normal, } } diff --git a/src/main.rs b/src/main.rs index b28167c..94ae06b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,313 +1,391 @@ -//! Telemt - MTProxy on Rust - -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; -use tokio::net::TcpListener; -use tokio::signal; -use tokio::sync::Semaphore; -use tracing::{info, error, warn, debug}; -use tracing_subscriber::{fmt, EnvFilter, reload, prelude::*}; - -mod cli; -mod config; -mod crypto; -mod error; -mod protocol; -mod proxy; -mod stats; -mod stream; -mod transport; -mod util; - -use crate::config::{ProxyConfig, LogLevel}; -use crate::proxy::ClientHandler; -use crate::stats::{Stats, ReplayChecker}; -use crate::crypto::SecureRandom; -use crate::transport::{create_listener, ListenOptions, UpstreamManager}; -use crate::util::ip::detect_ip; -use crate::stream::BufferPool; - -fn parse_cli() -> (String, bool, Option) { - let mut config_path = "config.toml".to_string(); - let mut silent = false; - let mut log_level: Option = None; +//! telemt — Telegram MTProto Proxy - let args: Vec = std::env::args().skip(1).collect(); + use std::net::SocketAddr; + use std::sync::Arc; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio::signal; + use tokio::sync::Semaphore; + use tracing::{info, error, warn, debug}; + use tracing_subscriber::{fmt, EnvFilter, reload, prelude::*}; - // Check for --init first (handled before tokio) - if let Some(init_opts) = cli::parse_init_args(&args) { - if let Err(e) = cli::run_init(init_opts) { - eprintln!("[telemt] Init failed: {}", e); + mod cli; + mod config; + mod crypto; + mod error; + mod protocol; + mod proxy; + mod stats; + mod stream; + mod transport; + mod util; + + use crate::config::{ProxyConfig, LogLevel}; + use crate::proxy::ClientHandler; + use crate::stats::{Stats, ReplayChecker}; + use crate::crypto::SecureRandom; + use crate::transport::{create_listener, ListenOptions, UpstreamManager}; + use crate::transport::middle_proxy::MePool; + use crate::util::ip::detect_ip; + use crate::stream::BufferPool; + + fn parse_cli() -> (String, bool, Option) { + let mut config_path = "config.toml".to_string(); + let mut silent = false; + let mut log_level: Option = None; + + let args: Vec = std::env::args().skip(1).collect(); + + // Check for --init first (handled before tokio) + if let Some(init_opts) = cli::parse_init_args(&args) { + if let Err(e) = cli::run_init(init_opts) { + eprintln!("[telemt] Init failed: {}", e); + std::process::exit(1); + } + std::process::exit(0); + } + + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--silent" | "-s" => { silent = true; } + "--log-level" => { + i += 1; + if i < args.len() { log_level = Some(args[i].clone()); } + } + s if s.starts_with("--log-level=") => { + log_level = Some(s.trim_start_matches("--log-level=").to_string()); + } + "--help" | "-h" => { + eprintln!("Usage: telemt [config.toml] [OPTIONS]"); + eprintln!(); + eprintln!("Options:"); + eprintln!(" --silent, -s Suppress info logs"); + eprintln!(" --log-level debug|verbose|normal|silent"); + eprintln!(" --help, -h Show this help"); + eprintln!(); + eprintln!("Setup (fire-and-forget):"); + eprintln!(" --init Generate config, install systemd service, start"); + eprintln!(" --port Listen port (default: 443)"); + eprintln!(" --domain TLS domain for masking (default: www.google.com)"); + eprintln!(" --secret 32-char hex secret (auto-generated if omitted)"); + eprintln!(" --user Username (default: user)"); + eprintln!(" --config-dir Config directory (default: /etc/telemt)"); + eprintln!(" --no-start Don't start the service after install"); + std::process::exit(0); + } + s if !s.starts_with('-') => { config_path = s.to_string(); } + other => { eprintln!("Unknown option: {}", other); } + } + i += 1; + } + + (config_path, silent, log_level) + } + + #[tokio::main] + async fn main() -> std::result::Result<(), Box> { + let (config_path, cli_silent, cli_log_level) = parse_cli(); + + let config = match ProxyConfig::load(&config_path) { + Ok(c) => c, + Err(e) => { + if std::path::Path::new(&config_path).exists() { + eprintln!("[telemt] Error: {}", e); + std::process::exit(1); + } else { + let default = ProxyConfig::default(); + std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); + eprintln!("[telemt] Created default config at {}", config_path); + default + } + } + }; + + if let Err(e) = config.validate() { + eprintln!("[telemt] Invalid config: {}", e); std::process::exit(1); } - std::process::exit(0); - } - let mut i = 0; - while i < args.len() { - match args[i].as_str() { - "--silent" | "-s" => { silent = true; } - "--log-level" => { - i += 1; - if i < args.len() { log_level = Some(args[i].clone()); } - } - s if s.starts_with("--log-level=") => { - log_level = Some(s.trim_start_matches("--log-level=").to_string()); - } - "--help" | "-h" => { - eprintln!("Usage: telemt [config.toml] [OPTIONS]"); - eprintln!(); - eprintln!("Options:"); - eprintln!(" --silent, -s Suppress info logs"); - eprintln!(" --log-level debug|verbose|normal|silent"); - eprintln!(" --help, -h Show this help"); - eprintln!(); - eprintln!("Setup (fire-and-forget):"); - eprintln!(" --init Generate config, install systemd service, start"); - eprintln!(" --port Listen port (default: 443)"); - eprintln!(" --domain TLS domain for masking (default: www.google.com)"); - eprintln!(" --secret 32-char hex secret (auto-generated if omitted)"); - eprintln!(" --user Username (default: user)"); - eprintln!(" --config-dir Config directory (default: /etc/telemt)"); - eprintln!(" --no-start Don't start the service after install"); - std::process::exit(0); - } - s if !s.starts_with('-') => { config_path = s.to_string(); } - other => { eprintln!("Unknown option: {}", other); } - } - i += 1; - } - - (config_path, silent, log_level) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let (config_path, cli_silent, cli_log_level) = parse_cli(); - - let config = match ProxyConfig::load(&config_path) { - Ok(c) => c, - Err(e) => { - if std::path::Path::new(&config_path).exists() { - eprintln!("[telemt] Error: {}", e); - std::process::exit(1); - } else { - let default = ProxyConfig::default(); - std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); - eprintln!("[telemt] Created default config at {}", config_path); - default - } - } - }; - - if let Err(e) = config.validate() { - eprintln!("[telemt] Invalid config: {}", e); - std::process::exit(1); - } - - let has_rust_log = std::env::var("RUST_LOG").is_ok(); - let effective_log_level = if cli_silent { - LogLevel::Silent - } else if let Some(ref s) = cli_log_level { - LogLevel::from_str_loose(s) - } else { - config.general.log_level.clone() - }; - - // Start with INFO so startup messages are always visible, - // then switch to user-configured level after startup - let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); - tracing_subscriber::registry() - .with(filter_layer) - .with(fmt::Layer::default()) - .init(); - - info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); - info!("Log level: {}", effective_log_level); - info!("Modes: classic={} secure={} tls={}", - config.general.modes.classic, - config.general.modes.secure, - config.general.modes.tls); - info!("TLS domain: {}", config.censorship.tls_domain); - if let Some(ref sock) = config.censorship.mask_unix_sock { - info!("Mask: {} -> unix:{}", config.censorship.mask, sock); - if !std::path::Path::new(sock).exists() { - warn!("Unix socket '{}' does not exist yet. Masking will fail until it appears.", sock); - } - } else { - info!("Mask: {} -> {}:{}", - config.censorship.mask, - config.censorship.mask_host.as_deref().unwrap_or(&config.censorship.tls_domain), - config.censorship.mask_port); - } - - if config.censorship.tls_domain == "www.google.com" { - warn!("Using default tls_domain. Consider setting a custom domain."); - } - - let prefer_ipv6 = config.general.prefer_ipv6; - let config = Arc::new(config); - let stats = Arc::new(Stats::new()); - let rng = Arc::new(SecureRandom::new()); - - let replay_checker = Arc::new(ReplayChecker::new( - config.access.replay_check_len, - Duration::from_secs(config.access.replay_window_secs), - )); - - let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); - let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); - - // Connection concurrency limit — prevents OOM under SYN flood / connection storm. - // 10000 is generous; each connection uses ~64KB (2x 16KB relay buffers + overhead). - // 10000 connections ≈ 640MB peak memory. - let max_connections = Arc::new(Semaphore::new(10_000)); - - // Startup DC ping - info!("=== Telegram DC Connectivity ==="); - let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await; - for upstream_result in &ping_results { - info!(" via {}", upstream_result.upstream_name); - for dc in &upstream_result.results { - match (&dc.rtt_ms, &dc.error) { - (Some(rtt), _) => { - info!(" DC{} ({:>21}): {:.0}ms", dc.dc_idx, dc.dc_addr, rtt); - } - (None, Some(err)) => { - info!(" DC{} ({:>21}): FAIL ({})", dc.dc_idx, dc.dc_addr, err); - } - _ => { - info!(" DC{} ({:>21}): FAIL", dc.dc_idx, dc.dc_addr); - } - } - } - } - info!("================================"); - - // Background tasks - let um_clone = upstream_manager.clone(); - tokio::spawn(async move { um_clone.run_health_checks(prefer_ipv6).await; }); - - let rc_clone = replay_checker.clone(); - tokio::spawn(async move { rc_clone.run_periodic_cleanup().await; }); - - let detected_ip = detect_ip().await; - debug!("Detected IPs: v4={:?} v6={:?}", detected_ip.ipv4, detected_ip.ipv6); - - let mut listeners = Vec::new(); - - for listener_conf in &config.server.listeners { - let addr = SocketAddr::new(listener_conf.ip, config.server.port); - let options = ListenOptions { - ipv6_only: listener_conf.ip.is_ipv6(), - ..Default::default() + let has_rust_log = std::env::var("RUST_LOG").is_ok(); + let effective_log_level = if cli_silent { + LogLevel::Silent + } else if let Some(ref s) = cli_log_level { + LogLevel::from_str_loose(s) + } else { + config.general.log_level.clone() }; - - match create_listener(addr, &options) { - Ok(socket) => { - let listener = TcpListener::from_std(socket.into())?; - info!("Listening on {}", addr); - - let public_ip = if let Some(ip) = listener_conf.announce_ip { - ip - } else if listener_conf.ip.is_unspecified() { - if listener_conf.ip.is_ipv4() { - detected_ip.ipv4.unwrap_or(listener_conf.ip) - } else { - detected_ip.ipv6.unwrap_or(listener_conf.ip) - } - } else { - listener_conf.ip - }; - - if !config.show_link.is_empty() { - info!("--- Proxy Links ({}) ---", public_ip); - for user_name in &config.show_link { - if let Some(secret) = config.access.users.get(user_name) { - info!("User: {}", user_name); - if config.general.modes.classic { - info!(" Classic: tg://proxy?server={}&port={}&secret={}", - public_ip, config.server.port, secret); + + let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt::Layer::default()) + .init(); + + info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); + info!("Log level: {}", effective_log_level); + info!("Modes: classic={} secure={} tls={}", + config.general.modes.classic, + config.general.modes.secure, + config.general.modes.tls); + info!("TLS domain: {}", config.censorship.tls_domain); + if let Some(ref sock) = config.censorship.mask_unix_sock { + info!("Mask: {} -> unix:{}", config.censorship.mask, sock); + if !std::path::Path::new(sock).exists() { + warn!("Unix socket '{}' does not exist yet. Masking will fail until it appears.", sock); + } + } else { + info!("Mask: {} -> {}:{}", + config.censorship.mask, + config.censorship.mask_host.as_deref().unwrap_or(&config.censorship.tls_domain), + config.censorship.mask_port); + } + + if config.censorship.tls_domain == "www.google.com" { + warn!("Using default tls_domain. Consider setting a custom domain."); + } + + let prefer_ipv6 = config.general.prefer_ipv6; + let use_middle_proxy = config.general.use_middle_proxy; + let config = Arc::new(config); + let stats = Arc::new(Stats::new()); + let rng = Arc::new(SecureRandom::new()); + + let replay_checker = Arc::new(ReplayChecker::new( + config.access.replay_check_len, + Duration::from_secs(config.access.replay_window_secs), + )); + + let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); + let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); + + // Connection concurrency limit + let _max_connections = Arc::new(Semaphore::new(10_000)); + + // ===================================================================== + // Middle Proxy initialization (if enabled) + // ===================================================================== + let me_pool: Option> = if use_middle_proxy { + info!("=== Middle Proxy Mode ==="); + + // ad_tag (proxy_tag) for advertising + let proxy_tag = config.general.ad_tag.as_ref().map(|tag| { + hex::decode(tag).unwrap_or_else(|_| { + warn!("Invalid ad_tag hex, middle proxy ad_tag will be empty"); + Vec::new() + }) + }); + + // ============================================================= + // CRITICAL: Download Telegram proxy-secret (NOT user secret!) + // + // C MTProxy uses TWO separate secrets: + // -S flag = 16-byte user secret for client obfuscation + // --aes-pwd = 32-512 byte binary file for ME RPC auth + // + // proxy-secret is from: https://core.telegram.org/getProxySecret + // ============================================================= + let proxy_secret_path = config.general.proxy_secret_path.as_deref(); + match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { + Ok(proxy_secret) => { + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!("0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([proxy_secret[0], proxy_secret[1], + proxy_secret[2], proxy_secret[3]]) + } else { 0 }), + "Proxy-secret loaded" + ); + + let pool = MePool::new(proxy_tag, proxy_secret); + + match pool.init(2, &rng).await { + Ok(()) => { + info!("Middle-End pool initialized successfully"); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, 2, + ).await; + }); + + Some(pool) } - if config.general.modes.secure { - info!(" DD: tg://proxy?server={}&port={}&secret=dd{}", - public_ip, config.server.port, secret); + Err(e) => { + error!(error = %e, "Failed to initialize ME pool. Falling back to direct mode."); + None } - if config.general.modes.tls { - let domain_hex = hex::encode(&config.censorship.tls_domain); - info!(" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", - public_ip, config.server.port, secret, domain_hex); - } - } else { - warn!("User '{}' in show_link not found", user_name); } } - info!("------------------------"); + Err(e) => { + error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode."); + None + } + } + } else { + None + }; + + if me_pool.is_some() { + info!("Transport: Middle Proxy (supports all DCs including CDN)"); + } else { + info!("Transport: Direct TCP (standard DCs only)"); + } + + // Startup DC ping (only meaningful in direct mode) + if me_pool.is_none() { + info!("=== Telegram DC Connectivity ==="); + let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await; + for upstream_result in &ping_results { + info!(" via {}", upstream_result.upstream_name); + for dc in &upstream_result.results { + match (&dc.rtt_ms, &dc.error) { + (Some(rtt), _) => { + info!(" DC{} ({:>21}): {:.0}ms", dc.dc_idx, dc.dc_addr, rtt); + } + (None, Some(err)) => { + info!(" DC{} ({:>21}): FAIL ({})", dc.dc_idx, dc.dc_addr, err); + } + _ => { + info!(" DC{} ({:>21}): FAIL", dc.dc_idx, dc.dc_addr); + } + } + } + } + info!("================================"); + } + + // Background tasks + let um_clone = upstream_manager.clone(); + tokio::spawn(async move { um_clone.run_health_checks(prefer_ipv6).await; }); + + let rc_clone = replay_checker.clone(); + tokio::spawn(async move { rc_clone.run_periodic_cleanup().await; }); + + let detected_ip = detect_ip().await; + debug!("Detected IPs: v4={:?} v6={:?}", detected_ip.ipv4, detected_ip.ipv6); + + let mut listeners = Vec::new(); + + for listener_conf in &config.server.listeners { + let addr = SocketAddr::new(listener_conf.ip, config.server.port); + let options = ListenOptions { + ipv6_only: listener_conf.ip.is_ipv6(), + ..Default::default() + }; + + match create_listener(addr, &options) { + Ok(socket) => { + let listener = TcpListener::from_std(socket.into())?; + info!("Listening on {}", addr); + + let public_ip = if let Some(ip) = listener_conf.announce_ip { + ip + } else if listener_conf.ip.is_unspecified() { + if listener_conf.ip.is_ipv4() { + detected_ip.ipv4.unwrap_or(listener_conf.ip) + } else { + detected_ip.ipv6.unwrap_or(listener_conf.ip) + } + } else { + listener_conf.ip + }; + + if !config.show_link.is_empty() { + info!("--- Proxy Links ({}) ---", public_ip); + for user_name in &config.show_link { + if let Some(secret) = config.access.users.get(user_name) { + info!("User: {}", user_name); + if config.general.modes.classic { + info!(" Classic: tg://proxy?server={}&port={}&secret={}", + public_ip, config.server.port, secret); + } + if config.general.modes.secure { + info!(" DD: tg://proxy?server={}&port={}&secret=dd{}", + public_ip, config.server.port, secret); + } + if config.general.modes.tls { + let domain_hex = hex::encode(&config.censorship.tls_domain); + info!(" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", + public_ip, config.server.port, secret, domain_hex); + } + } else { + warn!("User '{}' in show_link not found", user_name); + } + } + info!("------------------------"); + } + + listeners.push(listener); + }, + Err(e) => { + error!("Failed to bind to {}: {}", addr, e); } - - listeners.push(listener); - }, - Err(e) => { - error!("Failed to bind to {}: {}", addr, e); } } - } - if listeners.is_empty() { - error!("No listeners. Exiting."); - std::process::exit(1); - } - - // Switch to user-configured log level after startup - let runtime_filter = if has_rust_log { - EnvFilter::from_default_env() - } else { - EnvFilter::new(effective_log_level.to_filter_str()) - }; - filter_handle.reload(runtime_filter).expect("Failed to switch log filter"); - - for listener in listeners { - let config = config.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - - tokio::spawn(async move { - loop { - match listener.accept().await { - Ok((stream, peer_addr)) => { - let config = config.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - - tokio::spawn(async move { - if let Err(e) = ClientHandler::new( - stream, peer_addr, config, stats, - upstream_manager, replay_checker, buffer_pool, rng - ).run().await { - debug!(peer = %peer_addr, error = %e, "Connection error"); - } - }); - } - Err(e) => { - error!("Accept error: {}", e); - tokio::time::sleep(Duration::from_millis(100)).await; + if listeners.is_empty() { + error!("No listeners. Exiting."); + std::process::exit(1); + } + + // Switch to user-configured log level after startup + let runtime_filter = if has_rust_log { + EnvFilter::from_default_env() + } else { + EnvFilter::new(effective_log_level.to_filter_str()) + }; + filter_handle.reload(runtime_filter).expect("Failed to switch log filter"); + + for listener in listeners { + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + + tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((stream, peer_addr)) => { + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + + tokio::spawn(async move { + if let Err(e) = ClientHandler::new( + stream, peer_addr, config, stats, + upstream_manager, replay_checker, buffer_pool, rng, + me_pool, + ).run().await { + debug!(peer = %peer_addr, error = %e, "Connection error"); + } + }); + } + Err(e) => { + error!("Accept error: {}", e); + tokio::time::sleep(Duration::from_millis(100)).await; + } } } - } - }); + }); + } + + match signal::ctrl_c().await { + Ok(()) => info!("Shutting down..."), + Err(e) => error!("Signal error: {}", e), + } + + Ok(()) } - - match signal::ctrl_c().await { - Ok(()) => info!("Shutting down..."), - Err(e) => error!("Signal error: {}", e), - } - - Ok(()) -} \ No newline at end of file + \ No newline at end of file diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 78db056..4a8d7ac 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1,403 +1,583 @@ //! Client Handler - -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; -use tokio::net::TcpStream; -use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; -use tokio::time::timeout; -use tracing::{debug, info, warn, error, trace}; - -use crate::config::ProxyConfig; -use crate::error::{ProxyError, Result, HandshakeResult}; -use crate::protocol::constants::*; -use crate::protocol::tls; -use crate::stats::{Stats, ReplayChecker}; -use crate::transport::{configure_client_socket, UpstreamManager}; -use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool}; -use crate::crypto::{AesCtr, SecureRandom}; - -use crate::proxy::handshake::{ - handle_tls_handshake, handle_mtproto_handshake, - HandshakeSuccess, generate_tg_nonce, encrypt_tg_nonce, -}; -use crate::proxy::relay::relay_bidirectional; -use crate::proxy::masking::handle_bad_client; - -pub struct ClientHandler; - -pub struct RunningClientHandler { - stream: TcpStream, - peer: SocketAddr, - config: Arc, - stats: Arc, - replay_checker: Arc, - upstream_manager: Arc, - buffer_pool: Arc, - rng: Arc, -} - -impl ClientHandler { - pub fn new( + + use std::net::SocketAddr; + use std::sync::Arc; + use std::time::Duration; + use tokio::net::TcpStream; + use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; + use tokio::time::timeout; + use tracing::{debug, info, warn, error, trace}; + + use crate::config::ProxyConfig; + use crate::error::{ProxyError, Result, HandshakeResult}; + use crate::protocol::constants::*; + use crate::protocol::tls; + use crate::stats::{Stats, ReplayChecker}; + use crate::transport::{configure_client_socket, UpstreamManager}; + use crate::transport::middle_proxy::{MePool, MeResponse}; + use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool}; + use crate::crypto::{AesCtr, SecureRandom}; + + use crate::proxy::handshake::{ + handle_tls_handshake, handle_mtproto_handshake, + HandshakeSuccess, generate_tg_nonce, encrypt_tg_nonce, + }; + use crate::proxy::relay::relay_bidirectional; + use crate::proxy::masking::handle_bad_client; + + pub struct ClientHandler; + + pub struct RunningClientHandler { stream: TcpStream, peer: SocketAddr, config: Arc, stats: Arc, - upstream_manager: Arc, replay_checker: Arc, - buffer_pool: Arc, - rng: Arc, - ) -> RunningClientHandler { - RunningClientHandler { - stream, peer, config, stats, replay_checker, - upstream_manager, buffer_pool, rng, - } - } -} - -impl RunningClientHandler { - pub async fn run(mut self) -> Result<()> { - self.stats.increment_connects_all(); - - let peer = self.peer; - debug!(peer = %peer, "New connection"); - - if let Err(e) = configure_client_socket( - &self.stream, - self.config.timeouts.client_keepalive, - self.config.timeouts.client_ack, - ) { - debug!(peer = %peer, error = %e, "Failed to configure client socket"); - } - - let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); - let stats = self.stats.clone(); - - let result = timeout(handshake_timeout, self.do_handshake()).await; - - match result { - Ok(Ok(())) => { - debug!(peer = %peer, "Connection handled successfully"); - Ok(()) - } - Ok(Err(e)) => { - debug!(peer = %peer, error = %e, "Handshake failed"); - Err(e) - } - Err(_) => { - stats.increment_handshake_timeouts(); - debug!(peer = %peer, "Handshake timeout"); - Err(ProxyError::TgHandshakeTimeout) - } - } - } - - async fn do_handshake(mut self) -> Result<()> { - let mut first_bytes = [0u8; 5]; - self.stream.read_exact(&mut first_bytes).await?; - - let is_tls = tls::is_tls_handshake(&first_bytes[..3]); - let peer = self.peer; - - debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); - - if is_tls { - self.handle_tls_client(first_bytes).await - } else { - self.handle_direct_client(first_bytes).await - } - } - - async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> { - let peer = self.peer; - - let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; - - debug!(peer = %peer, tls_len = tls_len, "Reading TLS handshake"); - - if tls_len < 512 { - debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); - self.stats.increment_connects_bad(); - let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; - return Ok(()); - } - - let mut handshake = vec![0u8; 5 + tls_len]; - handshake[..5].copy_from_slice(&first_bytes); - self.stream.read_exact(&mut handshake[5..]).await?; - - let config = self.config.clone(); - let replay_checker = self.replay_checker.clone(); - let stats = self.stats.clone(); - let buffer_pool = self.buffer_pool.clone(); - - let (read_half, write_half) = self.stream.into_split(); - - let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake( - &handshake, read_half, write_half, peer, - &config, &replay_checker, &self.rng, - ).await { - HandshakeResult::Success(result) => result, - HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; - return Ok(()); - } - HandshakeResult::Error(e) => return Err(e), - }; - - debug!(peer = %peer, "Reading MTProto handshake through TLS"); - let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; - let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into() - .map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?; - - let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( - &mtproto_handshake, tls_reader, tls_writer, peer, - &config, &replay_checker, true, - ).await { - HandshakeResult::Success(result) => result, - HandshakeResult::BadClient { reader: _, writer: _ } => { - stats.increment_connects_bad(); - debug!(peer = %peer, "Valid TLS but invalid MTProto handshake"); - return Ok(()); - } - HandshakeResult::Error(e) => return Err(e), - }; - - Self::handle_authenticated_static( - crypto_reader, crypto_writer, success, - self.upstream_manager, self.stats, self.config, - buffer_pool, self.rng, - ).await - } - - async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> { - let peer = self.peer; - - if !self.config.general.modes.classic && !self.config.general.modes.secure { - debug!(peer = %peer, "Non-TLS modes disabled"); - self.stats.increment_connects_bad(); - let (reader, writer) = self.stream.into_split(); - handle_bad_client(reader, writer, &first_bytes, &self.config).await; - return Ok(()); - } - - let mut handshake = [0u8; HANDSHAKE_LEN]; - handshake[..5].copy_from_slice(&first_bytes); - self.stream.read_exact(&mut handshake[5..]).await?; - - let config = self.config.clone(); - let replay_checker = self.replay_checker.clone(); - let stats = self.stats.clone(); - let buffer_pool = self.buffer_pool.clone(); - - let (read_half, write_half) = self.stream.into_split(); - - let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( - &handshake, read_half, write_half, peer, - &config, &replay_checker, false, - ).await { - HandshakeResult::Success(result) => result, - HandshakeResult::BadClient { reader, writer } => { - stats.increment_connects_bad(); - handle_bad_client(reader, writer, &handshake, &config).await; - return Ok(()); - } - HandshakeResult::Error(e) => return Err(e), - }; - - Self::handle_authenticated_static( - crypto_reader, crypto_writer, success, - self.upstream_manager, self.stats, self.config, - buffer_pool, self.rng, - ).await - } - - async fn handle_authenticated_static( - client_reader: CryptoReader, - client_writer: CryptoWriter, - success: HandshakeSuccess, upstream_manager: Arc, - stats: Arc, - config: Arc, buffer_pool: Arc, rng: Arc, - ) -> Result<()> - where - R: AsyncRead + Unpin + Send + 'static, - W: AsyncWrite + Unpin + Send + 'static, - { - let user = &success.user; - - if let Err(e) = Self::check_user_limits_static(user, &config, &stats) { - warn!(user = %user, error = %e, "User limit exceeded"); - return Err(e); - } - - let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?; - - info!( - user = %user, - peer = %success.peer, - dc = success.dc_idx, - dc_addr = %dc_addr, - proto = ?success.proto_tag, - "Connecting to Telegram" - ); - - // Pass dc_idx for latency-based upstream selection - let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?; - - debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); - - let (tg_reader, tg_writer) = Self::do_tg_handshake_static( - tg_stream, &success, &config, rng.as_ref(), - ).await?; - - debug!(peer = %success.peer, "TG handshake complete, starting relay"); - - stats.increment_user_connects(user); - stats.increment_user_curr_connects(user); - - let relay_result = relay_bidirectional( - client_reader, client_writer, - tg_reader, tg_writer, - user, Arc::clone(&stats), buffer_pool, - ).await; - - stats.decrement_user_curr_connects(user); - - match &relay_result { - Ok(()) => debug!(user = %user, "Relay completed"), - Err(e) => debug!(user = %user, error = %e, "Relay ended with error"), - } - - relay_result + me_pool: Option>, } - fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { - if let Some(expiration) = config.access.user_expirations.get(user) { - if chrono::Utc::now() > *expiration { - return Err(ProxyError::UserExpired { user: user.to_string() }); + impl ClientHandler { + pub fn new( + stream: TcpStream, + peer: SocketAddr, + config: Arc, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + ) -> RunningClientHandler { + RunningClientHandler { + stream, peer, config, stats, replay_checker, + upstream_manager, buffer_pool, rng, me_pool, } } - - if let Some(limit) = config.access.user_max_tcp_conns.get(user) { - if stats.get_user_curr_connects(user) >= *limit as u64 { - return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() }); - } - } - - if let Some(quota) = config.access.user_data_quota.get(user) { - if stats.get_user_total_octets(user) >= *quota { - return Err(ProxyError::DataQuotaExceeded { user: user.to_string() }); - } - } - - Ok(()) } - /// Resolve DC index to a target address. - /// - /// Matches the C implementation's behavior exactly: - /// - /// 1. Look up DC in known clusters (standard DCs ±1..±5) - /// 2. If not found and `force=1` → fall back to `default_cluster` - /// - /// In the C code: - /// - `proxy-multi.conf` is downloaded from Telegram, contains only DC ±1..±5 - /// - `default 2;` directive sets the default cluster - /// - `mf_cluster_lookup(CurConf, target_dc, 1)` returns default_cluster - /// for any unknown DC (like CDN DC 203) - /// - /// So DC 203, DC 101, DC -300, etc. all route to the default DC (2). - /// There is NO modular arithmetic in the C implementation. - fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { - let datacenters = if config.general.prefer_ipv6 { - &*TG_DATACENTERS_V6 - } else { - &*TG_DATACENTERS_V4 - }; + impl RunningClientHandler { + pub async fn run(mut self) -> Result<()> { + self.stats.increment_connects_all(); - let num_dcs = datacenters.len(); // 5 + let peer = self.peer; + debug!(peer = %peer, "New connection"); - // === Step 1: Check dc_overrides (like C's `proxy_for :`) === - let dc_key = dc_idx.to_string(); - if let Some(addr_str) = config.dc_overrides.get(&dc_key) { - match addr_str.parse::() { - Ok(addr) => { - debug!(dc_idx = dc_idx, addr = %addr, "Using DC override from config"); - return Ok(addr); + if let Err(e) = configure_client_socket( + &self.stream, + self.config.timeouts.client_keepalive, + self.config.timeouts.client_ack, + ) { + debug!(peer = %peer, error = %e, "Failed to configure client socket"); + } + + let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); + let stats = self.stats.clone(); + + let result = timeout(handshake_timeout, self.do_handshake()).await; + + match result { + Ok(Ok(())) => { + debug!(peer = %peer, "Connection handled successfully"); + Ok(()) + } + Ok(Err(e)) => { + debug!(peer = %peer, error = %e, "Handshake failed"); + Err(e) } Err(_) => { - warn!(dc_idx = dc_idx, addr_str = %addr_str, - "Invalid DC override address in config, ignoring"); + stats.increment_handshake_timeouts(); + debug!(peer = %peer, "Handshake timeout"); + Err(ProxyError::TgHandshakeTimeout) } } } - // === Step 2: Standard DCs ±1..±5 — direct lookup === - let abs_dc = dc_idx.unsigned_abs() as usize; - if abs_dc >= 1 && abs_dc <= num_dcs { - return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT)); + async fn do_handshake(mut self) -> Result<()> { + let mut first_bytes = [0u8; 5]; + self.stream.read_exact(&mut first_bytes).await?; + + let is_tls = tls::is_tls_handshake(&first_bytes[..3]); + let peer = self.peer; + + debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); + + if is_tls { + self.handle_tls_client(first_bytes).await + } else { + self.handle_direct_client(first_bytes).await + } } - // === Step 3: Unknown DC — fall back to default_cluster === - // Exactly like C's `mf_cluster_lookup(CurConf, target_dc, force=1)` - // which returns `MC->default_cluster` when the DC is not found. - // Telegram's proxy-multi.conf uses `default 2;` - let default_dc = config.default_dc.unwrap_or(2) as usize; - let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs { - default_dc - 1 - } else { - 1 // DC 2 (index 1) — matches Telegram's `default 2;` - }; + async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> { + let peer = self.peer; - info!( - original_dc = dc_idx, - fallback_dc = (fallback_idx + 1) as u16, - fallback_addr = %datacenters[fallback_idx], - "Special DC ---> default_cluster" - ); + let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; - Ok(SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT)) + debug!(peer = %peer, tls_len = tls_len, "Reading TLS handshake"); + + if tls_len < 512 { + debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); + self.stats.increment_connects_bad(); + let (reader, writer) = self.stream.into_split(); + handle_bad_client(reader, writer, &first_bytes, &self.config).await; + return Ok(()); + } + + let mut handshake = vec![0u8; 5 + tls_len]; + handshake[..5].copy_from_slice(&first_bytes); + self.stream.read_exact(&mut handshake[5..]).await?; + + let config = self.config.clone(); + let replay_checker = self.replay_checker.clone(); + let stats = self.stats.clone(); + let buffer_pool = self.buffer_pool.clone(); + + let (read_half, write_half) = self.stream.into_split(); + + let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake( + &handshake, read_half, write_half, peer, + &config, &replay_checker, &self.rng, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader, writer } => { + stats.increment_connects_bad(); + handle_bad_client(reader, writer, &handshake, &config).await; + return Ok(()); + } + HandshakeResult::Error(e) => return Err(e), + }; + + debug!(peer = %peer, "Reading MTProto handshake through TLS"); + let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; + let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into() + .map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?; + + let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( + &mtproto_handshake, tls_reader, tls_writer, peer, + &config, &replay_checker, true, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader: _, writer: _ } => { + stats.increment_connects_bad(); + debug!(peer = %peer, "Valid TLS but invalid MTProto handshake"); + return Ok(()); + } + HandshakeResult::Error(e) => return Err(e), + }; + + Self::handle_authenticated_static( + crypto_reader, crypto_writer, success, + self.upstream_manager, self.stats, self.config, + buffer_pool, self.rng, self.me_pool, + ).await + } + + async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> { + let peer = self.peer; + + if !self.config.general.modes.classic && !self.config.general.modes.secure { + debug!(peer = %peer, "Non-TLS modes disabled"); + self.stats.increment_connects_bad(); + let (reader, writer) = self.stream.into_split(); + handle_bad_client(reader, writer, &first_bytes, &self.config).await; + return Ok(()); + } + + let mut handshake = [0u8; HANDSHAKE_LEN]; + handshake[..5].copy_from_slice(&first_bytes); + self.stream.read_exact(&mut handshake[5..]).await?; + + let config = self.config.clone(); + let replay_checker = self.replay_checker.clone(); + let stats = self.stats.clone(); + let buffer_pool = self.buffer_pool.clone(); + + let (read_half, write_half) = self.stream.into_split(); + + let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( + &handshake, read_half, write_half, peer, + &config, &replay_checker, false, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader, writer } => { + stats.increment_connects_bad(); + handle_bad_client(reader, writer, &handshake, &config).await; + return Ok(()); + } + HandshakeResult::Error(e) => return Err(e), + }; + + Self::handle_authenticated_static( + crypto_reader, crypto_writer, success, + self.upstream_manager, self.stats, self.config, + buffer_pool, self.rng, self.me_pool, + ).await + } + + /// Main dispatch after successful handshake. + /// Two modes: + /// - Direct: TCP relay to TG DC (existing behavior) + /// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs) + async fn handle_authenticated_static( + client_reader: CryptoReader, + client_writer: CryptoWriter, + success: HandshakeSuccess, + upstream_manager: Arc, + stats: Arc, + config: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + ) -> Result<()> + where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + { + let user = &success.user; + + if let Err(e) = Self::check_user_limits_static(user, &config, &stats) { + warn!(user = %user, error = %e, "User limit exceeded"); + return Err(e); + } + + // Decide: middle proxy or direct + if config.general.use_middle_proxy { + if let Some(ref pool) = me_pool { + return Self::handle_via_middle_proxy( + client_reader, client_writer, success, + pool.clone(), stats, config, buffer_pool, + ).await; + } + warn!("use_middle_proxy=true but MePool not initialized, falling back to direct"); + } + + // Direct mode (original behavior) + Self::handle_via_direct( + client_reader, client_writer, success, + upstream_manager, stats, config, buffer_pool, rng, + ).await + } + + // ===================================================================== + // Direct mode — TCP relay to Telegram DC + // ===================================================================== + + async fn handle_via_direct( + client_reader: CryptoReader, + client_writer: CryptoWriter, + success: HandshakeSuccess, + upstream_manager: Arc, + stats: Arc, + config: Arc, + buffer_pool: Arc, + rng: Arc, + ) -> Result<()> + where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + { + let user = &success.user; + let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?; + + info!( + user = %user, + peer = %success.peer, + dc = success.dc_idx, + dc_addr = %dc_addr, + proto = ?success.proto_tag, + mode = "direct", + "Connecting to Telegram DC" + ); + + let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?; + + debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); + + let (tg_reader, tg_writer) = Self::do_tg_handshake_static( + tg_stream, &success, &config, rng.as_ref(), + ).await?; + + debug!(peer = %success.peer, "TG handshake complete, starting relay"); + + stats.increment_user_connects(user); + stats.increment_user_curr_connects(user); + + let relay_result = relay_bidirectional( + client_reader, client_writer, + tg_reader, tg_writer, + user, Arc::clone(&stats), buffer_pool, + ).await; + + stats.decrement_user_curr_connects(user); + + match &relay_result { + Ok(()) => debug!(user = %user, "Direct relay completed"), + Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"), + } + + relay_result + } + + // ===================================================================== + // Middle Proxy mode — RPC multiplex through ME pool + // ===================================================================== + + /// Middle Proxy RPC relay + /// + /// Architecture (matches C MTProxy): + /// ```text + /// Client ←AES-CTR→ [telemt] ←RPC/AES-CBC→ ME ←internal→ DC (any, incl CDN 203) + /// ``` + /// + /// Key difference from direct mode: + /// - No per-client TCP to DC; all clients share ME pool connections + /// - ME internally routes to correct DC based on client's encrypted auth_key_id + /// - CDN DCs (203+) work because ME knows their internal addresses + /// - We pass raw client MTProto bytes in RPC_PROXY_REQ envelope + /// - ME returns responses in RPC_PROXY_ANS envelope + async fn handle_via_middle_proxy( + mut client_reader: CryptoReader, + mut client_writer: CryptoWriter, + success: HandshakeSuccess, + me_pool: Arc, + stats: Arc, + config: Arc, + _buffer_pool: Arc, + ) -> Result<()> + where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + { + let user = success.user.clone(); + let peer = success.peer; + + info!( + user = %user, + peer = %peer, + dc = success.dc_idx, + proto = ?success.proto_tag, + mode = "middle_proxy", + "Routing via Middle-End" + ); + + // Register this client connection in ME demux registry + let (conn_id, mut me_rx) = me_pool.registry().register().await; + + // Our listening address for RPC_PROXY_REQ metadata + let our_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) + .parse().unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap()); + + stats.increment_user_connects(&user); + stats.increment_user_curr_connects(&user); + + debug!(user = %user, conn_id, "ME relay started"); + + // Bidirectional relay loop: client ↔ ME pool + // + // C→S direction: read raw bytes from client_reader, wrap in RPC_PROXY_REQ, send via ME + // S→C direction: receive MeResponse::Data from registry channel, write to client_writer + // + // We use tokio::select! to handle both directions concurrently. + // Unlike direct mode (copy_bidirectional on two TCP streams), + // here one side is a channel (mpsc::Receiver), not a stream. + + let mut client_buf = vec![0u8; 64 * 1024]; + let mut client_closed = false; + let mut server_closed = false; + + let result: Result<()> = loop { + tokio::select! { + // C→S: client sends data, we forward to ME + read_result = client_reader.read(&mut client_buf), if !client_closed => { + match read_result { + Ok(0) => { + debug!(conn_id, "Client EOF"); + client_closed = true; + if server_closed { break Ok(()); } + // Signal ME to close this connection + let _ = me_pool.send_close(conn_id).await; + } + Ok(n) => { + trace!(conn_id, bytes = n, "C→ME"); + stats.add_user_octets_from(&user, n as u64); + if let Err(e) = me_pool.send_proxy_req( + conn_id, peer, our_addr, &client_buf[..n] + ).await { + break Err(e); + } + } + Err(e) => { + debug!(conn_id, error = %e, "Client read error"); + break Err(ProxyError::Io(e)); + } + } + } + + // S→C: ME sends response, we forward to client + me_msg = me_rx.recv(), if !server_closed => { + match me_msg { + Some(MeResponse::Data(data)) => { + trace!(conn_id, bytes = data.len(), "ME→C"); + stats.add_user_octets_to(&user, data.len() as u64); + if let Err(e) = client_writer.write_all(&data).await { + debug!(conn_id, error = %e, "Client write error"); + break Err(ProxyError::Io(e)); + } + if let Err(e) = client_writer.flush().await { + break Err(ProxyError::Io(e)); + } + } + Some(MeResponse::Ack(_token)) => { + // QuickACK from ME — could forward to client as obfuscated ACK + // For now, just log + trace!(conn_id, "ME ACK (ignored)"); + } + Some(MeResponse::Close) => { + debug!(conn_id, "ME sent CLOSE"); + server_closed = true; + if client_closed { break Ok(()); } + } + None => { + // Channel closed — ME connection died + debug!(conn_id, "ME channel closed"); + server_closed = true; + if client_closed { break Ok(()); } + break Err(ProxyError::Proxy("ME connection lost".into())); + } + } + } + + // Both sides closed + else => { + break Ok(()); + } + } + }; + + // Cleanup + debug!(user = %user, conn_id, "ME relay cleanup"); + me_pool.registry().unregister(conn_id).await; + stats.decrement_user_curr_connects(&user); + + match &result { + Ok(()) => debug!(user = %user, conn_id, "ME relay completed"), + Err(e) => debug!(user = %user, conn_id, error = %e, "ME relay error"), + } + + result + } + + // ===================================================================== + // Helpers + // ===================================================================== + + fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { + if let Some(expiration) = config.access.user_expirations.get(user) { + if chrono::Utc::now() > *expiration { + return Err(ProxyError::UserExpired { user: user.to_string() }); + } + } + + if let Some(limit) = config.access.user_max_tcp_conns.get(user) { + if stats.get_user_curr_connects(user) >= *limit as u64 { + return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() }); + } + } + + if let Some(quota) = config.access.user_data_quota.get(user) { + if stats.get_user_total_octets(user) >= *quota { + return Err(ProxyError::DataQuotaExceeded { user: user.to_string() }); + } + } + + Ok(()) + } + + /// Resolve DC index to target address (used only in direct mode) + fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { + let datacenters = if config.general.prefer_ipv6 { + &*TG_DATACENTERS_V6 + } else { + &*TG_DATACENTERS_V4 + }; + + let num_dcs = datacenters.len(); + + let dc_key = dc_idx.to_string(); + if let Some(addr_str) = config.dc_overrides.get(&dc_key) { + match addr_str.parse::() { + Ok(addr) => { + debug!(dc_idx = dc_idx, addr = %addr, "Using DC override from config"); + return Ok(addr); + } + Err(_) => { + warn!(dc_idx = dc_idx, addr_str = %addr_str, + "Invalid DC override address in config, ignoring"); + } + } + } + + let abs_dc = dc_idx.unsigned_abs() as usize; + if abs_dc >= 1 && abs_dc <= num_dcs { + return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT)); + } + + let default_dc = config.default_dc.unwrap_or(2) as usize; + let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs { + default_dc - 1 + } else { + 1 + }; + + info!( + original_dc = dc_idx, + fallback_dc = (fallback_idx + 1) as u16, + fallback_addr = %datacenters[fallback_idx], + "Special DC ---> default_cluster" + ); + + Ok(SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT)) + } + + /// Perform obfuscated handshake with Telegram DC (direct mode only) + async fn do_tg_handshake_static( + mut stream: TcpStream, + success: &HandshakeSuccess, + config: &ProxyConfig, + rng: &SecureRandom, + ) -> Result<(CryptoReader, CryptoWriter)> { + let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce( + success.proto_tag, + success.dc_idx, + &success.dec_key, + success.dec_iv, + rng, + config.general.fast_mode, + ); + + let encrypted_nonce = encrypt_tg_nonce(&nonce); + + debug!( + peer = %success.peer, + nonce_head = %hex::encode(&nonce[..16]), + "Sending nonce to Telegram" + ); + + stream.write_all(&encrypted_nonce).await?; + stream.flush().await?; + + let (read_half, write_half) = stream.into_split(); + + let decryptor = AesCtr::new(&tg_dec_key, tg_dec_iv); + let encryptor = AesCtr::new(&tg_enc_key, tg_enc_iv); + + Ok(( + CryptoReader::new(read_half, decryptor), + CryptoWriter::new(write_half, encryptor), + )) + } } - - async fn do_tg_handshake_static( - mut stream: TcpStream, - success: &HandshakeSuccess, - config: &ProxyConfig, - rng: &SecureRandom, - ) -> Result<(CryptoReader, CryptoWriter)> { - let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce( - success.proto_tag, - success.dc_idx, - &success.dec_key, - success.dec_iv, - rng, - config.general.fast_mode, - ); - - let encrypted_nonce = encrypt_tg_nonce(&nonce); - - debug!( - peer = %success.peer, - nonce_head = %hex::encode(&nonce[..16]), - "Sending nonce to Telegram" - ); - - stream.write_all(&encrypted_nonce).await?; - stream.flush().await?; - - let (read_half, write_half) = stream.into_split(); - - let decryptor = AesCtr::new(&tg_dec_key, tg_dec_iv); - let encryptor = AesCtr::new(&tg_enc_key, tg_enc_iv); - - Ok(( - CryptoReader::new(read_half, decryptor), - CryptoWriter::new(write_half, encryptor), - )) - } -} \ No newline at end of file + \ No newline at end of file diff --git a/src/transport/middle_proxy.rs b/src/transport/middle_proxy.rs index 68603bf..b2eb872 100644 --- a/src/transport/middle_proxy.rs +++ b/src/transport/middle_proxy.rs @@ -1,371 +1,860 @@ //! Middle Proxy RPC Transport - //! - //! Implements Telegram Middle-End RPC protocol for routing to ALL DCs (including CDN). - //! Uses existing crypto primitives from crate::crypto. - - use std::collections::HashMap; - use std::net::{IpAddr, SocketAddr}; - use std::sync::Arc; - use std::sync::atomic::{AtomicU64, Ordering}; - use std::time::Duration; - use bytes::{Bytes, BytesMut}; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use tokio::net::TcpStream; - use tokio::sync::{mpsc, Mutex, RwLock}; - use tokio::time::{timeout, Instant}; - use tracing::{debug, info, trace, warn}; - - use crate::crypto::{sha1, crc32, derive_middleproxy_keys, AesCbc, SecureRandom}; - use crate::error::{ProxyError, Result}; - use crate::protocol::constants::*; - - // ========== RPC Nonce (32 bytes) ========== - - fn build_nonce_packet(key_selector: u32, crypto_ts: u32, nonce: &[u8; 16]) -> [u8; 32] { - let mut p = [0u8; 32]; - p[0..4].copy_from_slice(&RPC_NONCE_U32.to_le_bytes()); - p[4..8].copy_from_slice(&key_selector.to_le_bytes()); - p[8..12].copy_from_slice(&RPC_CRYPTO_AES_U32.to_le_bytes()); - p[12..16].copy_from_slice(&crypto_ts.to_le_bytes()); - p[16..32].copy_from_slice(nonce); - p - } - - fn parse_nonce_response(d: &[u8; 32]) -> Result<(u32, u32, [u8; 16])> { - let t = u32::from_le_bytes([d[0], d[1], d[2], d[3]]); - if t != RPC_NONCE_U32 { - return Err(ProxyError::InvalidHandshake(format!("Expected RPC_NONCE, got 0x{:08x}", t))); +//! +//! Implements Telegram Middle-End RPC protocol for routing to ALL DCs (including CDN). +//! +//! ## Phase 3 fixes: +//! - ROOT CAUSE: Use Telegram proxy-secret (binary file) not user secret +//! - Streaming handshake response (no fixed-size read deadlock) +//! - Health monitoring + reconnection +//! - Hex diagnostics for debugging + +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; +use bytes::{Bytes, BytesMut}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::time::{timeout, Instant}; +use tracing::{debug, info, trace, warn, error}; + +use crate::crypto::{crc32, derive_middleproxy_keys, AesCbc, SecureRandom}; +use crate::error::{ProxyError, Result}; +use crate::protocol::constants::*; + +// ========== Proxy Secret Fetching ========== + +/// Fetch the Telegram proxy-secret binary file. +/// +/// This is NOT the user secret (-S flag, 16 bytes hex for clients). +/// This is the infrastructure secret (--aes-pwd in C MTProxy), +/// a binary file of 32-512 bytes used for ME RPC key derivation. +/// +/// Strategy: try local cache, then download from Telegram. +pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result> { + let cache = cache_path.unwrap_or("proxy-secret"); + + // 1. Try local cache (< 24h old) + if let Ok(metadata) = tokio::fs::metadata(cache).await { + if let Ok(modified) = metadata.modified() { + let age = std::time::SystemTime::now() + .duration_since(modified) + .unwrap_or(Duration::from_secs(u64::MAX)); + if age < Duration::from_secs(86400) { + if let Ok(data) = tokio::fs::read(cache).await { + if data.len() >= 32 { + info!( + path = cache, + len = data.len(), + age_hours = age.as_secs() / 3600, + "Loaded proxy-secret from cache" + ); + return Ok(data); + } + warn!(path = cache, len = data.len(), "Cached proxy-secret too short"); + } + } } - let schema = u32::from_le_bytes([d[8], d[9], d[10], d[11]]); - let ts = u32::from_le_bytes([d[12], d[13], d[14], d[15]]); - let mut nonce = [0u8; 16]; - nonce.copy_from_slice(&d[16..32]); - Ok((schema, ts, nonce)) } - - // ========== RPC Handshake (32 bytes) ========== - - fn build_handshake_packet() -> [u8; 32] { - let mut p = [0u8; 32]; - p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes()); - // flags=0, sender_pid with our PID - let pid = (std::process::id() & 0xFFFF) as u16; - p[14..16].copy_from_slice(&pid.to_le_bytes()); - let utime = std::time::SystemTime::now() + + // 2. Download from Telegram + info!("Downloading proxy-secret from core.telegram.org..."); + let data = download_proxy_secret().await?; + + // 3. Cache locally (best-effort) + if let Err(e) = tokio::fs::write(cache, &data).await { + warn!(error = %e, "Failed to cache proxy-secret (non-fatal)"); + } else { + debug!(path = cache, len = data.len(), "Cached proxy-secret"); + } + + Ok(data) +} + +async fn download_proxy_secret() -> Result> { + let url = "https://core.telegram.org/getProxySecret"; + let resp = reqwest::get(url) + .await + .map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {}", e)))?; + + if !resp.status().is_success() { + return Err(ProxyError::Proxy(format!( + "proxy-secret download HTTP {}", resp.status() + ))); + } + + let data = resp.bytes().await + .map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {}", e)))? + .to_vec(); + + if data.len() < 32 { + return Err(ProxyError::Proxy(format!( + "proxy-secret too short: {} bytes (need >= 32)", data.len() + ))); + } + + info!(len = data.len(), "Downloaded proxy-secret OK"); + Ok(data) +} + +// ========== RPC Frame helpers ========== + +/// Build an RPC frame: [len(4) | seq_no(4) | payload | crc32(4)] +fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec { + let total_len = (4 + 4 + payload.len() + 4) as u32; + let mut f = Vec::with_capacity(total_len as usize); + f.extend_from_slice(&total_len.to_le_bytes()); + f.extend_from_slice(&seq_no.to_le_bytes()); + f.extend_from_slice(payload); + let c = crc32(&f); + f.extend_from_slice(&c.to_le_bytes()); + f +} + +/// Read one plaintext RPC frame. Returns (seq_no, payload). +async fn read_rpc_frame_plaintext( + rd: &mut (impl AsyncReadExt + Unpin), +) -> Result<(i32, Vec)> { + let mut len_buf = [0u8; 4]; + rd.read_exact(&mut len_buf).await.map_err(ProxyError::Io)?; + let total_len = u32::from_le_bytes(len_buf) as usize; + + if total_len < 12 || total_len > (1 << 24) { + return Err(ProxyError::InvalidHandshake( + format!("Bad RPC frame length: {}", total_len), + )); + } + + let mut rest = vec![0u8; total_len - 4]; + rd.read_exact(&mut rest).await.map_err(ProxyError::Io)?; + + let mut full = Vec::with_capacity(total_len); + full.extend_from_slice(&len_buf); + full.extend_from_slice(&rest); + + let crc_offset = total_len - 4; + let expected_crc = u32::from_le_bytes([ + full[crc_offset], full[crc_offset + 1], + full[crc_offset + 2], full[crc_offset + 3], + ]); + let actual_crc = crc32(&full[..crc_offset]); + if expected_crc != actual_crc { + return Err(ProxyError::InvalidHandshake( + format!("CRC mismatch: 0x{:08x} vs 0x{:08x}", expected_crc, actual_crc), + )); + } + + let seq_no = i32::from_le_bytes([full[4], full[5], full[6], full[7]]); + let payload = full[8..crc_offset].to_vec(); + Ok((seq_no, payload)) +} + +// ========== RPC Nonce (32 bytes payload) ========== + +fn build_nonce_payload(key_selector: u32, crypto_ts: u32, nonce: &[u8; 16]) -> [u8; 32] { + let mut p = [0u8; 32]; + p[0..4].copy_from_slice(&RPC_NONCE_U32.to_le_bytes()); + p[4..8].copy_from_slice(&key_selector.to_le_bytes()); + p[8..12].copy_from_slice(&RPC_CRYPTO_AES_U32.to_le_bytes()); + p[12..16].copy_from_slice(&crypto_ts.to_le_bytes()); + p[16..32].copy_from_slice(nonce); + p +} + +fn parse_nonce_payload(d: &[u8]) -> Result<(u32, u32, [u8; 16])> { + if d.len() < 32 { + return Err(ProxyError::InvalidHandshake( + format!("Nonce payload too short: {} bytes", d.len()), + )); + } + let t = u32::from_le_bytes([d[0], d[1], d[2], d[3]]); + if t != RPC_NONCE_U32 { + return Err(ProxyError::InvalidHandshake( + format!("Expected RPC_NONCE 0x{:08x}, got 0x{:08x}", RPC_NONCE_U32, t), + )); + } + let schema = u32::from_le_bytes([d[8], d[9], d[10], d[11]]); + let ts = u32::from_le_bytes([d[12], d[13], d[14], d[15]]); + let mut nonce = [0u8; 16]; + nonce.copy_from_slice(&d[16..32]); + Ok((schema, ts, nonce)) +} + +// ========== RPC Handshake (32 bytes payload) ========== + +fn build_handshake_payload(our_ip: u32, our_port: u16, peer_ip: u32, peer_port: u16) -> [u8; 32] { + let mut p = [0u8; 32]; + p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes()); + // flags = 0 at offset 4..8 + + // sender_pid: {ip(4), port(2), pid(2), utime(4)} at offset 8..20 + p[8..12].copy_from_slice(&our_ip.to_le_bytes()); + p[12..14].copy_from_slice(&our_port.to_le_bytes()); + let pid = (std::process::id() & 0xFFFF) as u16; + p[14..16].copy_from_slice(&pid.to_le_bytes()); + let utime = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + p[16..20].copy_from_slice(&utime.to_le_bytes()); + + // peer_pid: {ip(4), port(2), pid(2), utime(4)} at offset 20..32 + p[20..24].copy_from_slice(&peer_ip.to_le_bytes()); + p[24..26].copy_from_slice(&peer_port.to_le_bytes()); + p +} + +// ========== CBC helpers ========== + +fn cbc_encrypt_padded(key: &[u8; 32], iv: &[u8; 16], plaintext: &[u8]) -> Result<(Vec, [u8; 16])> { + let pad = (16 - (plaintext.len() % 16)) % 16; + let mut buf = plaintext.to_vec(); + let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00]; + for i in 0..pad { + buf.push(pad_pattern[i % 4]); + } + let cipher = AesCbc::new(*key, *iv); + cipher.encrypt_in_place(&mut buf) + .map_err(|e| ProxyError::Crypto(format!("CBC encrypt: {}", e)))?; + let mut new_iv = [0u8; 16]; + if buf.len() >= 16 { + new_iv.copy_from_slice(&buf[buf.len() - 16..]); + } + Ok((buf, new_iv)) +} + +fn cbc_decrypt_inplace(key: &[u8; 32], iv: &[u8; 16], data: &mut [u8]) -> Result<[u8; 16]> { + let mut new_iv = [0u8; 16]; + if data.len() >= 16 { + new_iv.copy_from_slice(&data[data.len() - 16..]); + } + AesCbc::new(*key, *iv) + .decrypt_in_place(data) + .map_err(|e| ProxyError::Crypto(format!("CBC decrypt: {}", e)))?; + Ok(new_iv) +} + +// ========== IPv4 helpers ========== + +fn ipv4_to_mapped_v6(ip: Ipv4Addr) -> [u8; 16] { + let mut buf = [0u8; 16]; + buf[10] = 0xFF; + buf[11] = 0xFF; + let o = ip.octets(); + buf[12] = o[0]; buf[13] = o[1]; buf[14] = o[2]; buf[15] = o[3]; + buf +} + +fn addr_to_ip_u32(addr: &SocketAddr) -> u32 { + match addr.ip() { + IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()), + IpAddr::V6(v6) => { + if let Some(v4) = v6.to_ipv4_mapped() { + u32::from_le_bytes(v4.octets()) + } else { 0 } + } + } +} + +// ========== ME Response ========== + +#[derive(Debug)] +pub enum MeResponse { + Data(Bytes), + Ack(u32), + Close, +} + +// ========== Connection Registry ========== + +pub struct ConnRegistry { + map: RwLock>>, + next_id: AtomicU64, +} + +impl ConnRegistry { + pub fn new() -> Self { + Self { + map: RwLock::new(HashMap::new()), + next_id: AtomicU64::new(1), + } + } + pub async fn register(&self) -> (u64, mpsc::Receiver) { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let (tx, rx) = mpsc::channel(256); + self.map.write().await.insert(id, tx); + (id, rx) + } + pub async fn unregister(&self, id: u64) { + self.map.write().await.remove(&id); + } + pub async fn route(&self, id: u64, resp: MeResponse) -> bool { + let m = self.map.read().await; + if let Some(tx) = m.get(&id) { + tx.send(resp).await.is_ok() + } else { false } + } +} + +// ========== RPC Writer (streaming CBC) ========== + +struct RpcWriter { + writer: tokio::io::WriteHalf, + key: [u8; 32], + iv: [u8; 16], + seq_no: i32, +} + +impl RpcWriter { + async fn send(&mut self, payload: &[u8]) -> Result<()> { + let frame = build_rpc_frame(self.seq_no, payload); + self.seq_no += 1; + + let pad = (16 - (frame.len() % 16)) % 16; + let mut buf = frame; + let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00]; + for i in 0..pad { + buf.push(pad_pattern[i % 4]); + } + + let cipher = AesCbc::new(self.key, self.iv); + cipher.encrypt_in_place(&mut buf) + .map_err(|e| ProxyError::Crypto(format!("{}", e)))?; + + if buf.len() >= 16 { + self.iv.copy_from_slice(&buf[buf.len() - 16..]); + } + self.writer.write_all(&buf).await.map_err(ProxyError::Io) + } +} + +// ========== RPC_PROXY_REQ ========== + +fn build_proxy_req_payload( + conn_id: u64, + client_addr: SocketAddr, + our_addr: SocketAddr, + data: &[u8], + proxy_tag: Option<&[u8]>, +) -> Vec { + let mut flags: u32 = 0; + let has_tag = proxy_tag.is_some(); + if has_tag { + flags |= 8; // C: `flags |= 8` when proxy_tag_set + } + + let mut b = Vec::with_capacity(128 + data.len()); + b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); + b.extend_from_slice(&flags.to_le_bytes()); + b.extend_from_slice(&conn_id.to_le_bytes()); + + // Client IP (16 bytes IPv4-mapped-v6) + port (4 bytes) + match client_addr.ip() { + IpAddr::V4(v4) => b.extend_from_slice(&ipv4_to_mapped_v6(v4)), + IpAddr::V6(v6) => b.extend_from_slice(&v6.octets()), + } + b.extend_from_slice(&(client_addr.port() as u32).to_le_bytes()); + + // Our IP (16 bytes) + port (4 bytes) + match our_addr.ip() { + IpAddr::V4(v4) => b.extend_from_slice(&ipv4_to_mapped_v6(v4)), + IpAddr::V6(v6) => b.extend_from_slice(&v6.octets()), + } + b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes()); + + // Extra section (proxy_tag) + if flags & 12 != 0 { + let extra_start = b.len(); + b.extend_from_slice(&0u32.to_le_bytes()); // placeholder + + if let Some(tag) = proxy_tag { + b.extend_from_slice(&TL_PROXY_TAG_U32.to_le_bytes()); + // TL string encoding + if tag.len() < 254 { + b.push(tag.len() as u8); + b.extend_from_slice(tag); + let pad = (4 - ((1 + tag.len()) % 4)) % 4; + b.extend(std::iter::repeat(0u8).take(pad)); + } else { + b.push(0xfe); + let len_bytes = (tag.len() as u32).to_le_bytes(); + b.extend_from_slice(&len_bytes[..3]); + b.extend_from_slice(tag); + let pad = (4 - (tag.len() % 4)) % 4; + b.extend(std::iter::repeat(0u8).take(pad)); + } + } + + let extra_bytes = (b.len() - extra_start - 4) as u32; + let eb = extra_bytes.to_le_bytes(); + b[extra_start..extra_start + 4].copy_from_slice(&eb); + } + + b.extend_from_slice(data); + b +} + +// ========== ME Pool ========== + +pub struct MePool { + registry: Arc, + writers: RwLock>>>, + rr: AtomicU64, + proxy_tag: Option>, + /// Telegram proxy-secret (binary, 32-512 bytes) + proxy_secret: Vec, + pool_size: usize, +} + +impl MePool { + pub fn new(proxy_tag: Option>, proxy_secret: Vec) -> Arc { + Arc::new(Self { + registry: Arc::new(ConnRegistry::new()), + writers: RwLock::new(Vec::new()), + rr: AtomicU64::new(0), + proxy_tag, + proxy_secret, + pool_size: 2, + }) + } + + pub fn registry(&self) -> &Arc { + &self.registry + } + + /// key_selector = first 4 bytes of proxy-secret as LE u32 + /// C: main_secret.key_signature via union { char secret[]; int key_signature; } + fn key_selector(&self) -> u32 { + if self.proxy_secret.len() >= 4 { + u32::from_le_bytes([ + self.proxy_secret[0], self.proxy_secret[1], + self.proxy_secret[2], self.proxy_secret[3], + ]) + } else { 0 } + } + + pub async fn init( + self: &Arc, + pool_size: usize, + rng: &SecureRandom, + ) -> Result<()> { + let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4; + let ks = self.key_selector(); + info!( + me_servers = addrs.len(), + pool_size, + key_selector = format_args!("0x{:08x}", ks), + secret_len = self.proxy_secret.len(), + "Initializing ME pool" + ); + + for &(ip, port) in addrs.iter() { + for i in 0..pool_size { + let addr = SocketAddr::new(ip, port); + match self.connect_one(addr, rng).await { + Ok(()) => info!(%addr, idx = i, "ME connected"), + Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"), + } + } + if self.writers.read().await.len() >= pool_size { + break; + } + } + + if self.writers.read().await.is_empty() { + return Err(ProxyError::Proxy("No ME connections".into())); + } + Ok(()) + } + + async fn connect_one( + self: &Arc, + addr: SocketAddr, + rng: &SecureRandom, + ) -> Result<()> { + let secret = &self.proxy_secret; + if secret.len() < 32 { + return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into())); + } + + // ===== TCP connect ===== + let stream = timeout( + Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), + TcpStream::connect(addr), + ) + .await + .map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })? + .map_err(ProxyError::Io)?; + stream.set_nodelay(true).ok(); + + let local_addr = stream.local_addr().map_err(ProxyError::Io)?; + let peer_addr = stream.peer_addr().map_err(ProxyError::Io)?; + let (mut rd, mut wr) = tokio::io::split(stream); + + // ===== 1. Send RPC nonce (plaintext, seq=-2) ===== + let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap(); + let crypto_ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default().as_secs() as u32; - p[16..20].copy_from_slice(&utime.to_le_bytes()); - p + .unwrap_or_default() + .as_secs() as u32; + let ks = self.key_selector(); + + let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce); + let nonce_frame = build_rpc_frame(-2, &nonce_payload); + + debug!( + %addr, + frame_len = nonce_frame.len(), + key_sel = format_args!("0x{:08x}", ks), + crypto_ts, + "Sending nonce" + ); + + wr.write_all(&nonce_frame).await.map_err(ProxyError::Io)?; + wr.flush().await.map_err(ProxyError::Io)?; + + // ===== 2. Read server nonce (plaintext, seq=-2) ===== + let (srv_seq, srv_nonce_payload) = timeout( + Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), + read_rpc_frame_plaintext(&mut rd), + ) + .await + .map_err(|_| ProxyError::TgHandshakeTimeout)??; + + if srv_seq != -2 { + return Err(ProxyError::InvalidHandshake( + format!("Expected seq=-2, got {}", srv_seq), + )); + } + + let (schema, _srv_ts, srv_nonce) = parse_nonce_payload(&srv_nonce_payload)?; + if schema != RPC_CRYPTO_AES_U32 { + return Err(ProxyError::InvalidHandshake( + format!("Unsupported crypto schema: 0x{:x}", schema), + )); + } + + debug!(%addr, "Nonce exchange OK, deriving keys"); + + // ===== 3. Derive AES-256-CBC keys ===== + // C buffer layout: + // [0..16] nonce_server (srv_nonce) + // [16..32] nonce_client (my_nonce) + // [32..36] client_timestamp + // [36..40] server_ip + // [40..42] client_port + // [42..48] "CLIENT" or "SERVER" + // [48..52] client_ip + // [52..54] server_port + // [54..54+N] secret (proxy-secret binary) + // [54+N..70+N] nonce_server + // nonce_client(16) + + let ts_bytes = crypto_ts.to_le_bytes(); + let server_ip = addr_to_ip_u32(&peer_addr); + let client_ip = addr_to_ip_u32(&local_addr); + let server_ip_bytes = server_ip.to_le_bytes(); + let client_ip_bytes = client_ip.to_le_bytes(); + let server_port_bytes = peer_addr.port().to_le_bytes(); + let client_port_bytes = local_addr.port().to_le_bytes(); + + let (wk, wi) = derive_middleproxy_keys( + &srv_nonce, &my_nonce, &ts_bytes, + Some(&server_ip_bytes), &client_port_bytes, + b"CLIENT", + Some(&client_ip_bytes), &server_port_bytes, + secret, None, None, + ); + let (rk, ri) = derive_middleproxy_keys( + &srv_nonce, &my_nonce, &ts_bytes, + Some(&server_ip_bytes), &client_port_bytes, + b"SERVER", + Some(&client_ip_bytes), &server_port_bytes, + secret, None, None, + ); + + debug!( + %addr, + write_key = %hex::encode(&wk[..8]), + read_key = %hex::encode(&rk[..8]), + "Keys derived" + ); + + // ===== 4. Send encrypted handshake (seq=-1) ===== + let hs_payload = build_handshake_payload( + client_ip, local_addr.port(), + server_ip, peer_addr.port(), + ); + let hs_frame = build_rpc_frame(-1, &hs_payload); + let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?; + wr.write_all(&encrypted_hs).await.map_err(ProxyError::Io)?; + wr.flush().await.map_err(ProxyError::Io)?; + + debug!(%addr, enc_len = encrypted_hs.len(), "Sent encrypted handshake"); + + // ===== 5. Read encrypted handshake response (STREAMING) ===== + // Server sends encrypted handshake. C crypto layer may send partial + // blocks (only complete 16-byte blocks get encrypted at a time). + // We read incrementally and decrypt block-by-block. + let deadline = Instant::now() + Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS); + let mut enc_buf = BytesMut::with_capacity(256); + let mut dec_buf = BytesMut::with_capacity(256); + let mut read_iv = ri; + let mut handshake_ok = false; + + while Instant::now() < deadline && !handshake_ok { + let remaining = deadline - Instant::now(); + let mut tmp = [0u8; 256]; + let n = match timeout(remaining, rd.read(&mut tmp)).await { + Ok(Ok(0)) => return Err(ProxyError::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, "ME closed during handshake", + ))), + Ok(Ok(n)) => n, + Ok(Err(e)) => return Err(ProxyError::Io(e)), + Err(_) => return Err(ProxyError::TgHandshakeTimeout), + }; + enc_buf.extend_from_slice(&tmp[..n]); + + // Decrypt complete 16-byte blocks + let blocks = enc_buf.len() / 16 * 16; + if blocks > 0 { + let mut chunk = vec![0u8; blocks]; + chunk.copy_from_slice(&enc_buf[..blocks]); + let new_iv = cbc_decrypt_inplace(&rk, &read_iv, &mut chunk)?; + read_iv = new_iv; + dec_buf.extend_from_slice(&chunk); + let _ = enc_buf.split_to(blocks); + } + + // Try to parse RPC frame from decrypted data + while dec_buf.len() >= 4 { + let fl = u32::from_le_bytes([ + dec_buf[0], dec_buf[1], dec_buf[2], dec_buf[3], + ]) as usize; + + // Skip noop padding + if fl == 4 { + let _ = dec_buf.split_to(4); + continue; + } + if fl < 12 || fl > (1 << 24) { + return Err(ProxyError::InvalidHandshake( + format!("Bad HS response frame len: {}", fl), + )); + } + if dec_buf.len() < fl { + break; // need more data + } + + let frame = dec_buf.split_to(fl); + + // CRC32 check + let pe = fl - 4; + let ec = u32::from_le_bytes([ + frame[pe], frame[pe + 1], frame[pe + 2], frame[pe + 3], + ]); + let ac = crc32(&frame[..pe]); + if ec != ac { + return Err(ProxyError::InvalidHandshake( + format!("HS CRC mismatch: 0x{:08x} vs 0x{:08x}", ec, ac), + )); + } + + // Check type + let hs_type = u32::from_le_bytes([ + frame[8], frame[9], frame[10], frame[11], + ]); + if hs_type == RPC_HANDSHAKE_ERROR_U32 { + let err_code = if frame.len() >= 16 { + i32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]]) + } else { -1 }; + return Err(ProxyError::InvalidHandshake( + format!("ME rejected handshake (error={})", err_code), + )); + } + if hs_type != RPC_HANDSHAKE_U32 { + return Err(ProxyError::InvalidHandshake( + format!("Expected HANDSHAKE 0x{:08x}, got 0x{:08x}", RPC_HANDSHAKE_U32, hs_type), + )); + } + + handshake_ok = true; + break; + } + } + + if !handshake_ok { + return Err(ProxyError::TgHandshakeTimeout); + } + + info!(%addr, "RPC handshake OK"); + + // ===== 6. Setup writer + reader ===== + self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter { + writer: wr, + key: wk, + iv: write_iv, + seq_no: 0, + }))); + + let reg = self.registry.clone(); + tokio::spawn(async move { + if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf).await { + warn!(error = %e, "ME reader ended"); + } + }); + + Ok(()) } - - // ========== CRC32 RPC Frame ========== - - fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec { - let total = (4 + 4 + payload.len() + 4) as u32; - let mut f = Vec::with_capacity(total as usize); - f.extend_from_slice(&total.to_le_bytes()); - f.extend_from_slice(&seq_no.to_le_bytes()); - f.extend_from_slice(payload); - let c = crc32(&f); - f.extend_from_slice(&c.to_le_bytes()); - f - } - - // ========== RPC_PROXY_REQ ========== - - fn build_proxy_req_payload( + + pub async fn send_proxy_req( + &self, conn_id: u64, client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8], - proxy_tag: Option<&[u8]>, - ) -> Vec { - let mut flags: u32 = proxy_flags::FLAG_HAS_AD_TAG2 | proxy_flags::FLAG_EXTMODE2; - if proxy_tag.is_some() { - flags |= proxy_flags::FLAG_HAS_AD_TAG; + ) -> Result<()> { + let ws = self.writers.read().await; + if ws.is_empty() { + return Err(ProxyError::Proxy("No ME connections".into())); } - - let extra_words: u32 = if let Some(tag) = proxy_tag { - let tl_len = 1 + tag.len(); - let padded = (tl_len + 3) / 4; - (1 + padded) as u32 - } else { 0 }; - - let mut b = Vec::with_capacity(64 + data.len()); - b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); - b.extend_from_slice(&flags.to_le_bytes()); - b.extend_from_slice(&conn_id.to_le_bytes()); - - // Client IP - match client_addr.ip() { - IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()), - IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()), - } - b.extend_from_slice(&(client_addr.port() as u32).to_le_bytes()); - // Our IP - match our_addr.ip() { - IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()), - IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()), - } - b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes()); - b.extend_from_slice(&extra_words.to_le_bytes()); - - if let Some(tag) = proxy_tag { - b.extend_from_slice(&TL_PROXY_TAG_U32.to_le_bytes()); - b.push(tag.len() as u8); - b.extend_from_slice(tag); - let pad = (4 - ((1 + tag.len()) % 4)) % 4; - b.extend(std::iter::repeat(0u8).take(pad)); - } - - b.extend_from_slice(data); - b + let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone(); + drop(ws); + let payload = build_proxy_req_payload( + conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref(), + ); + w.lock().await.send(&payload).await } - - // ========== ME Response ========== - - #[derive(Debug)] - pub enum MeResponse { - Data(Bytes), - Ack(u32), - Close, - } - - // ========== Connection Registry ========== - - pub struct ConnRegistry { - map: RwLock>>, - next_id: AtomicU64, - } - - impl ConnRegistry { - pub fn new() -> Self { - Self { map: RwLock::new(HashMap::new()), next_id: AtomicU64::new(1) } - } - pub async fn register(&self) -> (u64, mpsc::Receiver) { - let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = mpsc::channel(256); - self.map.write().await.insert(id, tx); - (id, rx) - } - pub async fn unregister(&self, id: u64) { - self.map.write().await.remove(&id); - } - pub async fn route(&self, id: u64, resp: MeResponse) -> bool { - let m = self.map.read().await; - if let Some(tx) = m.get(&id) { tx.send(resp).await.is_ok() } else { false } + + pub async fn send_close(&self, conn_id: u64) -> Result<()> { + let ws = self.writers.read().await; + if !ws.is_empty() { + let w = ws[0].clone(); + drop(ws); + let mut p = Vec::with_capacity(12); + p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); + p.extend_from_slice(&conn_id.to_le_bytes()); + let _ = w.lock().await.send(&p).await; } + self.registry.unregister(conn_id).await; + Ok(()) } - - // ========== RPC Writer (streaming CBC) ========== - - struct RpcWriter { - writer: tokio::io::WriteHalf, - key: [u8; 32], - iv: [u8; 16], - seq_no: i32, + + pub fn connection_count(&self) -> usize { + self.writers.try_read().map(|w| w.len()).unwrap_or(0) } - - impl RpcWriter { - async fn send(&mut self, payload: &[u8]) -> Result<()> { - let frame = build_rpc_frame(self.seq_no, payload); - self.seq_no += 1; - let pad = (16 - (frame.len() % 16)) % 16; - let mut buf = frame; - buf.extend(std::iter::repeat(0u8).take(pad)); - - let cipher = AesCbc::new(self.key, self.iv); - cipher.encrypt_in_place(&mut buf) +} + +// ========== Reader Loop ========== + +async fn reader_loop( + mut rd: tokio::io::ReadHalf, + dk: [u8; 32], + mut div: [u8; 16], + reg: Arc, + mut enc_leftover: BytesMut, + mut dec: BytesMut, +) -> Result<()> { + let mut raw = enc_leftover; + loop { + let mut tmp = [0u8; 16384]; + let n = rd.read(&mut tmp).await.map_err(ProxyError::Io)?; + if n == 0 { return Ok(()); } + raw.extend_from_slice(&tmp[..n]); + + // Decrypt complete 16-byte blocks + let blocks = raw.len() / 16 * 16; + if blocks > 0 { + let mut new_iv = [0u8; 16]; + new_iv.copy_from_slice(&raw[blocks - 16..blocks]); + let mut chunk = vec![0u8; blocks]; + chunk.copy_from_slice(&raw[..blocks]); + AesCbc::new(dk, div) + .decrypt_in_place(&mut chunk) .map_err(|e| ProxyError::Crypto(format!("{}", e)))?; - if buf.len() >= 16 { - self.iv.copy_from_slice(&buf[buf.len() - 16..]); + div = new_iv; + dec.extend_from_slice(&chunk); + let _ = raw.split_to(blocks); + } + + // Parse RPC frames + while dec.len() >= 12 { + let fl = u32::from_le_bytes([dec[0], dec[1], dec[2], dec[3]]) as usize; + if fl == 4 { let _ = dec.split_to(4); continue; } + if fl < 12 || fl > (1 << 24) { + warn!(frame_len = fl, "Invalid RPC frame len"); + dec.clear(); + break; + } + if dec.len() < fl { break; } + + let frame = dec.split_to(fl); + let pe = fl - 4; + let ec = u32::from_le_bytes([frame[pe], frame[pe+1], frame[pe+2], frame[pe+3]]); + if crc32(&frame[..pe]) != ec { + warn!("CRC mismatch in data frame"); + continue; + } + + let payload = &frame[8..pe]; + if payload.len() < 4 { continue; } + let pt = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]); + let body = &payload[4..]; + + if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 { + let flags = u32::from_le_bytes(body[0..4].try_into().unwrap()); + let cid = u64::from_le_bytes(body[4..12].try_into().unwrap()); + let data = Bytes::copy_from_slice(&body[12..]); + trace!(cid, len = data.len(), flags, "ANS"); + reg.route(cid, MeResponse::Data(data)).await; + } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { + let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); + let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap()); + trace!(cid, cfm, "ACK"); + reg.route(cid, MeResponse::Ack(cfm)).await; + } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { + let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); + debug!(cid, "CLOSE_EXT from ME"); + reg.route(cid, MeResponse::Close).await; + reg.unregister(cid).await; + } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { + let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); + debug!(cid, "CLOSE_CONN from ME"); + reg.route(cid, MeResponse::Close).await; + reg.unregister(cid).await; + } else { + debug!(rpc_type = format_args!("0x{:08x}", pt), len = body.len(), "Unknown RPC"); } - self.writer.write_all(&buf).await.map_err(ProxyError::Io) } } - - // ========== ME Pool ========== - - pub struct MePool { - registry: Arc, - writers: RwLock>>>, - rr: AtomicU64, - proxy_tag: Option>, - } - - impl MePool { - pub fn new(proxy_tag: Option>) -> Arc { - Arc::new(Self { - registry: Arc::new(ConnRegistry::new()), - writers: RwLock::new(Vec::new()), - rr: AtomicU64::new(0), - proxy_tag, - }) - } - pub fn registry(&self) -> &Arc { &self.registry } - - pub async fn init(self: &Arc, pool_size: usize, secret: &[u8], rng: &SecureRandom) -> Result<()> { - let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4; - info!(me_servers = addrs.len(), pool_size, "Initializing ME pool"); - for &(ip, port) in addrs.iter().take(3) { - for i in 0..pool_size { +} + +// ========== Health Monitor (Phase 4) ========== + +pub async fn me_health_monitor( + pool: Arc, + rng: Arc, + min_connections: usize, +) { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + let current = pool.writers.read().await.len(); + if current < min_connections { + warn!(current, min = min_connections, "ME pool below minimum, reconnecting..."); + let addrs = TG_MIDDLE_PROXIES_FLAT_V4.clone(); + for &(ip, port) in addrs.iter() { + let needed = min_connections.saturating_sub(pool.writers.read().await.len()); + if needed == 0 { break; } + for _ in 0..needed { let addr = SocketAddr::new(ip, port); - match self.connect(addr, secret, rng).await { - Ok(()) => info!(%addr, idx = i, "ME connected"), - Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"), + match pool.connect_one(addr, &rng).await { + Ok(()) => info!(%addr, "ME reconnected"), + Err(e) => debug!(%addr, error = %e, "ME reconnect failed"), } } } - if self.writers.read().await.is_empty() { - return Err(ProxyError::Proxy("No ME connections".into())); - } - Ok(()) - } - - async fn connect(self: &Arc, addr: SocketAddr, secret: &[u8], rng: &SecureRandom) -> Result<()> { - let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), TcpStream::connect(addr)) - .await.map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })? - .map_err(ProxyError::Io)?; - stream.set_nodelay(true).ok(); - let (mut rd, mut wr) = tokio::io::split(stream); - - // Nonce exchange - let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap(); - let crypto_ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs() as u32; - let sh = sha1(secret); - let ks = u32::from_le_bytes([sh[0], sh[1], sh[2], sh[3]]); - wr.write_all(&build_nonce_packet(ks, crypto_ts, &my_nonce)).await.map_err(ProxyError::Io)?; - - let mut resp = [0u8; 32]; - timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut resp)) - .await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?; - let (schema, _srv_ts, srv_nonce) = parse_nonce_response(&resp)?; - if schema != RPC_CRYPTO_AES_U32 { - return Err(ProxyError::InvalidHandshake(format!("Unsupported crypto: {}", schema))); - } - - // Key derivation via existing derive_middleproxy_keys - let ts_bytes = crypto_ts.to_le_bytes(); - let (wk, wi) = derive_middleproxy_keys( - &srv_nonce, &my_nonce, &ts_bytes, - None, &0u16.to_le_bytes(), b"CLIENT", None, &0u16.to_le_bytes(), secret, None, None, - ); - let (rk, ri) = derive_middleproxy_keys( - &srv_nonce, &my_nonce, &ts_bytes, - None, &0u16.to_le_bytes(), b"SERVER", None, &0u16.to_le_bytes(), secret, None, None, - ); - - // Handshake - wr.write_all(&build_handshake_packet()).await.map_err(ProxyError::Io)?; - let mut hs = [0u8; 32]; - timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut hs)) - .await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?; - let ht = u32::from_le_bytes([hs[0], hs[1], hs[2], hs[3]]); - if ht == RPC_HANDSHAKE_ERROR_U32 { - return Err(ProxyError::InvalidHandshake("ME rejected handshake".into())); - } - if ht != RPC_HANDSHAKE_U32 { - return Err(ProxyError::InvalidHandshake(format!("Got 0x{:08x}", ht))); - } - - info!(%addr, "RPC handshake OK"); - - self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter { writer: wr, key: wk, iv: wi, seq_no: 0 }))); - - let reg = self.registry.clone(); - tokio::spawn(async move { - if let Err(e) = reader_loop(rd, rk, ri, reg).await { - warn!(error = %e, "ME reader ended"); - } - }); - Ok(()) - } - - pub async fn send_proxy_req(&self, conn_id: u64, client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8]) -> Result<()> { - let ws = self.writers.read().await; - if ws.is_empty() { return Err(ProxyError::Proxy("No ME connections".into())); } - let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone(); - drop(ws); - let payload = build_proxy_req_payload(conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref()); - w.lock().await.send(&payload).await - } - - pub async fn send_close(&self, conn_id: u64) -> Result<()> { - let ws = self.writers.read().await; - if !ws.is_empty() { - let w = ws[0].clone(); - drop(ws); - let mut p = Vec::with_capacity(12); - p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); - p.extend_from_slice(&conn_id.to_le_bytes()); - let _ = w.lock().await.send(&p).await; - } - self.registry.unregister(conn_id).await; - Ok(()) } } - - // ========== Reader Loop ========== - - async fn reader_loop( - mut rd: tokio::io::ReadHalf, - dk: [u8; 32], mut div: [u8; 16], - reg: Arc, - ) -> Result<()> { - let mut raw = BytesMut::with_capacity(65536); - let mut dec = BytesMut::new(); - loop { - let mut tmp = [0u8; 16384]; - let n = rd.read(&mut tmp).await.map_err(ProxyError::Io)?; - if n == 0 { return Ok(()); } - raw.extend_from_slice(&tmp[..n]); - - let blocks = raw.len() / 16 * 16; - if blocks > 0 { - let mut new_iv = [0u8; 16]; - new_iv.copy_from_slice(&raw[blocks - 16..blocks]); - let mut chunk = vec![0u8; blocks]; - chunk.copy_from_slice(&raw[..blocks]); - AesCbc::new(dk, div).decrypt_in_place(&mut chunk) - .map_err(|e| ProxyError::Crypto(format!("{}", e)))?; - div = new_iv; - dec.extend_from_slice(&chunk); - let _ = raw.split_to(blocks); - } - - while dec.len() >= 12 { - let fl = u32::from_le_bytes([dec[0], dec[1], dec[2], dec[3]]) as usize; - if fl == 4 { let _ = dec.split_to(4); continue; } - if fl < 12 || fl > (1 << 24) { dec.clear(); break; } - if dec.len() < fl { break; } - let frame = dec.split_to(fl); - // CRC32 check - let pe = fl - 4; - let ec = u32::from_le_bytes([frame[pe], frame[pe+1], frame[pe+2], frame[pe+3]]); - if crc32(&frame[..pe]) != ec { warn!("CRC mismatch"); continue; } - let payload = &frame[8..pe]; - if payload.len() < 4 { continue; } - let pt = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]); - let body = &payload[4..]; - - if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 { - let flags = u32::from_le_bytes(body[0..4].try_into().unwrap()); - let cid = u64::from_le_bytes(body[4..12].try_into().unwrap()); - let data = Bytes::copy_from_slice(&body[12..]); - trace!(cid, len = data.len(), flags, "ANS"); - reg.route(cid, MeResponse::Data(data)).await; - } else if pt == RPC_SIMPLE_ACK_U32 && body.len() >= 12 { - let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); - let cfm = u32::from_le_bytes(body[8..12].try_into().unwrap()); - trace!(cid, cfm, "ACK"); - reg.route(cid, MeResponse::Ack(cfm)).await; - } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { - let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); - debug!(cid, "CLOSE_EXT"); - reg.route(cid, MeResponse::Close).await; - reg.unregister(cid).await; - } - } - } - } - \ No newline at end of file +}