diff --git a/src/main.rs b/src/main.rs index d572788..5ae4f66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,16 +29,184 @@ use crate::proxy::{ClientHandler, handle_client_stream}; use crate::transport::{create_unix_listener, cleanup_unix_socket}; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; +use crate::proxy::ClientHandler; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::MePool; use crate::transport::{ListenOptions, UpstreamManager, create_listener}; use crate::util::ip::detect_ip; +fn parse_cli() -> (String, bool, Option) { + let mut config_path = "config.toml".to_string(); + let mut silent = false; + let mut log_level: Option = None; + + let args: Vec = std::env::args().skip(1).collect(); + + // Check for --init first (handled before tokio) + if let Some(init_opts) = cli::parse_init_args(&args) { + if let Err(e) = cli::run_init(init_opts) { + eprintln!("[telemt] Init failed: {}", e); + std::process::exit(1); + } + std::process::exit(0); + } + + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--silent" | "-s" => { + silent = true; + } + "--log-level" => { + i += 1; + if i < args.len() { + log_level = Some(args[i].clone()); + } + } + s if s.starts_with("--log-level=") => { + log_level = Some(s.trim_start_matches("--log-level=").to_string()); + } + "--help" | "-h" => { + eprintln!("Usage: telemt [config.toml] [OPTIONS]"); + eprintln!(); + eprintln!("Options:"); + eprintln!(" --silent, -s Suppress info logs"); + eprintln!(" --log-level debug|verbose|normal|silent"); + eprintln!(" --help, -h Show this help"); + eprintln!(); + eprintln!("Setup (fire-and-forget):"); + eprintln!( + " --init Generate config, install systemd service, start" + ); + eprintln!(" --port Listen port (default: 443)"); + eprintln!( + " --domain TLS domain for masking (default: www.google.com)" + ); + eprintln!( + " --secret 32-char hex secret (auto-generated if omitted)" + ); + eprintln!(" --user Username (default: user)"); + eprintln!(" --config-dir Config directory (default: /etc/telemt)"); + eprintln!(" --no-start Don't start the service after install"); + std::process::exit(0); + } + s if !s.starts_with('-') => { + config_path = s.to_string(); + } + other => { + eprintln!("Unknown option: {}", other); + } + } + i += 1; + } + + (config_path, silent, log_level) +} + +fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { + info!("--- Proxy Links ({}) ---", host); + for user_name in &config.general.links.show { + if let Some(secret) = config.access.users.get(user_name) { + info!("User: {}", user_name); + if config.general.modes.classic { + info!(" Classic: tg://proxy?server={}&port={}&secret={}", + host, port, secret); + } + if config.general.modes.secure { + info!(" DD: tg://proxy?server={}&port={}&secret=dd{}", + host, port, secret); + } + if config.general.modes.tls { + let domain_hex = hex::encode(&config.censorship.tls_domain); + info!(" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}", + host, port, secret, domain_hex); + } + } else { + warn!("User '{}' listed in [general.links] show not found in [access.users]", user_name); + } + } + info!("------------------------"); +} + #[tokio::main] async fn main() -> std::result::Result<(), Box> { + let (config_path, cli_silent, cli_log_level) = parse_cli(); - let config = Arc::new(ProxyConfig::load("config.toml")?); + let config = match ProxyConfig::load(&config_path) { + Ok(c) => c, + Err(e) => { + if std::path::Path::new(&config_path).exists() { + eprintln!("[telemt] Error: {}", e); + std::process::exit(1); + } else { + let default = ProxyConfig::default(); + std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap(); + eprintln!("[telemt] Created default config at {}", config_path); + default + } + } + }; + + if let Err(e) = config.validate() { + eprintln!("[telemt] Invalid config: {}", e); + std::process::exit(1); + } + + let has_rust_log = std::env::var("RUST_LOG").is_ok(); + let effective_log_level = if cli_silent { + LogLevel::Silent + } else if let Some(ref s) = cli_log_level { + LogLevel::from_str_loose(s) + } else { + config.general.log_level.clone() + }; + + let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt::Layer::default()) + .init(); + + info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")); + info!("Log level: {}", effective_log_level); + info!( + "Modes: classic={} secure={} tls={}", + config.general.modes.classic, config.general.modes.secure, config.general.modes.tls + ); + info!("TLS domain: {}", config.censorship.tls_domain); + if let Some(ref sock) = config.censorship.mask_unix_sock { + info!("Mask: {} -> unix:{}", config.censorship.mask, sock); + if !std::path::Path::new(sock).exists() { + warn!( + "Unix socket '{}' does not exist yet. Masking will fail until it appears.", + sock + ); + } + } else { + info!( + "Mask: {} -> {}:{}", + config.censorship.mask, + config + .censorship + .mask_host + .as_deref() + .unwrap_or(&config.censorship.tls_domain), + config.censorship.mask_port + ); + } + + if config.censorship.tls_domain == "www.google.com" { + warn!("Using default tls_domain. Consider setting a custom domain."); + } + + for w in &config.warnings { + warn!("{}", w); + } + + let prefer_ipv6 = config.general.prefer_ipv6; + let use_middle_proxy = config.general.use_middle_proxy; + let config = Arc::new(config); let stats = Arc::new(Stats::new()); let rng = Arc::new(SecureRandom::new()); @@ -53,11 +221,198 @@ async fn main() -> std::result::Result<(), Box> { // IP Tracker initialization let ip_tracker = Arc::new(UserIpTracker::new()); ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + + if !config.access.user_max_unique_ips.is_empty() { + info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); + } - // Middle proxy pool (optional) - let me_pool: Option> = None; + // Connection concurrency limit + let _max_connections = Arc::new(Semaphore::new(10_000)); + + // ===================================================================== + // Middle Proxy initialization (if enabled) + // ===================================================================== + let me_pool: Option> = if use_middle_proxy { + info!("=== Middle Proxy Mode ==="); + + // ad_tag (proxy_tag) for advertising + let proxy_tag = config.general.ad_tag.as_ref().map(|tag| { + hex::decode(tag).unwrap_or_else(|_| { + warn!("Invalid ad_tag hex, middle proxy ad_tag will be empty"); + Vec::new() + }) + }); + + // ============================================================= + // CRITICAL: Download Telegram proxy-secret (NOT user secret!) + // + // C MTProxy uses TWO separate secrets: + // -S flag = 16-byte user secret for client obfuscation + // --aes-pwd = 32-512 byte binary file for ME RPC auth + // + // proxy-secret is from: https://core.telegram.org/getProxySecret + // ============================================================= + let proxy_secret_path = config.general.proxy_secret_path.as_deref(); + match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { + Ok(proxy_secret) => { + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); + + let pool = MePool::new( + proxy_tag, + proxy_secret, + config.general.middle_proxy_nat_ip, + config.general.middle_proxy_nat_probe, + config.general.middle_proxy_nat_stun.clone(), + ); + + match pool.init(2, &rng).await { + Ok(()) => { + info!("Middle-End pool initialized successfully"); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, 2, + ) + .await; + }); + + Some(pool) + } + Err(e) => { + error!(error = %e, "Failed to initialize ME pool. Falling back to direct mode."); + None + } + } + } + Err(e) => { + error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode."); + None + } + } + } else { + None + }; + + if me_pool.is_some() { + info!("Transport: Middle Proxy (supports all DCs including CDN)"); + } else { + info!("Transport: Direct TCP (standard DCs only)"); + } + + // Startup DC ping (only meaningful in direct mode) + if me_pool.is_none() { + info!("================= Telegram DC Connectivity ================="); + + let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await; + + for upstream_result in &ping_results { + // Show which IP version is in use and which is fallback + if upstream_result.both_available { + if prefer_ipv6 { + info!(" IPv6 in use and IPv4 is fallback"); + } else { + info!(" IPv4 in use and IPv6 is fallback"); + } + } else { + let v6_works = upstream_result + .v6_results + .iter() + .any(|r| r.rtt_ms.is_some()); + let v4_works = upstream_result + .v4_results + .iter() + .any(|r| r.rtt_ms.is_some()); + if v6_works && !v4_works { + info!(" IPv6 only (IPv4 unavailable)"); + } else if v4_works && !v6_works { + info!(" IPv4 only (IPv6 unavailable)"); + } else if !v6_works && !v4_works { + info!(" No connectivity!"); + } + } + + info!(" via {}", upstream_result.upstream_name); + info!("============================================================"); + + // Print IPv6 results first + for dc in &upstream_result.v6_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + // Align: IPv6 addresses are longer, use fewer tabs + // [2001:b28:f23d:f001::a]:443 = ~28 chars + info!(" DC{} [IPv6] {}:\t\t{:.0} ms", dc.dc_idx, addr_str, rtt); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!(" DC{} [IPv6] {}:\t\tFAIL ({})", dc.dc_idx, addr_str, err); + } + } + } + + info!("============================================================"); + + // Print IPv4 results + for dc in &upstream_result.v4_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + // Align: IPv4 addresses are shorter, use more tabs + // 149.154.175.50:443 = ~18 chars + info!( + " DC{} [IPv4] {}:\t\t\t\t{:.0} ms", + dc.dc_idx, addr_str, rtt + ); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!( + " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", + dc.dc_idx, addr_str, err + ); + } + } + } + + info!("============================================================"); + } + } + + // Background tasks + let um_clone = upstream_manager.clone(); + tokio::spawn(async move { + um_clone.run_health_checks(prefer_ipv6).await; + }); + + let rc_clone = replay_checker.clone(); + tokio::spawn(async move { + rc_clone.run_periodic_cleanup().await; + }); let detected_ip = detect_ip().await; + debug!( + "Detected IPs: v4={:?} v6={:?}", + detected_ip.ipv4, detected_ip.ipv6 + ); let mut listeners = Vec::new(); @@ -72,6 +427,27 @@ async fn main() -> std::result::Result<(), Box> { Ok(socket) => { let listener = TcpListener::from_std(socket.into())?; info!("Listening on {}", addr); + + let public_ip = if let Some(ip) = listener_conf.announce_ip { + ip + } else if listener_conf.ip.is_unspecified() { + if listener_conf.ip.is_ipv4() { + detected_ip.ipv4.unwrap_or(listener_conf.ip) + } else { + detected_ip.ipv6.unwrap_or(listener_conf.ip) + } + } else { + listener_conf.ip + }; + + // Per-listener links (only when public_host is NOT set) + let links = &config.general.links; + if links.public_host.is_none() && !links.show.is_empty() { + let link_host = public_ip.to_string(); + let link_port = links.public_port.unwrap_or(config.server.port); + print_proxy_links(&link_host, link_port, &config); + } + listeners.push(listener); } Err(e) => { @@ -80,96 +456,122 @@ async fn main() -> std::result::Result<(), Box> { } } + // Unix socket listener #[cfg(unix)] - if let Some(ref unix_path) = config.server.listen_unix_sock { - - let std_listener = create_unix_listener(unix_path)?; - let unix_listener = UnixListener::from_std(std_listener)?; - - let config = config.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let ip_tracker = ip_tracker.clone(); - - tokio::spawn(async move { - - let unix_conn_counter = - std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); - - loop { - - match unix_listener.accept().await { - - Ok((stream, _)) => { - - let conn_id = unix_conn_counter.fetch_add( - 1, - std::sync::atomic::Ordering::Relaxed - ); - - let fake_peer = - SocketAddr::from(([127,0,0,1], conn_id as u16)); - - let config = config.clone(); - let stats = stats.clone(); - let upstream_manager = upstream_manager.clone(); - let replay_checker = replay_checker.clone(); - let buffer_pool = buffer_pool.clone(); - let rng = rng.clone(); - let me_pool = me_pool.clone(); - let ip_tracker = ip_tracker.clone(); - - tokio::spawn(async move { - - if let Err(e) = handle_client_stream( - - stream, - fake_peer, - config, - stats, - upstream_manager, - replay_checker, - buffer_pool, - rng, - me_pool, - ip_tracker.clone(), - - ).await { - - debug!( - error = %e, - "Unix socket connection error" - ); - } - - }); - + let unix_sock_path = if let Some(ref unix_path) = config.server.listen_unix_sock { + match create_unix_listener(unix_path) { + Ok(std_listener) => { + // Set socket file permissions if configured + if let Some(ref perm_str) = config.server.listen_unix_sock_perm { + if let Ok(mode) = u32::from_str_radix(perm_str, 8) { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions( + unix_path, + std::fs::Permissions::from_mode(mode), + )?; } - - Err(e) => { - - error!("Unix socket accept error: {}", e); - - tokio::time::sleep( - Duration::from_millis(100) - ).await; - - } - } + let unix_listener = UnixListener::from_std(std_listener)?; + info!("Listening on unix:{}", unix_path); + + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + + let unix_conn_counter = std::sync::Arc::new( + std::sync::atomic::AtomicU64::new(1) + ); + + tokio::spawn(async move { + loop { + match unix_listener.accept().await { + Ok((stream, _unix_addr)) => { + let conn_id = unix_conn_counter.fetch_add( + 1, std::sync::atomic::Ordering::Relaxed + ); + let fake_peer = SocketAddr::from(([127, 0, 0, 1], conn_id as u16)); + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + + tokio::spawn(async move { + if let Err(e) = handle_client_stream( + stream, fake_peer, config, stats, + upstream_manager, replay_checker, buffer_pool, rng, + me_pool, + ).await { + debug!(error = %e, "Unix socket connection error"); + } + }); + } + Err(e) => { + error!("Unix socket accept error: {}", e); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }); + + Some(unix_path.clone()) } + Err(e) => { + error!("Failed to bind to unix:{}: {}", unix_path, e); + None + } + } + } else { + None + }; - }); - + // Links with explicit public_host (independent of TCP listeners) + let links = &config.general.links; + if let Some(ref public_host) = links.public_host { + if !links.show.is_empty() { + let link_port = links.public_port.unwrap_or(config.server.port); + print_proxy_links(public_host, link_port, &config); + } } - for listener in listeners { + // Warn if links were configured but couldn't be shown + // (no TCP listeners succeeded and no public_host set) + let links = &config.general.links; + if listeners.is_empty() && links.public_host.is_none() && !links.show.is_empty() { + warn!("Proxy links not shown: no TCP listeners bound. Set [general.links] public_host or fix listener errors above."); + } + if listeners.is_empty() { + #[cfg(unix)] + if unix_sock_path.is_none() { + error!("No listeners. Exiting."); + std::process::exit(1); + } + #[cfg(not(unix))] + { + error!("No listeners. Exiting."); + std::process::exit(1); + } + } + + // Switch to user-configured log level after startup + let runtime_filter = if has_rust_log { + EnvFilter::from_default_env() + } else { + EnvFilter::new(effective_log_level.to_filter_str()) + }; + filter_handle + .reload(runtime_filter) + .expect("Failed to switch log filter"); + + for listener in listeners { let config = config.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); @@ -180,13 +582,9 @@ async fn main() -> std::result::Result<(), Box> { let ip_tracker = ip_tracker.clone(); tokio::spawn(async move { - loop { - match listener.accept().await { - Ok((stream, peer_addr)) => { - let config = config.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); @@ -197,9 +595,7 @@ async fn main() -> std::result::Result<(), Box> { let ip_tracker = ip_tracker.clone(); tokio::spawn(async move { - if let Err(e) = ClientHandler::new( - stream, peer_addr, config, @@ -210,46 +606,33 @@ async fn main() -> std::result::Result<(), Box> { rng, me_pool, ip_tracker, - ) .run() .await { - - debug!( - peer = %peer_addr, - error = %e, - "Connection error" - ); - + debug!(peer = %peer_addr, error = %e, "Connection error"); } - }); - } - Err(e) => { - error!("Accept error: {}", e); - - tokio::time::sleep( - Duration::from_millis(100) - ).await; - + tokio::time::sleep(Duration::from_millis(100)).await; } - } - } - }); - } - signal::ctrl_c().await?; - - info!("Shutdown complete"); + match signal::ctrl_c().await { + Ok(()) => { + info!("Shutting down..."); + #[cfg(unix)] + if let Some(ref path) = unix_sock_path { + cleanup_unix_socket(path); + } + } + Err(e) => error!("Signal error: {}", e), + } Ok(()) - } diff --git a/src/proxy/client.rs b/src/proxy/client.rs index b96d102..8b4e8bc 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -24,6 +24,11 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; +/// Handle a client connection from any stream type (TCP, Unix socket) +/// +/// This is the generic entry point for client handling. Unlike `ClientHandler::new().run()`, +/// it skips TCP-specific socket configuration (TCP_NODELAY, keepalive, TCP_USER_TIMEOUT) +/// which is appropriate for non-TCP streams like Unix sockets. pub async fn handle_client_stream( mut stream: S, peer: SocketAddr, @@ -34,7 +39,6 @@ pub async fn handle_client_stream( buffer_pool: Arc, rng: Arc, me_pool: Option>, - ip_tracker: Arc, ) -> Result<()> where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -106,18 +110,9 @@ where }; RunningClientHandler::handle_authenticated_static( - crypto_reader, - crypto_writer, - success, - upstream_manager, - stats, - config, - buffer_pool, - rng, - me_pool, + crypto_reader, crypto_writer, success, + upstream_manager, stats, config, buffer_pool, rng, me_pool, local_addr, - peer, - ip_tracker.clone(), ).await } else { if !config.general.modes.classic && !config.general.modes.secure { @@ -148,18 +143,9 @@ where }; RunningClientHandler::handle_authenticated_static( - crypto_reader, - crypto_writer, - success, - upstream_manager, - stats, - config, - buffer_pool, - rng, - me_pool, + crypto_reader, crypto_writer, success, + upstream_manager, stats, config, buffer_pool, rng, me_pool, local_addr, - peer, - ip_tracker.clone(), ).await } }).await; @@ -181,7 +167,6 @@ where } } - pub struct ClientHandler; pub struct RunningClientHandler { @@ -230,7 +215,6 @@ impl RunningClientHandler { self.stats.increment_connects_all(); let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, "New connection"); if let Err(e) = configure_client_socket( @@ -269,7 +253,6 @@ impl RunningClientHandler { let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); @@ -282,7 +265,6 @@ impl RunningClientHandler { async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> { let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; @@ -376,7 +358,6 @@ impl RunningClientHandler { async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> { let peer = self.peer; - let ip_tracker = self.ip_tracker.clone(); if !self.config.general.modes.classic && !self.config.general.modes.secure { debug!(peer = %peer, "Non-TLS modes disabled"); @@ -437,9 +418,9 @@ impl RunningClientHandler { /// Main dispatch after successful handshake. /// Two modes: - /// - Direct: TCP relay to TG DC (existing behavior) + /// - Direct: TCP relay to TG DC (existing behavior) /// - Middle Proxy: RPC multiplex through ME pool (new — supports CDN DCs) - async fn handle_authenticated_static( + pub(crate) async fn handle_authenticated_static( client_reader: CryptoReader, client_writer: CryptoWriter, success: HandshakeSuccess,