use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tracing::{debug, info, warn}; use crate::config::ProxyConfig; use crate::crypto::SecureRandom; use crate::error::Result; use crate::protocol::constants::*; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; use crate::proxy::relay::relay_bidirectional; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::UpstreamManager; pub(crate) async fn handle_via_direct( client_reader: CryptoReader, client_writer: CryptoWriter, success: HandshakeSuccess, upstream_manager: Arc, stats: Arc, config: Arc, buffer_pool: Arc, rng: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static, { let user = &success.user; let dc_addr = get_dc_addr_static(success.dc_idx, &config)?; info!( user = %user, peer = %success.peer, dc = success.dc_idx, dc_addr = %dc_addr, proto = ?success.proto_tag, mode = "direct", "Connecting to Telegram DC" ); let tg_stream = upstream_manager .connect(dc_addr, Some(success.dc_idx)) .await?; debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); let (tg_reader, tg_writer) = do_tg_handshake_static(tg_stream, &success, &config, rng.as_ref()).await?; debug!(peer = %success.peer, "TG handshake complete, starting relay"); stats.increment_user_connects(user); stats.increment_user_curr_connects(user); let relay_result = relay_bidirectional( client_reader, client_writer, tg_reader, tg_writer, user, Arc::clone(&stats), buffer_pool, ) .await; stats.decrement_user_curr_connects(user); match &relay_result { Ok(()) => debug!(user = %user, "Direct relay completed"), Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"), } relay_result } fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { let datacenters = if config.general.prefer_ipv6 { &*TG_DATACENTERS_V6 } else { &*TG_DATACENTERS_V4 }; let num_dcs = datacenters.len(); let dc_key = dc_idx.to_string(); if let Some(addr_str) = config.dc_overrides.get(&dc_key) { match addr_str.parse::() { Ok(addr) => { debug!(dc_idx = dc_idx, addr = %addr, "Using DC override from config"); return Ok(addr); } Err(_) => { warn!(dc_idx = dc_idx, addr_str = %addr_str, "Invalid DC override address in config, ignoring"); } } } let abs_dc = dc_idx.unsigned_abs() as usize; if abs_dc >= 1 && abs_dc <= num_dcs { return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT)); } let default_dc = config.default_dc.unwrap_or(2) as usize; let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs { default_dc - 1 } else { 1 }; info!( original_dc = dc_idx, fallback_dc = (fallback_idx + 1) as u16, fallback_addr = %datacenters[fallback_idx], "Special DC ---> default_cluster" ); Ok(SocketAddr::new( datacenters[fallback_idx], TG_DATACENTER_PORT, )) } async fn do_tg_handshake_static( mut stream: TcpStream, success: &HandshakeSuccess, config: &ProxyConfig, rng: &SecureRandom, ) -> Result<( CryptoReader, CryptoWriter, )> { let (nonce, _tg_enc_key, _tg_enc_iv, _tg_dec_key, _tg_dec_iv) = generate_tg_nonce( success.proto_tag, success.dc_idx, &success.dec_key, success.dec_iv, rng, config.general.fast_mode, ); let (encrypted_nonce, tg_encryptor, tg_decryptor) = encrypt_tg_nonce_with_ciphers(&nonce); debug!( peer = %success.peer, nonce_head = %hex::encode(&nonce[..16]), "Sending nonce to Telegram" ); stream.write_all(&encrypted_nonce).await?; stream.flush().await?; let (read_half, write_half) = stream.into_split(); Ok(( CryptoReader::new(read_half, tg_decryptor), CryptoWriter::new(write_half, tg_encryptor), )) }