Key derivation + me_health_monitor + QuickACK

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-13 12:51:49 +03:00
parent a494dfa9eb
commit f1c1f42de8
4 changed files with 1771 additions and 1018 deletions

View File

@@ -117,6 +117,11 @@ pub struct GeneralConfig {
#[serde(default)] #[serde(default)]
pub ad_tag: Option<String>, pub ad_tag: Option<String>,
/// Path to proxy-secret binary file (auto-downloaded if absent).
/// Infrastructure secret from https://core.telegram.org/getProxySecret
#[serde(default)]
pub proxy_secret_path: Option<String>,
#[serde(default)] #[serde(default)]
pub log_level: LogLevel, pub log_level: LogLevel,
} }
@@ -129,6 +134,7 @@ impl Default for GeneralConfig {
fast_mode: true, fast_mode: true,
use_middle_proxy: false, use_middle_proxy: false,
ad_tag: None, ad_tag: None,
proxy_secret_path: None,
log_level: LogLevel::Normal, log_level: LogLevel::Normal,
} }
} }

View File

@@ -1,4 +1,4 @@
//! Telemt - MTProxy on Rust //! telemt — Telegram MTProto Proxy
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@@ -25,6 +25,7 @@ use crate::proxy::ClientHandler;
use crate::stats::{Stats, ReplayChecker}; use crate::stats::{Stats, ReplayChecker};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::transport::{create_listener, ListenOptions, UpstreamManager}; use crate::transport::{create_listener, ListenOptions, UpstreamManager};
use crate::transport::middle_proxy::MePool;
use crate::util::ip::detect_ip; use crate::util::ip::detect_ip;
use crate::stream::BufferPool; use crate::stream::BufferPool;
@@ -83,7 +84,7 @@ fn parse_cli() -> (String, bool, Option<String>) {
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (config_path, cli_silent, cli_log_level) = parse_cli(); let (config_path, cli_silent, cli_log_level) = parse_cli();
let config = match ProxyConfig::load(&config_path) { let config = match ProxyConfig::load(&config_path) {
@@ -115,8 +116,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
config.general.log_level.clone() config.general.log_level.clone()
}; };
// Start with INFO so startup messages are always visible,
// then switch to user-configured level after startup
let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
tracing_subscriber::registry() tracing_subscriber::registry()
.with(filter_layer) .with(filter_layer)
@@ -147,6 +146,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
let prefer_ipv6 = config.general.prefer_ipv6; let prefer_ipv6 = config.general.prefer_ipv6;
let use_middle_proxy = config.general.use_middle_proxy;
let config = Arc::new(config); let config = Arc::new(config);
let stats = Arc::new(Stats::new()); let stats = Arc::new(Stats::new());
let rng = Arc::new(SecureRandom::new()); let rng = Arc::new(SecureRandom::new());
@@ -159,12 +159,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
// Connection concurrency limit — prevents OOM under SYN flood / connection storm. // Connection concurrency limit
// 10000 is generous; each connection uses ~64KB (2x 16KB relay buffers + overhead). let _max_connections = Arc::new(Semaphore::new(10_000));
// 10000 connections ≈ 640MB peak memory.
let max_connections = Arc::new(Semaphore::new(10_000));
// Startup DC ping // =====================================================================
// Middle Proxy initialization (if enabled)
// =====================================================================
let me_pool: Option<Arc<MePool>> = if use_middle_proxy {
info!("=== Middle Proxy Mode ===");
// ad_tag (proxy_tag) for advertising
let proxy_tag = config.general.ad_tag.as_ref().map(|tag| {
hex::decode(tag).unwrap_or_else(|_| {
warn!("Invalid ad_tag hex, middle proxy ad_tag will be empty");
Vec::new()
})
});
// =============================================================
// CRITICAL: Download Telegram proxy-secret (NOT user secret!)
//
// C MTProxy uses TWO separate secrets:
// -S flag = 16-byte user secret for client obfuscation
// --aes-pwd = 32-512 byte binary file for ME RPC auth
//
// proxy-secret is from: https://core.telegram.org/getProxySecret
// =============================================================
let proxy_secret_path = config.general.proxy_secret_path.as_deref();
match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await {
Ok(proxy_secret) => {
info!(
secret_len = proxy_secret.len(),
key_sig = format_args!("0x{:08x}",
if proxy_secret.len() >= 4 {
u32::from_le_bytes([proxy_secret[0], proxy_secret[1],
proxy_secret[2], proxy_secret[3]])
} else { 0 }),
"Proxy-secret loaded"
);
let pool = MePool::new(proxy_tag, proxy_secret);
match pool.init(2, &rng).await {
Ok(()) => {
info!("Middle-End pool initialized successfully");
// Phase 4: Start health monitor
let pool_clone = pool.clone();
let rng_clone = rng.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, 2,
).await;
});
Some(pool)
}
Err(e) => {
error!(error = %e, "Failed to initialize ME pool. Falling back to direct mode.");
None
}
}
}
Err(e) => {
error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode.");
None
}
}
} else {
None
};
if me_pool.is_some() {
info!("Transport: Middle Proxy (supports all DCs including CDN)");
} else {
info!("Transport: Direct TCP (standard DCs only)");
}
// Startup DC ping (only meaningful in direct mode)
if me_pool.is_none() {
info!("=== Telegram DC Connectivity ==="); info!("=== Telegram DC Connectivity ===");
let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await; let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await;
for upstream_result in &ping_results { for upstream_result in &ping_results {
@@ -184,6 +257,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
} }
info!("================================"); info!("================================");
}
// Background tasks // Background tasks
let um_clone = upstream_manager.clone(); let um_clone = upstream_manager.clone();
@@ -274,6 +348,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
let buffer_pool = buffer_pool.clone(); let buffer_pool = buffer_pool.clone();
let rng = rng.clone(); let rng = rng.clone();
let me_pool = me_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@@ -285,11 +360,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let replay_checker = replay_checker.clone(); let replay_checker = replay_checker.clone();
let buffer_pool = buffer_pool.clone(); let buffer_pool = buffer_pool.clone();
let rng = rng.clone(); let rng = rng.clone();
let me_pool = me_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = ClientHandler::new( if let Err(e) = ClientHandler::new(
stream, peer_addr, config, stats, stream, peer_addr, config, stats,
upstream_manager, replay_checker, buffer_pool, rng upstream_manager, replay_checker, buffer_pool, rng,
me_pool,
).run().await { ).run().await {
debug!(peer = %peer_addr, error = %e, "Connection error"); debug!(peer = %peer_addr, error = %e, "Connection error");
} }
@@ -311,3 +388,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }

View File

@@ -14,6 +14,7 @@ use crate::protocol::constants::*;
use crate::protocol::tls; use crate::protocol::tls;
use crate::stats::{Stats, ReplayChecker}; use crate::stats::{Stats, ReplayChecker};
use crate::transport::{configure_client_socket, UpstreamManager}; use crate::transport::{configure_client_socket, UpstreamManager};
use crate::transport::middle_proxy::{MePool, MeResponse};
use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool}; use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter, BufferPool};
use crate::crypto::{AesCtr, SecureRandom}; use crate::crypto::{AesCtr, SecureRandom};
@@ -35,6 +36,7 @@ pub struct RunningClientHandler {
upstream_manager: Arc<UpstreamManager>, upstream_manager: Arc<UpstreamManager>,
buffer_pool: Arc<BufferPool>, buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
} }
impl ClientHandler { impl ClientHandler {
@@ -47,10 +49,11 @@ impl ClientHandler {
replay_checker: Arc<ReplayChecker>, replay_checker: Arc<ReplayChecker>,
buffer_pool: Arc<BufferPool>, buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
) -> RunningClientHandler { ) -> RunningClientHandler {
RunningClientHandler { RunningClientHandler {
stream, peer, config, stats, replay_checker, stream, peer, config, stats, replay_checker,
upstream_manager, buffer_pool, rng, upstream_manager, buffer_pool, rng, me_pool,
} }
} }
} }
@@ -168,7 +171,7 @@ impl RunningClientHandler {
Self::handle_authenticated_static( Self::handle_authenticated_static(
crypto_reader, crypto_writer, success, crypto_reader, crypto_writer, success,
self.upstream_manager, self.stats, self.config, self.upstream_manager, self.stats, self.config,
buffer_pool, self.rng, buffer_pool, self.rng, self.me_pool,
).await ).await
} }
@@ -210,10 +213,14 @@ impl RunningClientHandler {
Self::handle_authenticated_static( Self::handle_authenticated_static(
crypto_reader, crypto_writer, success, crypto_reader, crypto_writer, success,
self.upstream_manager, self.stats, self.config, self.upstream_manager, self.stats, self.config,
buffer_pool, self.rng, buffer_pool, self.rng, self.me_pool,
).await ).await
} }
/// Main dispatch after successful handshake.
/// Two modes:
/// - Direct: TCP relay to TG DC (existing behavior)
/// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs)
async fn handle_authenticated_static<R, W>( async fn handle_authenticated_static<R, W>(
client_reader: CryptoReader<R>, client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>, client_writer: CryptoWriter<W>,
@@ -223,18 +230,56 @@ impl RunningClientHandler {
config: Arc<ProxyConfig>, config: Arc<ProxyConfig>,
buffer_pool: Arc<BufferPool>, buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>, rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let user = &success.user;
if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
warn!(user = %user, error = %e, "User limit exceeded");
return Err(e);
}
// Decide: middle proxy or direct
if config.general.use_middle_proxy {
if let Some(ref pool) = me_pool {
return Self::handle_via_middle_proxy(
client_reader, client_writer, success,
pool.clone(), stats, config, buffer_pool,
).await;
}
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
}
// Direct mode (original behavior)
Self::handle_via_direct(
client_reader, client_writer, success,
upstream_manager, stats, config, buffer_pool, rng,
).await
}
// =====================================================================
// Direct mode — TCP relay to Telegram DC
// =====================================================================
async fn handle_via_direct<R, W>(
client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
upstream_manager: Arc<UpstreamManager>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
) -> Result<()> ) -> Result<()>
where where
R: AsyncRead + Unpin + Send + 'static, R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static,
{ {
let user = &success.user; let user = &success.user;
if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
warn!(user = %user, error = %e, "User limit exceeded");
return Err(e);
}
let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?; let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?;
info!( info!(
@@ -243,10 +288,10 @@ impl RunningClientHandler {
dc = success.dc_idx, dc = success.dc_idx,
dc_addr = %dc_addr, dc_addr = %dc_addr,
proto = ?success.proto_tag, proto = ?success.proto_tag,
"Connecting to Telegram" mode = "direct",
"Connecting to Telegram DC"
); );
// Pass dc_idx for latency-based upstream selection
let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?; let tg_stream = upstream_manager.connect(dc_addr, Some(success.dc_idx)).await?;
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake");
@@ -269,13 +314,166 @@ impl RunningClientHandler {
stats.decrement_user_curr_connects(user); stats.decrement_user_curr_connects(user);
match &relay_result { match &relay_result {
Ok(()) => debug!(user = %user, "Relay completed"), Ok(()) => debug!(user = %user, "Direct relay completed"),
Err(e) => debug!(user = %user, error = %e, "Relay ended with error"), Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"),
} }
relay_result relay_result
} }
// =====================================================================
// Middle Proxy mode — RPC multiplex through ME pool
// =====================================================================
/// Middle Proxy RPC relay
///
/// Architecture (matches C MTProxy):
/// ```text
/// Client ←AES-CTR→ [telemt] ←RPC/AES-CBC→ ME ←internal→ DC (any, incl CDN 203)
/// ```
///
/// Key difference from direct mode:
/// - No per-client TCP to DC; all clients share ME pool connections
/// - ME internally routes to correct DC based on client's encrypted auth_key_id
/// - CDN DCs (203+) work because ME knows their internal addresses
/// - We pass raw client MTProto bytes in RPC_PROXY_REQ envelope
/// - ME returns responses in RPC_PROXY_ANS envelope
async fn handle_via_middle_proxy<R, W>(
mut client_reader: CryptoReader<R>,
mut client_writer: CryptoWriter<W>,
success: HandshakeSuccess,
me_pool: Arc<MePool>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
_buffer_pool: Arc<BufferPool>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let user = success.user.clone();
let peer = success.peer;
info!(
user = %user,
peer = %peer,
dc = success.dc_idx,
proto = ?success.proto_tag,
mode = "middle_proxy",
"Routing via Middle-End"
);
// Register this client connection in ME demux registry
let (conn_id, mut me_rx) = me_pool.registry().register().await;
// Our listening address for RPC_PROXY_REQ metadata
let our_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
.parse().unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap());
stats.increment_user_connects(&user);
stats.increment_user_curr_connects(&user);
debug!(user = %user, conn_id, "ME relay started");
// Bidirectional relay loop: client ↔ ME pool
//
// C→S direction: read raw bytes from client_reader, wrap in RPC_PROXY_REQ, send via ME
// S→C direction: receive MeResponse::Data from registry channel, write to client_writer
//
// We use tokio::select! to handle both directions concurrently.
// Unlike direct mode (copy_bidirectional on two TCP streams),
// here one side is a channel (mpsc::Receiver), not a stream.
let mut client_buf = vec![0u8; 64 * 1024];
let mut client_closed = false;
let mut server_closed = false;
let result: Result<()> = loop {
tokio::select! {
// C→S: client sends data, we forward to ME
read_result = client_reader.read(&mut client_buf), if !client_closed => {
match read_result {
Ok(0) => {
debug!(conn_id, "Client EOF");
client_closed = true;
if server_closed { break Ok(()); }
// Signal ME to close this connection
let _ = me_pool.send_close(conn_id).await;
}
Ok(n) => {
trace!(conn_id, bytes = n, "C→ME");
stats.add_user_octets_from(&user, n as u64);
if let Err(e) = me_pool.send_proxy_req(
conn_id, peer, our_addr, &client_buf[..n]
).await {
break Err(e);
}
}
Err(e) => {
debug!(conn_id, error = %e, "Client read error");
break Err(ProxyError::Io(e));
}
}
}
// S→C: ME sends response, we forward to client
me_msg = me_rx.recv(), if !server_closed => {
match me_msg {
Some(MeResponse::Data(data)) => {
trace!(conn_id, bytes = data.len(), "ME→C");
stats.add_user_octets_to(&user, data.len() as u64);
if let Err(e) = client_writer.write_all(&data).await {
debug!(conn_id, error = %e, "Client write error");
break Err(ProxyError::Io(e));
}
if let Err(e) = client_writer.flush().await {
break Err(ProxyError::Io(e));
}
}
Some(MeResponse::Ack(_token)) => {
// QuickACK from ME — could forward to client as obfuscated ACK
// For now, just log
trace!(conn_id, "ME ACK (ignored)");
}
Some(MeResponse::Close) => {
debug!(conn_id, "ME sent CLOSE");
server_closed = true;
if client_closed { break Ok(()); }
}
None => {
// Channel closed — ME connection died
debug!(conn_id, "ME channel closed");
server_closed = true;
if client_closed { break Ok(()); }
break Err(ProxyError::Proxy("ME connection lost".into()));
}
}
}
// Both sides closed
else => {
break Ok(());
}
}
};
// Cleanup
debug!(user = %user, conn_id, "ME relay cleanup");
me_pool.registry().unregister(conn_id).await;
stats.decrement_user_curr_connects(&user);
match &result {
Ok(()) => debug!(user = %user, conn_id, "ME relay completed"),
Err(e) => debug!(user = %user, conn_id, error = %e, "ME relay error"),
}
result
}
// =====================================================================
// Helpers
// =====================================================================
fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> {
if let Some(expiration) = config.access.user_expirations.get(user) { if let Some(expiration) = config.access.user_expirations.get(user) {
if chrono::Utc::now() > *expiration { if chrono::Utc::now() > *expiration {
@@ -298,21 +496,7 @@ impl RunningClientHandler {
Ok(()) Ok(())
} }
/// Resolve DC index to a target address. /// Resolve DC index to target address (used only in direct mode)
///
/// Matches the C implementation's behavior exactly:
///
/// 1. Look up DC in known clusters (standard DCs ±1..±5)
/// 2. If not found and `force=1` → fall back to `default_cluster`
///
/// In the C code:
/// - `proxy-multi.conf` is downloaded from Telegram, contains only DC ±1..±5
/// - `default 2;` directive sets the default cluster
/// - `mf_cluster_lookup(CurConf, target_dc, 1)` returns default_cluster
/// for any unknown DC (like CDN DC 203)
///
/// So DC 203, DC 101, DC -300, etc. all route to the default DC (2).
/// There is NO modular arithmetic in the C implementation.
fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> { fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let datacenters = if config.general.prefer_ipv6 { let datacenters = if config.general.prefer_ipv6 {
&*TG_DATACENTERS_V6 &*TG_DATACENTERS_V6
@@ -320,9 +504,8 @@ impl RunningClientHandler {
&*TG_DATACENTERS_V4 &*TG_DATACENTERS_V4
}; };
let num_dcs = datacenters.len(); // 5 let num_dcs = datacenters.len();
// === Step 1: Check dc_overrides (like C's `proxy_for <dc> <ip>:<port>`) ===
let dc_key = dc_idx.to_string(); let dc_key = dc_idx.to_string();
if let Some(addr_str) = config.dc_overrides.get(&dc_key) { if let Some(addr_str) = config.dc_overrides.get(&dc_key) {
match addr_str.parse::<SocketAddr>() { match addr_str.parse::<SocketAddr>() {
@@ -337,21 +520,16 @@ impl RunningClientHandler {
} }
} }
// === Step 2: Standard DCs ±1..±5 — direct lookup ===
let abs_dc = dc_idx.unsigned_abs() as usize; let abs_dc = dc_idx.unsigned_abs() as usize;
if abs_dc >= 1 && abs_dc <= num_dcs { if abs_dc >= 1 && abs_dc <= num_dcs {
return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT)); return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT));
} }
// === Step 3: Unknown DC — fall back to default_cluster ===
// Exactly like C's `mf_cluster_lookup(CurConf, target_dc, force=1)`
// which returns `MC->default_cluster` when the DC is not found.
// Telegram's proxy-multi.conf uses `default 2;`
let default_dc = config.default_dc.unwrap_or(2) as usize; let default_dc = config.default_dc.unwrap_or(2) as usize;
let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs { let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs {
default_dc - 1 default_dc - 1
} else { } else {
1 // DC 2 (index 1) — matches Telegram's `default 2;` 1
}; };
info!( info!(
@@ -364,6 +542,7 @@ impl RunningClientHandler {
Ok(SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT)) Ok(SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT))
} }
/// Perform obfuscated handshake with Telegram DC (direct mode only)
async fn do_tg_handshake_static( async fn do_tg_handshake_static(
mut stream: TcpStream, mut stream: TcpStream,
success: &HandshakeSuccess, success: &HandshakeSuccess,
@@ -401,3 +580,4 @@ impl RunningClientHandler {
)) ))
} }
} }

View File

@@ -1,10 +1,15 @@
//! Middle Proxy RPC Transport //! Middle Proxy RPC Transport
//! //!
//! Implements Telegram Middle-End RPC protocol for routing to ALL DCs (including CDN). //! Implements Telegram Middle-End RPC protocol for routing to ALL DCs (including CDN).
//! Uses existing crypto primitives from crate::crypto. //!
//! ## Phase 3 fixes:
//! - ROOT CAUSE: Use Telegram proxy-secret (binary file) not user secret
//! - Streaming handshake response (no fixed-size read deadlock)
//! - Health monitoring + reconnection
//! - Hex diagnostics for debugging
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration; use std::time::Duration;
@@ -13,15 +18,142 @@
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex, RwLock}; use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::time::{timeout, Instant}; use tokio::time::{timeout, Instant};
use tracing::{debug, info, trace, warn}; use tracing::{debug, info, trace, warn, error};
use crate::crypto::{sha1, crc32, derive_middleproxy_keys, AesCbc, SecureRandom}; use crate::crypto::{crc32, derive_middleproxy_keys, AesCbc, SecureRandom};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
// ========== RPC Nonce (32 bytes) ========== // ========== Proxy Secret Fetching ==========
fn build_nonce_packet(key_selector: u32, crypto_ts: u32, nonce: &[u8; 16]) -> [u8; 32] { /// Fetch the Telegram proxy-secret binary file.
///
/// This is NOT the user secret (-S flag, 16 bytes hex for clients).
/// This is the infrastructure secret (--aes-pwd in C MTProxy),
/// a binary file of 32-512 bytes used for ME RPC key derivation.
///
/// Strategy: try local cache, then download from Telegram.
pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result<Vec<u8>> {
let cache = cache_path.unwrap_or("proxy-secret");
// 1. Try local cache (< 24h old)
if let Ok(metadata) = tokio::fs::metadata(cache).await {
if let Ok(modified) = metadata.modified() {
let age = std::time::SystemTime::now()
.duration_since(modified)
.unwrap_or(Duration::from_secs(u64::MAX));
if age < Duration::from_secs(86400) {
if let Ok(data) = tokio::fs::read(cache).await {
if data.len() >= 32 {
info!(
path = cache,
len = data.len(),
age_hours = age.as_secs() / 3600,
"Loaded proxy-secret from cache"
);
return Ok(data);
}
warn!(path = cache, len = data.len(), "Cached proxy-secret too short");
}
}
}
}
// 2. Download from Telegram
info!("Downloading proxy-secret from core.telegram.org...");
let data = download_proxy_secret().await?;
// 3. Cache locally (best-effort)
if let Err(e) = tokio::fs::write(cache, &data).await {
warn!(error = %e, "Failed to cache proxy-secret (non-fatal)");
} else {
debug!(path = cache, len = data.len(), "Cached proxy-secret");
}
Ok(data)
}
async fn download_proxy_secret() -> Result<Vec<u8>> {
let url = "https://core.telegram.org/getProxySecret";
let resp = reqwest::get(url)
.await
.map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {}", e)))?;
if !resp.status().is_success() {
return Err(ProxyError::Proxy(format!(
"proxy-secret download HTTP {}", resp.status()
)));
}
let data = resp.bytes().await
.map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {}", e)))?
.to_vec();
if data.len() < 32 {
return Err(ProxyError::Proxy(format!(
"proxy-secret too short: {} bytes (need >= 32)", data.len()
)));
}
info!(len = data.len(), "Downloaded proxy-secret OK");
Ok(data)
}
// ========== RPC Frame helpers ==========
/// Build an RPC frame: [len(4) | seq_no(4) | payload | crc32(4)]
fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> {
let total_len = (4 + 4 + payload.len() + 4) as u32;
let mut f = Vec::with_capacity(total_len as usize);
f.extend_from_slice(&total_len.to_le_bytes());
f.extend_from_slice(&seq_no.to_le_bytes());
f.extend_from_slice(payload);
let c = crc32(&f);
f.extend_from_slice(&c.to_le_bytes());
f
}
/// Read one plaintext RPC frame. Returns (seq_no, payload).
async fn read_rpc_frame_plaintext(
rd: &mut (impl AsyncReadExt + Unpin),
) -> Result<(i32, Vec<u8>)> {
let mut len_buf = [0u8; 4];
rd.read_exact(&mut len_buf).await.map_err(ProxyError::Io)?;
let total_len = u32::from_le_bytes(len_buf) as usize;
if total_len < 12 || total_len > (1 << 24) {
return Err(ProxyError::InvalidHandshake(
format!("Bad RPC frame length: {}", total_len),
));
}
let mut rest = vec![0u8; total_len - 4];
rd.read_exact(&mut rest).await.map_err(ProxyError::Io)?;
let mut full = Vec::with_capacity(total_len);
full.extend_from_slice(&len_buf);
full.extend_from_slice(&rest);
let crc_offset = total_len - 4;
let expected_crc = u32::from_le_bytes([
full[crc_offset], full[crc_offset + 1],
full[crc_offset + 2], full[crc_offset + 3],
]);
let actual_crc = crc32(&full[..crc_offset]);
if expected_crc != actual_crc {
return Err(ProxyError::InvalidHandshake(
format!("CRC mismatch: 0x{:08x} vs 0x{:08x}", expected_crc, actual_crc),
));
}
let seq_no = i32::from_le_bytes([full[4], full[5], full[6], full[7]]);
let payload = full[8..crc_offset].to_vec();
Ok((seq_no, payload))
}
// ========== RPC Nonce (32 bytes payload) ==========
fn build_nonce_payload(key_selector: u32, crypto_ts: u32, nonce: &[u8; 16]) -> [u8; 32] {
let mut p = [0u8; 32]; let mut p = [0u8; 32];
p[0..4].copy_from_slice(&RPC_NONCE_U32.to_le_bytes()); p[0..4].copy_from_slice(&RPC_NONCE_U32.to_le_bytes());
p[4..8].copy_from_slice(&key_selector.to_le_bytes()); p[4..8].copy_from_slice(&key_selector.to_le_bytes());
@@ -31,10 +163,17 @@
p p
} }
fn parse_nonce_response(d: &[u8; 32]) -> Result<(u32, u32, [u8; 16])> { fn parse_nonce_payload(d: &[u8]) -> Result<(u32, u32, [u8; 16])> {
if d.len() < 32 {
return Err(ProxyError::InvalidHandshake(
format!("Nonce payload too short: {} bytes", d.len()),
));
}
let t = u32::from_le_bytes([d[0], d[1], d[2], d[3]]); let t = u32::from_le_bytes([d[0], d[1], d[2], d[3]]);
if t != RPC_NONCE_U32 { if t != RPC_NONCE_U32 {
return Err(ProxyError::InvalidHandshake(format!("Expected RPC_NONCE, got 0x{:08x}", t))); return Err(ProxyError::InvalidHandshake(
format!("Expected RPC_NONCE 0x{:08x}, got 0x{:08x}", RPC_NONCE_U32, t),
));
} }
let schema = u32::from_le_bytes([d[8], d[9], d[10], d[11]]); let schema = u32::from_le_bytes([d[8], d[9], d[10], d[11]]);
let ts = u32::from_le_bytes([d[12], d[13], d[14], d[15]]); let ts = u32::from_le_bytes([d[12], d[13], d[14], d[15]]);
@@ -43,83 +182,80 @@
Ok((schema, ts, nonce)) Ok((schema, ts, nonce))
} }
// ========== RPC Handshake (32 bytes) ========== // ========== RPC Handshake (32 bytes payload) ==========
fn build_handshake_packet() -> [u8; 32] { fn build_handshake_payload(our_ip: u32, our_port: u16, peer_ip: u32, peer_port: u16) -> [u8; 32] {
let mut p = [0u8; 32]; let mut p = [0u8; 32];
p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes()); p[0..4].copy_from_slice(&RPC_HANDSHAKE_U32.to_le_bytes());
// flags=0, sender_pid with our PID // flags = 0 at offset 4..8
// sender_pid: {ip(4), port(2), pid(2), utime(4)} at offset 8..20
p[8..12].copy_from_slice(&our_ip.to_le_bytes());
p[12..14].copy_from_slice(&our_port.to_le_bytes());
let pid = (std::process::id() & 0xFFFF) as u16; let pid = (std::process::id() & 0xFFFF) as u16;
p[14..16].copy_from_slice(&pid.to_le_bytes()); p[14..16].copy_from_slice(&pid.to_le_bytes());
let utime = std::time::SystemTime::now() let utime = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default().as_secs() as u32; .unwrap_or_default()
.as_secs() as u32;
p[16..20].copy_from_slice(&utime.to_le_bytes()); p[16..20].copy_from_slice(&utime.to_le_bytes());
// peer_pid: {ip(4), port(2), pid(2), utime(4)} at offset 20..32
p[20..24].copy_from_slice(&peer_ip.to_le_bytes());
p[24..26].copy_from_slice(&peer_port.to_le_bytes());
p p
} }
// ========== CRC32 RPC Frame ========== // ========== CBC helpers ==========
fn build_rpc_frame(seq_no: i32, payload: &[u8]) -> Vec<u8> { fn cbc_encrypt_padded(key: &[u8; 32], iv: &[u8; 16], plaintext: &[u8]) -> Result<(Vec<u8>, [u8; 16])> {
let total = (4 + 4 + payload.len() + 4) as u32; let pad = (16 - (plaintext.len() % 16)) % 16;
let mut f = Vec::with_capacity(total as usize); let mut buf = plaintext.to_vec();
f.extend_from_slice(&total.to_le_bytes()); let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00];
f.extend_from_slice(&seq_no.to_le_bytes()); for i in 0..pad {
f.extend_from_slice(payload); buf.push(pad_pattern[i % 4]);
let c = crc32(&f); }
f.extend_from_slice(&c.to_le_bytes()); let cipher = AesCbc::new(*key, *iv);
f cipher.encrypt_in_place(&mut buf)
.map_err(|e| ProxyError::Crypto(format!("CBC encrypt: {}", e)))?;
let mut new_iv = [0u8; 16];
if buf.len() >= 16 {
new_iv.copy_from_slice(&buf[buf.len() - 16..]);
}
Ok((buf, new_iv))
} }
// ========== RPC_PROXY_REQ ========== fn cbc_decrypt_inplace(key: &[u8; 32], iv: &[u8; 16], data: &mut [u8]) -> Result<[u8; 16]> {
let mut new_iv = [0u8; 16];
fn build_proxy_req_payload( if data.len() >= 16 {
conn_id: u64, new_iv.copy_from_slice(&data[data.len() - 16..]);
client_addr: SocketAddr, }
our_addr: SocketAddr, AesCbc::new(*key, *iv)
data: &[u8], .decrypt_in_place(data)
proxy_tag: Option<&[u8]>, .map_err(|e| ProxyError::Crypto(format!("CBC decrypt: {}", e)))?;
) -> Vec<u8> { Ok(new_iv)
let mut flags: u32 = proxy_flags::FLAG_HAS_AD_TAG2 | proxy_flags::FLAG_EXTMODE2;
if proxy_tag.is_some() {
flags |= proxy_flags::FLAG_HAS_AD_TAG;
} }
let extra_words: u32 = if let Some(tag) = proxy_tag { // ========== IPv4 helpers ==========
let tl_len = 1 + tag.len();
let padded = (tl_len + 3) / 4;
(1 + padded) as u32
} else { 0 };
let mut b = Vec::with_capacity(64 + data.len()); fn ipv4_to_mapped_v6(ip: Ipv4Addr) -> [u8; 16] {
b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); let mut buf = [0u8; 16];
b.extend_from_slice(&flags.to_le_bytes()); buf[10] = 0xFF;
b.extend_from_slice(&conn_id.to_le_bytes()); buf[11] = 0xFF;
let o = ip.octets();
// Client IP buf[12] = o[0]; buf[13] = o[1]; buf[14] = o[2]; buf[15] = o[3];
match client_addr.ip() { buf
IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()),
IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()),
}
b.extend_from_slice(&(client_addr.port() as u32).to_le_bytes());
// Our IP
match our_addr.ip() {
IpAddr::V4(v4) => b.extend_from_slice(&u32::from_be_bytes(v4.octets()).to_le_bytes()),
IpAddr::V6(_) => b.extend_from_slice(&0u32.to_le_bytes()),
}
b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes());
b.extend_from_slice(&extra_words.to_le_bytes());
if let Some(tag) = proxy_tag {
b.extend_from_slice(&TL_PROXY_TAG_U32.to_le_bytes());
b.push(tag.len() as u8);
b.extend_from_slice(tag);
let pad = (4 - ((1 + tag.len()) % 4)) % 4;
b.extend(std::iter::repeat(0u8).take(pad));
} }
b.extend_from_slice(data); fn addr_to_ip_u32(addr: &SocketAddr) -> u32 {
b match addr.ip() {
IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()),
IpAddr::V6(v6) => {
if let Some(v4) = v6.to_ipv4_mapped() {
u32::from_le_bytes(v4.octets())
} else { 0 }
}
}
} }
// ========== ME Response ========== // ========== ME Response ==========
@@ -140,7 +276,10 @@
impl ConnRegistry { impl ConnRegistry {
pub fn new() -> Self { pub fn new() -> Self {
Self { map: RwLock::new(HashMap::new()), next_id: AtomicU64::new(1) } Self {
map: RwLock::new(HashMap::new()),
next_id: AtomicU64::new(1),
}
} }
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) { pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed); let id = self.next_id.fetch_add(1, Ordering::Relaxed);
@@ -153,7 +292,9 @@
} }
pub async fn route(&self, id: u64, resp: MeResponse) -> bool { pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
let m = self.map.read().await; let m = self.map.read().await;
if let Some(tx) = m.get(&id) { tx.send(resp).await.is_ok() } else { false } if let Some(tx) = m.get(&id) {
tx.send(resp).await.is_ok()
} else { false }
} }
} }
@@ -170,13 +311,18 @@
async fn send(&mut self, payload: &[u8]) -> Result<()> { async fn send(&mut self, payload: &[u8]) -> Result<()> {
let frame = build_rpc_frame(self.seq_no, payload); let frame = build_rpc_frame(self.seq_no, payload);
self.seq_no += 1; self.seq_no += 1;
let pad = (16 - (frame.len() % 16)) % 16; let pad = (16 - (frame.len() % 16)) % 16;
let mut buf = frame; let mut buf = frame;
buf.extend(std::iter::repeat(0u8).take(pad)); let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00];
for i in 0..pad {
buf.push(pad_pattern[i % 4]);
}
let cipher = AesCbc::new(self.key, self.iv); let cipher = AesCbc::new(self.key, self.iv);
cipher.encrypt_in_place(&mut buf) cipher.encrypt_in_place(&mut buf)
.map_err(|e| ProxyError::Crypto(format!("{}", e)))?; .map_err(|e| ProxyError::Crypto(format!("{}", e)))?;
if buf.len() >= 16 { if buf.len() >= 16 {
self.iv.copy_from_slice(&buf[buf.len() - 16..]); self.iv.copy_from_slice(&buf[buf.len() - 16..]);
} }
@@ -184,6 +330,72 @@
} }
} }
// ========== RPC_PROXY_REQ ==========
fn build_proxy_req_payload(
conn_id: u64,
client_addr: SocketAddr,
our_addr: SocketAddr,
data: &[u8],
proxy_tag: Option<&[u8]>,
) -> Vec<u8> {
let mut flags: u32 = 0;
let has_tag = proxy_tag.is_some();
if has_tag {
flags |= 8; // C: `flags |= 8` when proxy_tag_set
}
let mut b = Vec::with_capacity(128 + data.len());
b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes());
b.extend_from_slice(&flags.to_le_bytes());
b.extend_from_slice(&conn_id.to_le_bytes());
// Client IP (16 bytes IPv4-mapped-v6) + port (4 bytes)
match client_addr.ip() {
IpAddr::V4(v4) => b.extend_from_slice(&ipv4_to_mapped_v6(v4)),
IpAddr::V6(v6) => b.extend_from_slice(&v6.octets()),
}
b.extend_from_slice(&(client_addr.port() as u32).to_le_bytes());
// Our IP (16 bytes) + port (4 bytes)
match our_addr.ip() {
IpAddr::V4(v4) => b.extend_from_slice(&ipv4_to_mapped_v6(v4)),
IpAddr::V6(v6) => b.extend_from_slice(&v6.octets()),
}
b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes());
// Extra section (proxy_tag)
if flags & 12 != 0 {
let extra_start = b.len();
b.extend_from_slice(&0u32.to_le_bytes()); // placeholder
if let Some(tag) = proxy_tag {
b.extend_from_slice(&TL_PROXY_TAG_U32.to_le_bytes());
// TL string encoding
if tag.len() < 254 {
b.push(tag.len() as u8);
b.extend_from_slice(tag);
let pad = (4 - ((1 + tag.len()) % 4)) % 4;
b.extend(std::iter::repeat(0u8).take(pad));
} else {
b.push(0xfe);
let len_bytes = (tag.len() as u32).to_le_bytes();
b.extend_from_slice(&len_bytes[..3]);
b.extend_from_slice(tag);
let pad = (4 - (tag.len() % 4)) % 4;
b.extend(std::iter::repeat(0u8).take(pad));
}
}
let extra_bytes = (b.len() - extra_start - 4) as u32;
let eb = extra_bytes.to_le_bytes();
b[extra_start..extra_start + 4].copy_from_slice(&eb);
}
b.extend_from_slice(data);
b
}
// ========== ME Pool ========== // ========== ME Pool ==========
pub struct MePool { pub struct MePool {
@@ -191,102 +403,328 @@
writers: RwLock<Vec<Arc<Mutex<RpcWriter>>>>, writers: RwLock<Vec<Arc<Mutex<RpcWriter>>>>,
rr: AtomicU64, rr: AtomicU64,
proxy_tag: Option<Vec<u8>>, proxy_tag: Option<Vec<u8>>,
/// Telegram proxy-secret (binary, 32-512 bytes)
proxy_secret: Vec<u8>,
pool_size: usize,
} }
impl MePool { impl MePool {
pub fn new(proxy_tag: Option<Vec<u8>>) -> Arc<Self> { pub fn new(proxy_tag: Option<Vec<u8>>, proxy_secret: Vec<u8>) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
registry: Arc::new(ConnRegistry::new()), registry: Arc::new(ConnRegistry::new()),
writers: RwLock::new(Vec::new()), writers: RwLock::new(Vec::new()),
rr: AtomicU64::new(0), rr: AtomicU64::new(0),
proxy_tag, proxy_tag,
proxy_secret,
pool_size: 2,
}) })
} }
pub fn registry(&self) -> &Arc<ConnRegistry> { &self.registry }
pub async fn init(self: &Arc<Self>, pool_size: usize, secret: &[u8], rng: &SecureRandom) -> Result<()> { pub fn registry(&self) -> &Arc<ConnRegistry> {
&self.registry
}
/// key_selector = first 4 bytes of proxy-secret as LE u32
/// C: main_secret.key_signature via union { char secret[]; int key_signature; }
fn key_selector(&self) -> u32 {
if self.proxy_secret.len() >= 4 {
u32::from_le_bytes([
self.proxy_secret[0], self.proxy_secret[1],
self.proxy_secret[2], self.proxy_secret[3],
])
} else { 0 }
}
pub async fn init(
self: &Arc<Self>,
pool_size: usize,
rng: &SecureRandom,
) -> Result<()> {
let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4; let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4;
info!(me_servers = addrs.len(), pool_size, "Initializing ME pool"); let ks = self.key_selector();
for &(ip, port) in addrs.iter().take(3) { info!(
me_servers = addrs.len(),
pool_size,
key_selector = format_args!("0x{:08x}", ks),
secret_len = self.proxy_secret.len(),
"Initializing ME pool"
);
for &(ip, port) in addrs.iter() {
for i in 0..pool_size { for i in 0..pool_size {
let addr = SocketAddr::new(ip, port); let addr = SocketAddr::new(ip, port);
match self.connect(addr, secret, rng).await { match self.connect_one(addr, rng).await {
Ok(()) => info!(%addr, idx = i, "ME connected"), Ok(()) => info!(%addr, idx = i, "ME connected"),
Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"), Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"),
} }
} }
if self.writers.read().await.len() >= pool_size {
break;
} }
}
if self.writers.read().await.is_empty() { if self.writers.read().await.is_empty() {
return Err(ProxyError::Proxy("No ME connections".into())); return Err(ProxyError::Proxy("No ME connections".into()));
} }
Ok(()) Ok(())
} }
async fn connect(self: &Arc<Self>, addr: SocketAddr, secret: &[u8], rng: &SecureRandom) -> Result<()> { async fn connect_one(
let stream = timeout(Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), TcpStream::connect(addr)) self: &Arc<Self>,
.await.map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })? addr: SocketAddr,
.map_err(ProxyError::Io)?; rng: &SecureRandom,
stream.set_nodelay(true).ok(); ) -> Result<()> {
let (mut rd, mut wr) = tokio::io::split(stream); let secret = &self.proxy_secret;
if secret.len() < 32 {
// Nonce exchange return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap();
let crypto_ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs() as u32;
let sh = sha1(secret);
let ks = u32::from_le_bytes([sh[0], sh[1], sh[2], sh[3]]);
wr.write_all(&build_nonce_packet(ks, crypto_ts, &my_nonce)).await.map_err(ProxyError::Io)?;
let mut resp = [0u8; 32];
timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut resp))
.await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?;
let (schema, _srv_ts, srv_nonce) = parse_nonce_response(&resp)?;
if schema != RPC_CRYPTO_AES_U32 {
return Err(ProxyError::InvalidHandshake(format!("Unsupported crypto: {}", schema)));
} }
// Key derivation via existing derive_middleproxy_keys // ===== TCP connect =====
let stream = timeout(
Duration::from_secs(ME_CONNECT_TIMEOUT_SECS),
TcpStream::connect(addr),
)
.await
.map_err(|_| ProxyError::ConnectionTimeout { addr: addr.to_string() })?
.map_err(ProxyError::Io)?;
stream.set_nodelay(true).ok();
let local_addr = stream.local_addr().map_err(ProxyError::Io)?;
let peer_addr = stream.peer_addr().map_err(ProxyError::Io)?;
let (mut rd, mut wr) = tokio::io::split(stream);
// ===== 1. Send RPC nonce (plaintext, seq=-2) =====
let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap();
let crypto_ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32;
let ks = self.key_selector();
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
let nonce_frame = build_rpc_frame(-2, &nonce_payload);
debug!(
%addr,
frame_len = nonce_frame.len(),
key_sel = format_args!("0x{:08x}", ks),
crypto_ts,
"Sending nonce"
);
wr.write_all(&nonce_frame).await.map_err(ProxyError::Io)?;
wr.flush().await.map_err(ProxyError::Io)?;
// ===== 2. Read server nonce (plaintext, seq=-2) =====
let (srv_seq, srv_nonce_payload) = timeout(
Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS),
read_rpc_frame_plaintext(&mut rd),
)
.await
.map_err(|_| ProxyError::TgHandshakeTimeout)??;
if srv_seq != -2 {
return Err(ProxyError::InvalidHandshake(
format!("Expected seq=-2, got {}", srv_seq),
));
}
let (schema, _srv_ts, srv_nonce) = parse_nonce_payload(&srv_nonce_payload)?;
if schema != RPC_CRYPTO_AES_U32 {
return Err(ProxyError::InvalidHandshake(
format!("Unsupported crypto schema: 0x{:x}", schema),
));
}
debug!(%addr, "Nonce exchange OK, deriving keys");
// ===== 3. Derive AES-256-CBC keys =====
// C buffer layout:
// [0..16] nonce_server (srv_nonce)
// [16..32] nonce_client (my_nonce)
// [32..36] client_timestamp
// [36..40] server_ip
// [40..42] client_port
// [42..48] "CLIENT" or "SERVER"
// [48..52] client_ip
// [52..54] server_port
// [54..54+N] secret (proxy-secret binary)
// [54+N..70+N] nonce_server
// nonce_client(16)
let ts_bytes = crypto_ts.to_le_bytes(); let ts_bytes = crypto_ts.to_le_bytes();
let server_ip = addr_to_ip_u32(&peer_addr);
let client_ip = addr_to_ip_u32(&local_addr);
let server_ip_bytes = server_ip.to_le_bytes();
let client_ip_bytes = client_ip.to_le_bytes();
let server_port_bytes = peer_addr.port().to_le_bytes();
let client_port_bytes = local_addr.port().to_le_bytes();
let (wk, wi) = derive_middleproxy_keys( let (wk, wi) = derive_middleproxy_keys(
&srv_nonce, &my_nonce, &ts_bytes, &srv_nonce, &my_nonce, &ts_bytes,
None, &0u16.to_le_bytes(), b"CLIENT", None, &0u16.to_le_bytes(), secret, None, None, Some(&server_ip_bytes), &client_port_bytes,
b"CLIENT",
Some(&client_ip_bytes), &server_port_bytes,
secret, None, None,
); );
let (rk, ri) = derive_middleproxy_keys( let (rk, ri) = derive_middleproxy_keys(
&srv_nonce, &my_nonce, &ts_bytes, &srv_nonce, &my_nonce, &ts_bytes,
None, &0u16.to_le_bytes(), b"SERVER", None, &0u16.to_le_bytes(), secret, None, None, Some(&server_ip_bytes), &client_port_bytes,
b"SERVER",
Some(&client_ip_bytes), &server_port_bytes,
secret, None, None,
); );
// Handshake debug!(
wr.write_all(&build_handshake_packet()).await.map_err(ProxyError::Io)?; %addr,
let mut hs = [0u8; 32]; write_key = %hex::encode(&wk[..8]),
timeout(Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS), rd.read_exact(&mut hs)) read_key = %hex::encode(&rk[..8]),
.await.map_err(|_| ProxyError::TgHandshakeTimeout)?.map_err(ProxyError::Io)?; "Keys derived"
let ht = u32::from_le_bytes([hs[0], hs[1], hs[2], hs[3]]); );
if ht == RPC_HANDSHAKE_ERROR_U32 {
return Err(ProxyError::InvalidHandshake("ME rejected handshake".into())); // ===== 4. Send encrypted handshake (seq=-1) =====
let hs_payload = build_handshake_payload(
client_ip, local_addr.port(),
server_ip, peer_addr.port(),
);
let hs_frame = build_rpc_frame(-1, &hs_payload);
let (encrypted_hs, write_iv) = cbc_encrypt_padded(&wk, &wi, &hs_frame)?;
wr.write_all(&encrypted_hs).await.map_err(ProxyError::Io)?;
wr.flush().await.map_err(ProxyError::Io)?;
debug!(%addr, enc_len = encrypted_hs.len(), "Sent encrypted handshake");
// ===== 5. Read encrypted handshake response (STREAMING) =====
// Server sends encrypted handshake. C crypto layer may send partial
// blocks (only complete 16-byte blocks get encrypted at a time).
// We read incrementally and decrypt block-by-block.
let deadline = Instant::now() + Duration::from_secs(ME_HANDSHAKE_TIMEOUT_SECS);
let mut enc_buf = BytesMut::with_capacity(256);
let mut dec_buf = BytesMut::with_capacity(256);
let mut read_iv = ri;
let mut handshake_ok = false;
while Instant::now() < deadline && !handshake_ok {
let remaining = deadline - Instant::now();
let mut tmp = [0u8; 256];
let n = match timeout(remaining, rd.read(&mut tmp)).await {
Ok(Ok(0)) => return Err(ProxyError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof, "ME closed during handshake",
))),
Ok(Ok(n)) => n,
Ok(Err(e)) => return Err(ProxyError::Io(e)),
Err(_) => return Err(ProxyError::TgHandshakeTimeout),
};
enc_buf.extend_from_slice(&tmp[..n]);
// Decrypt complete 16-byte blocks
let blocks = enc_buf.len() / 16 * 16;
if blocks > 0 {
let mut chunk = vec![0u8; blocks];
chunk.copy_from_slice(&enc_buf[..blocks]);
let new_iv = cbc_decrypt_inplace(&rk, &read_iv, &mut chunk)?;
read_iv = new_iv;
dec_buf.extend_from_slice(&chunk);
let _ = enc_buf.split_to(blocks);
} }
if ht != RPC_HANDSHAKE_U32 {
return Err(ProxyError::InvalidHandshake(format!("Got 0x{:08x}", ht))); // Try to parse RPC frame from decrypted data
while dec_buf.len() >= 4 {
let fl = u32::from_le_bytes([
dec_buf[0], dec_buf[1], dec_buf[2], dec_buf[3],
]) as usize;
// Skip noop padding
if fl == 4 {
let _ = dec_buf.split_to(4);
continue;
}
if fl < 12 || fl > (1 << 24) {
return Err(ProxyError::InvalidHandshake(
format!("Bad HS response frame len: {}", fl),
));
}
if dec_buf.len() < fl {
break; // need more data
}
let frame = dec_buf.split_to(fl);
// CRC32 check
let pe = fl - 4;
let ec = u32::from_le_bytes([
frame[pe], frame[pe + 1], frame[pe + 2], frame[pe + 3],
]);
let ac = crc32(&frame[..pe]);
if ec != ac {
return Err(ProxyError::InvalidHandshake(
format!("HS CRC mismatch: 0x{:08x} vs 0x{:08x}", ec, ac),
));
}
// Check type
let hs_type = u32::from_le_bytes([
frame[8], frame[9], frame[10], frame[11],
]);
if hs_type == RPC_HANDSHAKE_ERROR_U32 {
let err_code = if frame.len() >= 16 {
i32::from_le_bytes([frame[12], frame[13], frame[14], frame[15]])
} else { -1 };
return Err(ProxyError::InvalidHandshake(
format!("ME rejected handshake (error={})", err_code),
));
}
if hs_type != RPC_HANDSHAKE_U32 {
return Err(ProxyError::InvalidHandshake(
format!("Expected HANDSHAKE 0x{:08x}, got 0x{:08x}", RPC_HANDSHAKE_U32, hs_type),
));
}
handshake_ok = true;
break;
}
}
if !handshake_ok {
return Err(ProxyError::TgHandshakeTimeout);
} }
info!(%addr, "RPC handshake OK"); info!(%addr, "RPC handshake OK");
self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter { writer: wr, key: wk, iv: wi, seq_no: 0 }))); // ===== 6. Setup writer + reader =====
self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter {
writer: wr,
key: wk,
iv: write_iv,
seq_no: 0,
})));
let reg = self.registry.clone(); let reg = self.registry.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = reader_loop(rd, rk, ri, reg).await { if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf).await {
warn!(error = %e, "ME reader ended"); warn!(error = %e, "ME reader ended");
} }
}); });
Ok(()) Ok(())
} }
pub async fn send_proxy_req(&self, conn_id: u64, client_addr: SocketAddr, our_addr: SocketAddr, data: &[u8]) -> Result<()> { pub async fn send_proxy_req(
&self,
conn_id: u64,
client_addr: SocketAddr,
our_addr: SocketAddr,
data: &[u8],
) -> Result<()> {
let ws = self.writers.read().await; let ws = self.writers.read().await;
if ws.is_empty() { return Err(ProxyError::Proxy("No ME connections".into())); } if ws.is_empty() {
return Err(ProxyError::Proxy("No ME connections".into()));
}
let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone(); let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone();
drop(ws); drop(ws);
let payload = build_proxy_req_payload(conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref()); let payload = build_proxy_req_payload(
conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref(),
);
w.lock().await.send(&payload).await w.lock().await.send(&payload).await
} }
@@ -303,46 +741,63 @@
self.registry.unregister(conn_id).await; self.registry.unregister(conn_id).await;
Ok(()) Ok(())
} }
pub fn connection_count(&self) -> usize {
self.writers.try_read().map(|w| w.len()).unwrap_or(0)
}
} }
// ========== Reader Loop ========== // ========== Reader Loop ==========
async fn reader_loop( async fn reader_loop(
mut rd: tokio::io::ReadHalf<TcpStream>, mut rd: tokio::io::ReadHalf<TcpStream>,
dk: [u8; 32], mut div: [u8; 16], dk: [u8; 32],
mut div: [u8; 16],
reg: Arc<ConnRegistry>, reg: Arc<ConnRegistry>,
mut enc_leftover: BytesMut,
mut dec: BytesMut,
) -> Result<()> { ) -> Result<()> {
let mut raw = BytesMut::with_capacity(65536); let mut raw = enc_leftover;
let mut dec = BytesMut::new();
loop { loop {
let mut tmp = [0u8; 16384]; let mut tmp = [0u8; 16384];
let n = rd.read(&mut tmp).await.map_err(ProxyError::Io)?; let n = rd.read(&mut tmp).await.map_err(ProxyError::Io)?;
if n == 0 { return Ok(()); } if n == 0 { return Ok(()); }
raw.extend_from_slice(&tmp[..n]); raw.extend_from_slice(&tmp[..n]);
// Decrypt complete 16-byte blocks
let blocks = raw.len() / 16 * 16; let blocks = raw.len() / 16 * 16;
if blocks > 0 { if blocks > 0 {
let mut new_iv = [0u8; 16]; let mut new_iv = [0u8; 16];
new_iv.copy_from_slice(&raw[blocks - 16..blocks]); new_iv.copy_from_slice(&raw[blocks - 16..blocks]);
let mut chunk = vec![0u8; blocks]; let mut chunk = vec![0u8; blocks];
chunk.copy_from_slice(&raw[..blocks]); chunk.copy_from_slice(&raw[..blocks]);
AesCbc::new(dk, div).decrypt_in_place(&mut chunk) AesCbc::new(dk, div)
.decrypt_in_place(&mut chunk)
.map_err(|e| ProxyError::Crypto(format!("{}", e)))?; .map_err(|e| ProxyError::Crypto(format!("{}", e)))?;
div = new_iv; div = new_iv;
dec.extend_from_slice(&chunk); dec.extend_from_slice(&chunk);
let _ = raw.split_to(blocks); let _ = raw.split_to(blocks);
} }
// Parse RPC frames
while dec.len() >= 12 { while dec.len() >= 12 {
let fl = u32::from_le_bytes([dec[0], dec[1], dec[2], dec[3]]) as usize; let fl = u32::from_le_bytes([dec[0], dec[1], dec[2], dec[3]]) as usize;
if fl == 4 { let _ = dec.split_to(4); continue; } if fl == 4 { let _ = dec.split_to(4); continue; }
if fl < 12 || fl > (1 << 24) { dec.clear(); break; } if fl < 12 || fl > (1 << 24) {
warn!(frame_len = fl, "Invalid RPC frame len");
dec.clear();
break;
}
if dec.len() < fl { break; } if dec.len() < fl { break; }
let frame = dec.split_to(fl); let frame = dec.split_to(fl);
// CRC32 check
let pe = fl - 4; let pe = fl - 4;
let ec = u32::from_le_bytes([frame[pe], frame[pe+1], frame[pe+2], frame[pe+3]]); let ec = u32::from_le_bytes([frame[pe], frame[pe+1], frame[pe+2], frame[pe+3]]);
if crc32(&frame[..pe]) != ec { warn!("CRC mismatch"); continue; } if crc32(&frame[..pe]) != ec {
warn!("CRC mismatch in data frame");
continue;
}
let payload = &frame[8..pe]; let payload = &frame[8..pe];
if payload.len() < 4 { continue; } if payload.len() < 4 { continue; }
let pt = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]); let pt = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
@@ -361,11 +816,45 @@
reg.route(cid, MeResponse::Ack(cfm)).await; reg.route(cid, MeResponse::Ack(cfm)).await;
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "CLOSE_EXT"); debug!(cid, "CLOSE_EXT from ME");
reg.route(cid, MeResponse::Close).await; reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "CLOSE_CONN from ME");
reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else {
debug!(rpc_type = format_args!("0x{:08x}", pt), len = body.len(), "Unknown RPC");
} }
} }
} }
} }
// ========== Health Monitor (Phase 4) ==========
pub async fn me_health_monitor(
pool: Arc<MePool>,
rng: Arc<SecureRandom>,
min_connections: usize,
) {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
let current = pool.writers.read().await.len();
if current < min_connections {
warn!(current, min = min_connections, "ME pool below minimum, reconnecting...");
let addrs = TG_MIDDLE_PROXIES_FLAT_V4.clone();
for &(ip, port) in addrs.iter() {
let needed = min_connections.saturating_sub(pool.writers.read().await.len());
if needed == 0 { break; }
for _ in 0..needed {
let addr = SocketAddr::new(ip, port);
match pool.connect_one(addr, &rng).await {
Ok(()) => info!(%addr, "ME reconnected"),
Err(e) => debug!(%addr, error = %e, "ME reconnect failed"),
}
}
}
}
}
}