Fixes for iOS
This commit is contained in:
@@ -1,13 +1,21 @@
|
||||
//! Bidirectional Relay
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{debug, trace, warn};
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, trace, warn, info};
|
||||
use crate::error::Result;
|
||||
use crate::stats::Stats;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
const BUFFER_SIZE: usize = 65536;
|
||||
// CHANGED: Reduced from 128KB to 16KB to match TLS record size and prevent bufferbloat.
|
||||
// This is critical for iOS clients to maintain proper TCP flow control during uploads.
|
||||
const BUFFER_SIZE: usize = 16384;
|
||||
|
||||
// Activity timeout for iOS compatibility (30 minutes)
|
||||
// iOS does not support TCP_USER_TIMEOUT, so we implement application-level timeout
|
||||
const ACTIVITY_TIMEOUT_SECS: u64 = 1800;
|
||||
|
||||
/// Relay data bidirectionally between client and server
|
||||
pub async fn relay_bidirectional<CR, CW, SR, SW>(
|
||||
@@ -36,15 +44,40 @@ where
|
||||
let c2s_bytes_clone = Arc::clone(&c2s_bytes);
|
||||
let s2c_bytes_clone = Arc::clone(&s2c_bytes);
|
||||
|
||||
// Client -> Server task
|
||||
// Activity timeout for iOS compatibility
|
||||
let activity_timeout = Duration::from_secs(ACTIVITY_TIMEOUT_SECS);
|
||||
|
||||
// Client -> Server task with activity timeout
|
||||
let c2s = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; BUFFER_SIZE];
|
||||
let mut total_bytes = 0u64;
|
||||
let mut msg_count = 0u64;
|
||||
let mut last_activity = Instant::now();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
loop {
|
||||
match client_reader.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
// Read with timeout to prevent infinite hang on iOS
|
||||
let read_result = tokio::time::timeout(
|
||||
activity_timeout,
|
||||
client_reader.read(&mut buf)
|
||||
).await;
|
||||
|
||||
match read_result {
|
||||
// Timeout - no activity for too long
|
||||
Err(_) => {
|
||||
warn!(
|
||||
user = %user_c2s,
|
||||
total_bytes = total_bytes,
|
||||
msgs = msg_count,
|
||||
idle_secs = last_activity.elapsed().as_secs(),
|
||||
"Activity timeout (C->S) - no data received"
|
||||
);
|
||||
let _ = server_writer.shutdown().await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Read successful
|
||||
Ok(Ok(0)) => {
|
||||
debug!(
|
||||
user = %user_c2s,
|
||||
total_bytes = total_bytes,
|
||||
@@ -54,9 +87,11 @@ where
|
||||
let _ = server_writer.shutdown().await;
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
|
||||
Ok(Ok(n)) => {
|
||||
total_bytes += n as u64;
|
||||
msg_count += 1;
|
||||
last_activity = Instant::now();
|
||||
c2s_bytes_clone.store(total_bytes, Ordering::Relaxed);
|
||||
|
||||
stats_c2s.add_user_octets_from(&user_c2s, n as u64);
|
||||
@@ -70,6 +105,19 @@ where
|
||||
"C->S data"
|
||||
);
|
||||
|
||||
// Log activity every 10 seconds for large transfers
|
||||
if last_log.elapsed() > Duration::from_secs(10) {
|
||||
let rate = total_bytes as f64 / last_log.elapsed().as_secs_f64();
|
||||
info!(
|
||||
user = %user_c2s,
|
||||
total_bytes = total_bytes,
|
||||
msgs = msg_count,
|
||||
rate_kbps = (rate / 1024.0) as u64,
|
||||
"C->S transfer in progress"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
|
||||
if let Err(e) = server_writer.write_all(&buf[..n]).await {
|
||||
debug!(user = %user_c2s, error = %e, "Failed to write to server");
|
||||
break;
|
||||
@@ -79,7 +127,8 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
Ok(Err(e)) => {
|
||||
debug!(user = %user_c2s, error = %e, total_bytes = total_bytes, "Client read error");
|
||||
break;
|
||||
}
|
||||
@@ -87,15 +136,37 @@ where
|
||||
}
|
||||
});
|
||||
|
||||
// Server -> Client task
|
||||
// Server -> Client task with activity timeout
|
||||
let s2c = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; BUFFER_SIZE];
|
||||
let mut total_bytes = 0u64;
|
||||
let mut msg_count = 0u64;
|
||||
let mut last_activity = Instant::now();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
loop {
|
||||
match server_reader.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
// Read with timeout to prevent infinite hang on iOS
|
||||
let read_result = tokio::time::timeout(
|
||||
activity_timeout,
|
||||
server_reader.read(&mut buf)
|
||||
).await;
|
||||
|
||||
match read_result {
|
||||
// Timeout - no activity for too long
|
||||
Err(_) => {
|
||||
warn!(
|
||||
user = %user_s2c,
|
||||
total_bytes = total_bytes,
|
||||
msgs = msg_count,
|
||||
idle_secs = last_activity.elapsed().as_secs(),
|
||||
"Activity timeout (S->C) - no data received"
|
||||
);
|
||||
let _ = client_writer.shutdown().await;
|
||||
break;
|
||||
}
|
||||
|
||||
// Read successful
|
||||
Ok(Ok(0)) => {
|
||||
debug!(
|
||||
user = %user_s2c,
|
||||
total_bytes = total_bytes,
|
||||
@@ -105,9 +176,11 @@ where
|
||||
let _ = client_writer.shutdown().await;
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
|
||||
Ok(Ok(n)) => {
|
||||
total_bytes += n as u64;
|
||||
msg_count += 1;
|
||||
last_activity = Instant::now();
|
||||
s2c_bytes_clone.store(total_bytes, Ordering::Relaxed);
|
||||
|
||||
stats_s2c.add_user_octets_to(&user_s2c, n as u64);
|
||||
@@ -121,6 +194,19 @@ where
|
||||
"S->C data"
|
||||
);
|
||||
|
||||
// Log activity every 10 seconds for large transfers
|
||||
if last_log.elapsed() > Duration::from_secs(10) {
|
||||
let rate = total_bytes as f64 / last_log.elapsed().as_secs_f64();
|
||||
info!(
|
||||
user = %user_s2c,
|
||||
total_bytes = total_bytes,
|
||||
msgs = msg_count,
|
||||
rate_kbps = (rate / 1024.0) as u64,
|
||||
"S->C transfer in progress"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
|
||||
if let Err(e) = client_writer.write_all(&buf[..n]).await {
|
||||
debug!(user = %user_s2c, error = %e, "Failed to write to client");
|
||||
break;
|
||||
@@ -130,7 +216,8 @@ where
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
Ok(Err(e)) => {
|
||||
debug!(user = %user_s2c, error = %e, total_bytes = total_bytes, "Server read error");
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user