From 829f53c12391de1bfc831cf467444ac4106a8508 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 11 Jan 2026 22:59:51 +0300 Subject: [PATCH] Fixes for iOS --- src/config/mod.rs | 7 ++- src/main.rs | 13 +++-- src/protocol/constants.rs | 5 +- src/proxy/client.rs | 7 ++- src/proxy/relay.rs | 111 +++++++++++++++++++++++++++++++++----- src/stream/buffer_pool.rs | 5 +- src/transport/socket.rs | 15 ++---- 7 files changed, 127 insertions(+), 36 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index d8a4806..bbe3f61 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -163,9 +163,12 @@ fn default_port() -> u16 { 443 } fn default_tls_domain() -> String { "www.google.com".to_string() } fn default_mask_port() -> u16 { 443 } fn default_replay_check_len() -> usize { 65536 } -fn default_handshake_timeout() -> u64 { 10 } +// CHANGED: Increased handshake timeout for bad mobile networks +fn default_handshake_timeout() -> u64 { 15 } fn default_connect_timeout() -> u64 { 10 } -fn default_keepalive() -> u64 { 600 } +// CHANGED: Reduced keepalive from 600s to 60s. +// Mobile NATs often drop idle connections after 60-120s. +fn default_keepalive() -> u64 { 60 } fn default_ack_timeout() -> u64 { 300 } fn default_listen_addr() -> String { "0.0.0.0".to_string() } fn default_fake_cert_len() -> usize { 2048 } diff --git a/src/main.rs b/src/main.rs index 45bd872..fde2a59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ mod util; use crate::config::ProxyConfig; use crate::proxy::ClientHandler; -use crate::stats::Stats; +use crate::stats::{Stats, ReplayChecker}; use crate::transport::{create_listener, ListenOptions, UpstreamManager}; use crate::util::ip::detect_ip; @@ -55,6 +55,9 @@ async fn main() -> Result<(), Box> { let config = Arc::new(config); let stats = Arc::new(Stats::new()); + // CHANGED: Initialize global ReplayChecker here instead of per-connection + let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len)); + // Initialize Upstream Manager let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); @@ -145,13 +148,11 @@ async fn main() -> Result<(), Box> { } // Accept loop - // For simplicity in this slice, we just spawn a task for each listener - // In a real high-perf scenario, we might want a more complex accept loop - for listener in listeners { let config = config.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); tokio::spawn(async move { loop { @@ -160,6 +161,7 @@ async fn main() -> Result<(), Box> { let config = config.clone(); let stats = stats.clone(); let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); tokio::spawn(async move { if let Err(e) = ClientHandler::new( @@ -167,7 +169,8 @@ async fn main() -> Result<(), Box> { peer_addr, config, stats, - upstream_manager + upstream_manager, + replay_checker // Pass global checker ).run().await { // Log only relevant errors // debug!("Connection error: {}", e); diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index 17857e4..d09473e 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -167,7 +167,10 @@ pub const DEFAULT_ACK_TIMEOUT_SECS: u64 = 300; // ============= Buffer Sizes ============= /// Default buffer size -pub const DEFAULT_BUFFER_SIZE: usize = 65536; +/// CHANGED: Reduced from 64KB to 16KB to match TLS record size and align with +/// the new buffering strategy for better iOS upload performance. +pub const DEFAULT_BUFFER_SIZE: usize = 16384; + /// Small buffer size for bad client handling pub const SMALL_BUFFER_SIZE: usize = 8192; diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 2cb4d9d..11e3a81 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -45,11 +45,10 @@ impl ClientHandler { config: Arc, stats: Arc, upstream_manager: Arc, + replay_checker: Arc, // CHANGED: Accept global checker ) -> RunningClientHandler { - // Note: ReplayChecker should be shared globally for proper replay protection - // Creating it per-connection disables replay protection across connections - // TODO: Pass Arc from main.rs - let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len)); + // CHANGED: Removed local creation of ReplayChecker. + // It is now passed from main.rs to ensure global replay protection. RunningClientHandler { stream, diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index c531a1a..f90b247 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -1,13 +1,21 @@ //! Bidirectional Relay use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; -use tracing::{debug, trace, warn}; +use tokio::time::Instant; +use tracing::{debug, trace, warn, info}; use crate::error::Result; use crate::stats::Stats; use std::sync::atomic::{AtomicU64, Ordering}; -const BUFFER_SIZE: usize = 65536; +// CHANGED: Reduced from 128KB to 16KB to match TLS record size and prevent bufferbloat. +// This is critical for iOS clients to maintain proper TCP flow control during uploads. +const BUFFER_SIZE: usize = 16384; + +// Activity timeout for iOS compatibility (30 minutes) +// iOS does not support TCP_USER_TIMEOUT, so we implement application-level timeout +const ACTIVITY_TIMEOUT_SECS: u64 = 1800; /// Relay data bidirectionally between client and server pub async fn relay_bidirectional( @@ -36,15 +44,40 @@ where let c2s_bytes_clone = Arc::clone(&c2s_bytes); let s2c_bytes_clone = Arc::clone(&s2c_bytes); - // Client -> Server task + // Activity timeout for iOS compatibility + let activity_timeout = Duration::from_secs(ACTIVITY_TIMEOUT_SECS); + + // Client -> Server task with activity timeout let c2s = tokio::spawn(async move { let mut buf = vec![0u8; BUFFER_SIZE]; let mut total_bytes = 0u64; let mut msg_count = 0u64; + let mut last_activity = Instant::now(); + let mut last_log = Instant::now(); loop { - match client_reader.read(&mut buf).await { - Ok(0) => { + // Read with timeout to prevent infinite hang on iOS + let read_result = tokio::time::timeout( + activity_timeout, + client_reader.read(&mut buf) + ).await; + + match read_result { + // Timeout - no activity for too long + Err(_) => { + warn!( + user = %user_c2s, + total_bytes = total_bytes, + msgs = msg_count, + idle_secs = last_activity.elapsed().as_secs(), + "Activity timeout (C->S) - no data received" + ); + let _ = server_writer.shutdown().await; + break; + } + + // Read successful + Ok(Ok(0)) => { debug!( user = %user_c2s, total_bytes = total_bytes, @@ -54,9 +87,11 @@ where let _ = server_writer.shutdown().await; break; } - Ok(n) => { + + Ok(Ok(n)) => { total_bytes += n as u64; msg_count += 1; + last_activity = Instant::now(); c2s_bytes_clone.store(total_bytes, Ordering::Relaxed); stats_c2s.add_user_octets_from(&user_c2s, n as u64); @@ -70,6 +105,19 @@ where "C->S data" ); + // Log activity every 10 seconds for large transfers + if last_log.elapsed() > Duration::from_secs(10) { + let rate = total_bytes as f64 / last_log.elapsed().as_secs_f64(); + info!( + user = %user_c2s, + total_bytes = total_bytes, + msgs = msg_count, + rate_kbps = (rate / 1024.0) as u64, + "C->S transfer in progress" + ); + last_log = Instant::now(); + } + if let Err(e) = server_writer.write_all(&buf[..n]).await { debug!(user = %user_c2s, error = %e, "Failed to write to server"); break; @@ -79,7 +127,8 @@ where break; } } - Err(e) => { + + Ok(Err(e)) => { debug!(user = %user_c2s, error = %e, total_bytes = total_bytes, "Client read error"); break; } @@ -87,15 +136,37 @@ where } }); - // Server -> Client task + // Server -> Client task with activity timeout let s2c = tokio::spawn(async move { let mut buf = vec![0u8; BUFFER_SIZE]; let mut total_bytes = 0u64; let mut msg_count = 0u64; + let mut last_activity = Instant::now(); + let mut last_log = Instant::now(); loop { - match server_reader.read(&mut buf).await { - Ok(0) => { + // Read with timeout to prevent infinite hang on iOS + let read_result = tokio::time::timeout( + activity_timeout, + server_reader.read(&mut buf) + ).await; + + match read_result { + // Timeout - no activity for too long + Err(_) => { + warn!( + user = %user_s2c, + total_bytes = total_bytes, + msgs = msg_count, + idle_secs = last_activity.elapsed().as_secs(), + "Activity timeout (S->C) - no data received" + ); + let _ = client_writer.shutdown().await; + break; + } + + // Read successful + Ok(Ok(0)) => { debug!( user = %user_s2c, total_bytes = total_bytes, @@ -105,9 +176,11 @@ where let _ = client_writer.shutdown().await; break; } - Ok(n) => { + + Ok(Ok(n)) => { total_bytes += n as u64; msg_count += 1; + last_activity = Instant::now(); s2c_bytes_clone.store(total_bytes, Ordering::Relaxed); stats_s2c.add_user_octets_to(&user_s2c, n as u64); @@ -121,6 +194,19 @@ where "S->C data" ); + // Log activity every 10 seconds for large transfers + if last_log.elapsed() > Duration::from_secs(10) { + let rate = total_bytes as f64 / last_log.elapsed().as_secs_f64(); + info!( + user = %user_s2c, + total_bytes = total_bytes, + msgs = msg_count, + rate_kbps = (rate / 1024.0) as u64, + "S->C transfer in progress" + ); + last_log = Instant::now(); + } + if let Err(e) = client_writer.write_all(&buf[..n]).await { debug!(user = %user_s2c, error = %e, "Failed to write to client"); break; @@ -130,7 +216,8 @@ where break; } } - Err(e) => { + + Ok(Err(e)) => { debug!(user = %user_s2c, error = %e, total_bytes = total_bytes, "Server read error"); break; } diff --git a/src/stream/buffer_pool.rs b/src/stream/buffer_pool.rs index 55be736..ac4a5f9 100644 --- a/src/stream/buffer_pool.rs +++ b/src/stream/buffer_pool.rs @@ -11,8 +11,9 @@ use std::sync::Arc; // ============= Configuration ============= -/// Default buffer size (64KB - good for MTProto) -pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024; +/// Default buffer size +/// CHANGED: Reduced from 64KB to 16KB to match TLS record size and prevent bufferbloat. +pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024; /// Default maximum number of pooled buffers pub const DEFAULT_MAX_BUFFERS: usize = 1024; diff --git a/src/transport/socket.rs b/src/transport/socket.rs index 09e5148..a07c21c 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -30,20 +30,13 @@ pub fn configure_tcp_socket( socket.set_tcp_keepalive(&keepalive)?; } - // Set buffer sizes - set_buffer_sizes(&socket, 65536, 65536)?; + // CHANGED: Removed manual buffer size setting (was 256KB). + // Allowing the OS kernel to handle TCP window scaling (Autotuning) is critical + // for mobile clients to avoid bufferbloat and stalled connections during uploads. Ok(()) } -/// Set socket buffer sizes -fn set_buffer_sizes(socket: &socket2::SockRef, recv: usize, send: usize) -> Result<()> { - // These may fail on some systems, so we ignore errors - let _ = socket.set_recv_buffer_size(recv); - let _ = socket.set_send_buffer_size(send); - Ok(()) -} - /// Configure socket for accepting client connections pub fn configure_client_socket( stream: &TcpStream, @@ -65,6 +58,8 @@ pub fn configure_client_socket( socket.set_tcp_keepalive(&keepalive)?; // Set TCP user timeout (Linux only) + // NOTE: iOS does not support TCP_USER_TIMEOUT - application-level timeout + // is implemented in relay_bidirectional instead #[cfg(target_os = "linux")] { use std::os::unix::io::AsRawFd;