From dddf9f30dca90914f32ff67f7f41c7fa12b25346 Mon Sep 17 00:00:00 2001 From: Andrey Akifev Date: Mon, 16 Feb 2026 12:49:16 +0700 Subject: [PATCH 1/2] Fix HOL --- src/transport/middle_proxy/registry.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index abb2b0c..a6a81c8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use tokio::sync::Mutex; pub struct ConnRegistry { - map: RwLock>>, + map: RwLock>>, writers: RwLock>>>, next_id: AtomicU64, } @@ -25,9 +25,11 @@ impl ConnRegistry { } } - pub async fn register(&self) -> (u64, mpsc::Receiver) { + pub async fn register(&self) -> (u64, mpsc::UnboundedReceiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = mpsc::channel(256); + // Unbounded per-connection queue prevents reader-loop HOL blocking on + // slow clients: routing stays non-blocking and preserves message order. + let (tx, rx) = mpsc::unbounded_channel(); self.map.write().await.insert(id, tx); (id, rx) } @@ -40,7 +42,7 @@ impl ConnRegistry { pub async fn route(&self, id: u64, resp: MeResponse) -> bool { let m = self.map.read().await; if let Some(tx) = m.get(&id) { - tx.send(resp).await.is_ok() + tx.send(resp).is_ok() } else { false } From e54fb3fffc7413bc9e4d14688a149c0a84deb7e8 Mon Sep 17 00:00:00 2001 From: Andrey Akifev Date: Mon, 16 Feb 2026 12:49:49 +0700 Subject: [PATCH 2/2] Reduce per-frame flush overhead --- src/proxy/middle_relay.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 01ccc20..09dd532 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -263,7 +263,14 @@ where } } - client_writer.flush().await.map_err(ProxyError::Io) + // Avoid unconditional per-frame flush (throughput killer on large downloads). + // Flush only when low-latency ack semantics are requested or when + // CryptoWriter has buffered pending ciphertext that must be drained. + if quickack || client_writer.has_pending() { + client_writer.flush().await.map_err(ProxyError::Io)?; + } + + Ok(()) } async fn write_client_ack( @@ -283,5 +290,6 @@ where .write_all(&bytes) .await .map_err(ProxyError::Io)?; + // ACK should remain low-latency. client_writer.flush().await.map_err(ProxyError::Io) }