diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 01ccc20..09dd532 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -263,7 +263,14 @@ where } } - client_writer.flush().await.map_err(ProxyError::Io) + // Avoid unconditional per-frame flush (throughput killer on large downloads). + // Flush only when low-latency ack semantics are requested or when + // CryptoWriter has buffered pending ciphertext that must be drained. + if quickack || client_writer.has_pending() { + client_writer.flush().await.map_err(ProxyError::Io)?; + } + + Ok(()) } async fn write_client_ack( @@ -283,5 +290,6 @@ where .write_all(&bytes) .await .map_err(ProxyError::Io)?; + // ACK should remain low-latency. client_writer.flush().await.map_err(ProxyError::Io) } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index abb2b0c..a6a81c8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use tokio::sync::Mutex; pub struct ConnRegistry { - map: RwLock>>, + map: RwLock>>, writers: RwLock>>>, next_id: AtomicU64, } @@ -25,9 +25,11 @@ impl ConnRegistry { } } - pub async fn register(&self) -> (u64, mpsc::Receiver) { + pub async fn register(&self) -> (u64, mpsc::UnboundedReceiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = mpsc::channel(256); + // Unbounded per-connection queue prevents reader-loop HOL blocking on + // slow clients: routing stays non-blocking and preserves message order. + let (tx, rx) = mpsc::unbounded_channel(); self.map.write().await.insert(id, tx); (id, rx) } @@ -40,7 +42,7 @@ impl ConnRegistry { pub async fn route(&self, id: u64, resp: MeResponse) -> bool { let m = self.map.read().await; if let Some(tx) = m.get(&id) { - tx.send(resp).await.is_ok() + tx.send(resp).is_ok() } else { false }