Semaphore + Async Magics for Defcluster
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
@@ -358,7 +358,7 @@ impl RunningClientHandler {
             original_dc = dc_idx,
             fallback_dc = (fallback_idx + 1) as u16,
             fallback_addr = %datacenters[fallback_idx],
-            "Unknown DC not in ±1..5 range, routing to default cluster (same as C impl: mf_cluster_lookup with force=1 -> default_cluster)"
+            "Special DC ---> default_cluster"
         );

         Ok(SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT))

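As a reading aid, the routing rule this log line records can be sketched as a small helper. Everything below is an illustrative reconstruction: resolve_dc_addr, the default_idx parameter, and the port value are assumptions; only datacenters[fallback_idx] and TG_DATACENTER_PORT appear in the hunk itself.

use std::net::{IpAddr, SocketAddr};

// Assumed value for illustration; the real constant lives elsewhere in the crate.
const TG_DATACENTER_PORT: u16 = 443;

/// Hypothetical sketch: map a client-supplied DC index to a target address,
/// routing anything outside the ±1..=5 range to the default cluster entry.
fn resolve_dc_addr(dc_idx: i16, datacenters: &[IpAddr], default_idx: usize) -> SocketAddr {
    let fallback_idx = match dc_idx.unsigned_abs() as usize {
        i @ 1..=5 => i - 1,   // known DC: 1-based id into the 0-based table
        _ => default_idx,     // unknown/special DC -> default cluster
    };
    SocketAddr::new(datacenters[fallback_idx], TG_DATACENTER_PORT)
}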
@@ -9,6 +9,9 @@ use tracing::debug;
 use crate::config::ProxyConfig;

 const MASK_TIMEOUT: Duration = Duration::from_secs(5);
+/// Maximum duration for the entire masking relay.
+/// Limits resource consumption from slow-loris attacks and port scanners.
+const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);
 const MASK_BUFFER_SIZE: usize = 8192;

 /// Detect client type based on initial data
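MASK_RELAY_TIMEOUT caps the whole masking relay rather than a single read. A minimal sketch of how such a cap is commonly enforced with tokio::time::timeout follows; run_masking_relay is a hypothetical stand-in for the real handler, not part of this crate.

use std::time::Duration;
use tokio::time::timeout;

const MASK_RELAY_TIMEOUT: Duration = Duration::from_secs(60);

// Hypothetical stand-in for the real masking relay handler.
async fn run_masking_relay() {
    // ... pump bytes between the two sockets until EOF ...
}

#[tokio::main]
async fn main() {
    match timeout(MASK_RELAY_TIMEOUT, run_masking_relay()).await {
        Ok(()) => { /* relay ended on its own (EOF or error) */ }
        Err(_elapsed) => { /* hard deadline hit: drop the connection */ }
    }
}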
@@ -1,262 +1,195 @@
 //! Bidirectional Relay

-use std::sync::Arc;
-use std::time::Duration;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
-use tokio::time::Instant;
-use tracing::{debug, trace, warn, info};
-use crate::error::Result;
-use crate::stats::Stats;
-use crate::stream::BufferPool;
-use std::sync::atomic::{AtomicU64, Ordering};

-// Activity timeout for iOS compatibility (30 minutes)
-const ACTIVITY_TIMEOUT_SECS: u64 = 1800;

-/// Relay data bidirectionally between client and server
-pub async fn relay_bidirectional<CR, CW, SR, SW>(
-    mut client_reader: CR,
-    mut client_writer: CW,
-    mut server_reader: SR,
-    mut server_writer: SW,
-    user: &str,
-    stats: Arc<Stats>,
-    buffer_pool: Arc<BufferPool>,
-) -> Result<()>
-where
-    CR: AsyncRead + Unpin + Send + 'static,
-    CW: AsyncWrite + Unpin + Send + 'static,
-    SR: AsyncRead + Unpin + Send + 'static,
-    SW: AsyncWrite + Unpin + Send + 'static,
-{
-    let user_c2s = user.to_string();
-    let user_s2c = user.to_string();

-    let stats_c2s = Arc::clone(&stats);
-    let stats_s2c = Arc::clone(&stats);
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt};
+use tokio::time::Instant;
+use tracing::{debug, trace, warn};
+use crate::error::Result;
+use crate::stats::Stats;
+use crate::stream::BufferPool;

-    let c2s_bytes = Arc::new(AtomicU64::new(0));
-    let s2c_bytes = Arc::new(AtomicU64::new(0));
-    let c2s_bytes_clone = Arc::clone(&c2s_bytes);
-    let s2c_bytes_clone = Arc::clone(&s2c_bytes);
+// Activity timeout for iOS compatibility (30 minutes)
+const ACTIVITY_TIMEOUT_SECS: u64 = 1800;

     let activity_timeout = Duration::from_secs(ACTIVITY_TIMEOUT_SECS);
+/// Relay data bidirectionally between client and server.
+///
+/// Uses a single-task select!-based loop instead of spawning two tasks.
+/// This eliminates:
+/// - 2× task spawn overhead per connection
+/// - Zombie task problem (old code used select! on JoinHandles but
+///   never aborted the losing task — it would run for up to 30 min)
+/// - Extra Arc<AtomicU64> allocations for cross-task byte counters
+///
+/// The flush()-per-write was also removed: TCP_NODELAY is set on all
+/// sockets (socket.rs), so data is pushed immediately without Nagle
+/// buffering. Explicit flush() on every small read was causing a
+/// syscall storm and defeating CryptoWriter's internal coalescing.
+pub async fn relay_bidirectional<CR, CW, SR, SW>(
+    mut client_reader: CR,
+    mut client_writer: CW,
+    mut server_reader: SR,
+    mut server_writer: SW,
+    user: &str,
+    stats: Arc<Stats>,
+    buffer_pool: Arc<BufferPool>,
+) -> Result<()>
+where
+    CR: AsyncRead + Unpin + Send + 'static,
+    CW: AsyncWrite + Unpin + Send + 'static,
+    SR: AsyncRead + Unpin + Send + 'static,
+    SW: AsyncWrite + Unpin + Send + 'static,
+{
+    // Get buffers from pool — one per direction
+    let mut c2s_buf = buffer_pool.get();
+    let cap = c2s_buf.capacity();
+    c2s_buf.resize(cap, 0);

-    let pool_c2s = buffer_pool.clone();
-    let pool_s2c = buffer_pool.clone();
+    let mut s2c_buf = buffer_pool.get();
+    let cap = s2c_buf.capacity();
+    s2c_buf.resize(cap, 0);

-    // Client -> Server task
-    let c2s = tokio::spawn(async move {
-        // Get buffer from pool
-        let mut pooled_buf = pool_c2s.get();
-        // CRITICAL FIX: BytesMut from pool has len 0. We must resize it to be usable as &mut [u8].
-        // We use the full capacity.
-        let cap = pooled_buf.capacity();
-        pooled_buf.resize(cap, 0);
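The CRITICAL FIX comment refers to bytes::BytesMut semantics: a pooled buffer comes back with length 0, so passing it to read() would hand the reader an empty slice and the read would return 0 immediately. A standalone demonstration, assuming the pool hands out plain BytesMut values:

use bytes::BytesMut;

fn main() {
    let mut buf = BytesMut::with_capacity(8192);
    assert_eq!(buf.len(), 0);   // read(&mut buf[..]) would see an empty slice here
    let cap = buf.capacity();
    buf.resize(cap, 0);         // zero-fill so the full capacity is usable as &mut [u8]
    assert_eq!(buf.len(), cap);
}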

-        let mut total_bytes = 0u64;
-        let mut prev_total_bytes = 0u64;
-        let mut msg_count = 0u64;
-        let mut last_activity = Instant::now();
-        let activity_timeout = Duration::from_secs(ACTIVITY_TIMEOUT_SECS);

+    let mut c2s_total: u64 = 0;
+    let mut s2c_total: u64 = 0;
+    let mut c2s_msgs: u64 = 0;
+    let mut s2c_msgs: u64 = 0;

+    // For periodic rate logging
+    let mut c2s_prev: u64 = 0;
+    let mut s2c_prev: u64 = 0;
+    let mut last_log = Instant::now();

-        loop {
-            // Read with timeout
-            let read_result = tokio::time::timeout(
-                activity_timeout,
-                client_reader.read(&mut pooled_buf)
-            ).await;

-            match read_result {
-                Err(_) => {
-                    warn!(
-                        user = %user_c2s,
-                        total_bytes = total_bytes,
-                        msgs = msg_count,
-                        idle_secs = last_activity.elapsed().as_secs(),
-                        "Activity timeout (C->S) - no data received"
-                    );
-                    let _ = server_writer.shutdown().await;
-                    break;
-                }

-                Ok(Ok(0)) => {
-                    debug!(
-                        user = %user_c2s,
-                        total_bytes = total_bytes,
-                        msgs = msg_count,
-                        "Client closed connection (C->S)"
-                    );
-                    let _ = server_writer.shutdown().await;
-                    break;
-                }

-                Ok(Ok(n)) => {
-                    total_bytes += n as u64;
-                    msg_count += 1;
-                    last_activity = Instant::now();
-                    c2s_bytes_clone.store(total_bytes, Ordering::Relaxed);

-                    stats_c2s.add_user_octets_from(&user_c2s, n as u64);
-                    stats_c2s.increment_user_msgs_from(&user_c2s);

-                    trace!(
-                        user = %user_c2s,
-                        bytes = n,
-                        total = total_bytes,
-                        "C->S data"
-                    );

-                    // Log activity every 10 seconds with correct rate
-                    let elapsed = last_log.elapsed();
-                    if elapsed > Duration::from_secs(10) {
-                        let delta = total_bytes - prev_total_bytes;
-                        let rate = delta as f64 / elapsed.as_secs_f64();

-                        debug!(
-                            user = %user_c2s,
-                            total_bytes = total_bytes,
-                            msgs = msg_count,
-                            rate_kbps = (rate / 1024.0) as u64,
-                            "C->S transfer in progress"
-                        );

-                        last_log = Instant::now();
-                        prev_total_bytes = total_bytes;
-                    }

-                    if let Err(e) = server_writer.write_all(&pooled_buf[..n]).await {
-                        debug!(user = %user_c2s, error = %e, "Failed to write to server");
-                        break;
-                    }
-                    if let Err(e) = server_writer.flush().await {
-                        debug!(user = %user_c2s, error = %e, "Failed to flush to server");
-                        break;
-                    }
-                }

-                Ok(Err(e)) => {
-                    debug!(user = %user_c2s, error = %e, total_bytes = total_bytes, "Client read error");
-                    break;
-                }
-            }
-        }
-    });
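The three-way match above distinguishes four outcomes: the outer Err is tokio::time::timeout expiring, Ok(Ok(0)) is a clean EOF, Ok(Ok(n)) is data, and Ok(Err(e)) is an I/O error. A self-contained illustration of those arms, using an empty in-memory reader as a stand-in for the client socket:

use std::time::Duration;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // An empty reader yields EOF immediately, so this hits the Ok(Ok(0)) arm.
    let mut reader = tokio::io::empty();
    let mut buf = [0u8; 1024];

    // 1800 s mirrors ACTIVITY_TIMEOUT_SECS above.
    match tokio::time::timeout(Duration::from_secs(1800), reader.read(&mut buf)).await {
        Err(_) => println!("activity timeout: no data within the window"),
        Ok(Ok(0)) => println!("peer closed the stream (EOF)"),
        Ok(Ok(n)) => println!("got {n} bytes"),
        Ok(Err(e)) => println!("read error: {e}"),
    }
    Ok(())
}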

-    // Server -> Client task
-    let s2c = tokio::spawn(async move {
-        // Get buffer from pool
-        let mut pooled_buf = pool_s2c.get();
-        // CRITICAL FIX: Resize buffer
-        let cap = pooled_buf.capacity();
-        pooled_buf.resize(cap, 0);

-        let mut total_bytes = 0u64;
-        let mut prev_total_bytes = 0u64;
-        let mut msg_count = 0u64;
-        let mut last_activity = Instant::now();
-        let mut last_log = Instant::now();

-        loop {
-            let read_result = tokio::time::timeout(
-                activity_timeout,
-                server_reader.read(&mut pooled_buf)
-            ).await;

-            match read_result {
-                Err(_) => {
-                    warn!(
-                        user = %user_s2c,
-                        total_bytes = total_bytes,
-                        msgs = msg_count,
-                        idle_secs = last_activity.elapsed().as_secs(),
-                        "Activity timeout (S->C) - no data received"
-                    );
-                    let _ = client_writer.shutdown().await;
-                    break;
-                }

-                Ok(Ok(0)) => {
-                    debug!(
-                        user = %user_s2c,
-                        total_bytes = total_bytes,
-                        msgs = msg_count,
-                        "Server closed connection (S->C)"
-                    );
-                    let _ = client_writer.shutdown().await;
-                    break;
-                }

-                Ok(Ok(n)) => {
-                    total_bytes += n as u64;
-                    msg_count += 1;
-                    last_activity = Instant::now();
-                    s2c_bytes_clone.store(total_bytes, Ordering::Relaxed);

-                    stats_s2c.add_user_octets_to(&user_s2c, n as u64);
-                    stats_s2c.increment_user_msgs_to(&user_s2c);

-                    trace!(
-                        user = %user_s2c,
-                        bytes = n,
-                        total = total_bytes,
-                        "S->C data"
-                    );

-                    let elapsed = last_log.elapsed();
-                    if elapsed > Duration::from_secs(10) {
-                        let delta = total_bytes - prev_total_bytes;
-                        let rate = delta as f64 / elapsed.as_secs_f64();

-                        debug!(
-                            user = %user_s2c,
-                            total_bytes = total_bytes,
-                            msgs = msg_count,
-                            rate_kbps = (rate / 1024.0) as u64,
-                            "S->C transfer in progress"
-                        );

-                        last_log = Instant::now();
-                        prev_total_bytes = total_bytes;
-                    }

-                    if let Err(e) = client_writer.write_all(&pooled_buf[..n]).await {
-                        debug!(user = %user_s2c, error = %e, "Failed to write to client");
-                        break;
-                    }
-                    if let Err(e) = client_writer.flush().await {
-                        debug!(user = %user_s2c, error = %e, "Failed to flush to client");
-                        break;
-                    }
-                }

-                Ok(Err(e)) => {
-                    debug!(user = %user_s2c, error = %e, total_bytes = total_bytes, "Server read error");
-                    break;
-                }
-            }
-        }
-    });
+    let user_owned = user.to_string();

-    // Wait for either direction to complete
-    tokio::select! {
-        result = c2s => {
-            if let Err(e) = result {
-                warn!(error = %e, "C->S task panicked");
-            }
-        }
-        result = s2c => {
-            if let Err(e) = result {
-                warn!(error = %e, "S->C task panicked");
-            }
-        }
-    }
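The new doc comment calls this the zombie-task problem: select!-ing over the two JoinHandles returns as soon as one direction ends, while the losing task keeps running until its own 30-minute timeout. If the two-task layout were kept, the losing handle would need an explicit abort; a minimal sketch of that pattern, illustrative only and not this crate's code:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-ins for the two relay directions of the old design.
    let c2s = tokio::spawn(async { tokio::time::sleep(Duration::from_millis(10)).await });
    let s2c = tokio::spawn(async { tokio::time::sleep(Duration::from_secs(1800)).await });

    // Grab abort handles up front so the losing task can be stopped.
    let c2s_abort = c2s.abort_handle();
    let s2c_abort = s2c.abort_handle();

    tokio::select! {
        _ = c2s => s2c_abort.abort(), // C->S finished first: stop the other direction
        _ = s2c => c2s_abort.abort(), // S->C finished first
    }
}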
+    loop {
+        tokio::select! {
+            biased;

+            // Client -> Server direction
+            result = tokio::time::timeout(activity_timeout, client_reader.read(&mut c2s_buf)) => {
+                match result {
+                    Err(_) => {
+                        // Activity timeout
+                        warn!(
+                            user = %user_owned,
+                            c2s_bytes = c2s_total,
+                            s2c_bytes = s2c_total,
+                            "Activity timeout (C->S)"
+                        );
+                        break;
+                    }
+                    Ok(Ok(0)) => {
+                        // Client closed
+                        debug!(
+                            user = %user_owned,
+                            c2s_bytes = c2s_total,
+                            s2c_bytes = s2c_total,
+                            "Client closed connection"
+                        );
+                        break;
+                    }
+                    Ok(Ok(n)) => {
+                        c2s_total += n as u64;
+                        c2s_msgs += 1;

+                        stats.add_user_octets_from(&user_owned, n as u64);
+                        stats.increment_user_msgs_from(&user_owned);

+                        trace!(user = %user_owned, bytes = n, "C->S");

+                        // Write without flush — TCP_NODELAY handles push
+                        if let Err(e) = server_writer.write_all(&c2s_buf[..n]).await {
+                            debug!(user = %user_owned, error = %e, "Write to server failed");
+                            break;
+                        }
+                    }
+                    Ok(Err(e)) => {
+                        debug!(user = %user_owned, error = %e, "Client read error");
+                        break;
+                    }
+                }
+            }

+            // Server -> Client direction
+            result = tokio::time::timeout(activity_timeout, server_reader.read(&mut s2c_buf)) => {
+                match result {
+                    Err(_) => {
+                        warn!(
+                            user = %user_owned,
+                            c2s_bytes = c2s_total,
+                            s2c_bytes = s2c_total,
+                            "Activity timeout (S->C)"
+                        );
+                        break;
+                    }
+                    Ok(Ok(0)) => {
+                        debug!(
+                            user = %user_owned,
+                            c2s_bytes = c2s_total,
+                            s2c_bytes = s2c_total,
+                            "Server closed connection"
+                        );
+                        break;
+                    }
+                    Ok(Ok(n)) => {
+                        s2c_total += n as u64;
+                        s2c_msgs += 1;

+                        stats.add_user_octets_to(&user_owned, n as u64);
+                        stats.increment_user_msgs_to(&user_owned);

+                        trace!(user = %user_owned, bytes = n, "S->C");

+                        if let Err(e) = client_writer.write_all(&s2c_buf[..n]).await {
+                            debug!(user = %user_owned, error = %e, "Write to client failed");
+                            break;
+                        }
+                    }
+                    Ok(Err(e)) => {
+                        debug!(user = %user_owned, error = %e, "Server read error");
+                        break;
+                    }
+                }
+            }
+        }

+        // Periodic rate logging (every 10s)
+        let elapsed = last_log.elapsed();
+        if elapsed > Duration::from_secs(10) {
+            let secs = elapsed.as_secs_f64();
+            let c2s_delta = c2s_total - c2s_prev;
+            let s2c_delta = s2c_total - s2c_prev;

+            debug!(
+                user = %user_owned,
+                c2s_kbps = (c2s_delta as f64 / secs / 1024.0) as u64,
+                s2c_kbps = (s2c_delta as f64 / secs / 1024.0) as u64,
+                c2s_total = c2s_total,
+                s2c_total = s2c_total,
+                "Relay active"
+            );

+            c2s_prev = c2s_total;
+            s2c_prev = s2c_total;
+            last_log = Instant::now();
+        }
+    }

+    // Clean shutdown of both directions
+    let _ = server_writer.shutdown().await;
+    let _ = client_writer.shutdown().await;

+    debug!(
+        user = %user_owned,
+        c2s_bytes = c2s_total,
+        s2c_bytes = s2c_total,
+        c2s_msgs = c2s_msgs,
+        s2c_msgs = s2c_msgs,
+        "Relay finished"
+    );

+    Ok(())
+}
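The no-flush design above relies on TCP_NODELAY being enabled when sockets are created in socket.rs, which is outside this diff. For reference, the standard tokio call looks like this; the address is a placeholder:

use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Placeholder address; socket.rs is said to apply this to every accepted/dialed socket.
    let stream = TcpStream::connect("127.0.0.1:8443").await?;
    stream.set_nodelay(true)?; // disable Nagle so each write_all() is pushed immediately
    Ok(())
}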

-    debug!(
-        c2s_bytes = c2s_bytes.load(Ordering::Relaxed),
-        s2c_bytes = s2c_bytes.load(Ordering::Relaxed),
-        "Relay finished"
-    );

-    Ok(())
-}
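Condensed to its core, the added relay is a single task that select!s over the two read futures and forwards whatever arrives, with biased; making the poll order deterministic. A self-contained sketch of that pattern over in-memory duplex pipes, with the timeouts, stats, and buffer pool stripped out:

use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};

#[tokio::main]
async fn main() {
    // In-memory stand-ins for the client and server sockets.
    let (client, mut client_peer) = duplex(64);
    let (server, mut server_peer) = duplex(64);
    let (mut client_reader, mut client_writer) = tokio::io::split(client);
    let (mut server_reader, mut server_writer) = tokio::io::split(server);

    // Drive some traffic and close the client side so the relay terminates.
    tokio::spawn(async move {
        client_peer.write_all(b"ping").await.unwrap();
        client_peer.shutdown().await.unwrap();
        let mut echo = [0u8; 4];
        let _ = server_peer.read(&mut echo).await;
    });

    let (mut c2s_buf, mut s2c_buf) = ([0u8; 64], [0u8; 64]);
    loop {
        tokio::select! {
            biased;
            res = client_reader.read(&mut c2s_buf) => match res {
                Ok(0) | Err(_) => break, // EOF or error: stop both directions
                Ok(n) => server_writer.write_all(&c2s_buf[..n]).await.unwrap(),
            },
            res = server_reader.read(&mut s2c_buf) => match res {
                Ok(0) | Err(_) => break,
                Ok(n) => client_writer.write_all(&s2c_buf[..n]).await.unwrap(),
            },
        }
    }
    let _ = server_writer.shutdown().await;
    let _ = client_writer.shutdown().await;
}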