RPC Flags Fixes

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-13 14:28:47 +03:00
parent f1c1f42de8
commit e62b41ae64
3 changed files with 236 additions and 106 deletions

View File

@@ -249,12 +249,8 @@ fn ipv4_to_mapped_v6(ip: Ipv4Addr) -> [u8; 16] {
fn addr_to_ip_u32(addr: &SocketAddr) -> u32 {
match addr.ip() {
IpAddr::V4(v4) => u32::from_le_bytes(v4.octets()),
IpAddr::V6(v6) => {
if let Some(v4) = v6.to_ipv4_mapped() {
u32::from_le_bytes(v4.octets())
} else { 0 }
}
IpAddr::V4(v4) => u32::from_be_bytes(v4.octets()),
IpAddr::V6(_) => 0,
}
}
@@ -338,12 +334,10 @@ fn build_proxy_req_payload(
our_addr: SocketAddr,
data: &[u8],
proxy_tag: Option<&[u8]>,
proto_flags: u32,
) -> Vec<u8> {
let mut flags: u32 = 0;
let has_tag = proxy_tag.is_some();
if has_tag {
flags |= 8; // C: `flags |= 8` when proxy_tag_set
}
// proto_flags are provided by caller and must already match C/Erlang transport flags.
let flags: u32 = proto_flags;
let mut b = Vec::with_capacity(128 + data.len());
b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes());
@@ -365,7 +359,7 @@ fn build_proxy_req_payload(
b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes());
// Extra section (proxy_tag)
if flags & 12 != 0 {
if flags & (RPC_FLAG_HAS_AD_TAG | 0x4) != 0 {
let extra_start = b.len();
b.extend_from_slice(&0u32.to_le_bytes()); // placeholder
@@ -400,7 +394,7 @@ fn build_proxy_req_payload(
pub struct MePool {
registry: Arc<ConnRegistry>,
writers: RwLock<Vec<Arc<Mutex<RpcWriter>>>>,
writers: Arc<RwLock<Vec<Arc<Mutex<RpcWriter>>>>>,
rr: AtomicU64,
proxy_tag: Option<Vec<u8>>,
/// Telegram proxy-secret (binary, 32-512 bytes)
@@ -412,7 +406,7 @@ impl MePool {
pub fn new(proxy_tag: Option<Vec<u8>>, proxy_secret: Vec<u8>) -> Arc<Self> {
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
writers: RwLock::new(Vec::new()),
writers: Arc::new(RwLock::new(Vec::new())),
rr: AtomicU64::new(0),
proxy_tag,
proxy_secret,
@@ -424,6 +418,10 @@ impl MePool {
&self.registry
}
fn writers_arc(&self) -> Arc<RwLock<Vec<Arc<Mutex<RpcWriter>>>>> {
self.writers.clone()
}
/// key_selector = first 4 bytes of proxy-secret as LE u32
/// C: main_secret.key_signature via union { char secret[]; int key_signature; }
fn key_selector(&self) -> u32 {
@@ -692,18 +690,25 @@ impl MePool {
info!(%addr, "RPC handshake OK");
// ===== 6. Setup writer + reader =====
self.writers.write().await.push(Arc::new(Mutex::new(RpcWriter {
let rpc_w = Arc::new(Mutex::new(RpcWriter {
writer: wr,
key: wk,
iv: write_iv,
seq_no: 0,
})));
}));
self.writers.write().await.push(rpc_w.clone());
let reg = self.registry.clone();
let w_pong = rpc_w.clone();
let w_pool = self.writers_arc();
tokio::spawn(async move {
if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf).await {
if let Err(e) = reader_loop(rd, rk, read_iv, reg, enc_buf, dec_buf, w_pong.clone()).await {
warn!(error = %e, "ME reader ended");
}
// Remove dead writer from pool
let mut ws = w_pool.write().await;
ws.retain(|w| !Arc::ptr_eq(w, &w_pong));
info!(remaining = ws.len(), "Dead ME writer removed from pool");
});
Ok(())
@@ -715,17 +720,32 @@ impl MePool {
client_addr: SocketAddr,
our_addr: SocketAddr,
data: &[u8],
proto_flags: u32,
) -> Result<()> {
let ws = self.writers.read().await;
if ws.is_empty() {
return Err(ProxyError::Proxy("No ME connections".into()));
}
let w = ws[self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len()].clone();
drop(ws);
let payload = build_proxy_req_payload(
conn_id, client_addr, our_addr, data, self.proxy_tag.as_deref(),
conn_id, client_addr, our_addr, data,
self.proxy_tag.as_deref(), proto_flags,
);
w.lock().await.send(&payload).await
loop {
let ws = self.writers.read().await;
if ws.is_empty() {
return Err(ProxyError::Proxy("All ME connections dead".into()));
}
let idx = self.rr.fetch_add(1, Ordering::Relaxed) as usize % ws.len();
let w = ws[idx].clone();
drop(ws);
match w.lock().await.send(&payload).await {
Ok(()) => return Ok(()),
Err(e) => {
warn!(error = %e, "ME write failed, removing dead conn");
let mut ws = self.writers.write().await;
ws.retain(|o| !Arc::ptr_eq(o, &w));
if ws.is_empty() {
return Err(ProxyError::Proxy("All ME connections dead".into()));
}
}
}
}
}
pub async fn send_close(&self, conn_id: u64) -> Result<()> {
@@ -736,7 +756,11 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
let _ = w.lock().await.send(&p).await;
if let Err(e) = w.lock().await.send(&p).await {
debug!(error = %e, "ME close write failed");
let mut ws = self.writers.write().await;
ws.retain(|o| !Arc::ptr_eq(o, &w));
}
}
self.registry.unregister(conn_id).await;
Ok(())
@@ -756,6 +780,7 @@ async fn reader_loop(
reg: Arc<ConnRegistry>,
mut enc_leftover: BytesMut,
mut dec: BytesMut,
writer: Arc<Mutex<RpcWriter>>,
) -> Result<()> {
let mut raw = enc_leftover;
loop {
@@ -824,6 +849,16 @@ async fn reader_loop(
debug!(cid, "CLOSE_CONN from ME");
reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
trace!(ping_id, "RPC_PING -> PONG");
let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes());
if let Err(e) = writer.lock().await.send(&pong).await {
warn!(error = %e, "PONG send failed");
break;
}
} else {
debug!(rpc_type = format_args!("0x{:08x}", pt), len = body.len(), "Unknown RPC");
}
@@ -831,6 +866,47 @@ async fn reader_loop(
}
}
// ========== Proto flags ==========
/// Map ProtoTag to C-compatible RPC_PROXY_REQ transport flags.
/// Returned value already includes ad_tag/magic/extmode2.
pub fn proto_flags_for_tag(proto_tag: Option<u32>) -> u32 {
let mut flags = RPC_FLAG_HAS_AD_TAG | RPC_FLAG_MAGIC | RPC_FLAG_EXTMODE2;
match proto_tag {
Some(0xdddddddd) => {
// Secure
flags |= RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE;
}
Some(0xeeeeeeee) => {
// Intermediate
flags |= RPC_FLAG_INTERMEDIATE;
}
Some(0xefefefef) => {
// Abridged
flags |= RPC_FLAG_ABRIDGED;
}
_ => {
flags |= RPC_FLAG_INTERMEDIATE;
}
}
flags
}
#[cfg(test)]
mod tests {
use super::proto_flags_for_tag;
#[test]
fn proto_flags_secure_matches_reference() {
assert_eq!(proto_flags_for_tag(Some(0xdddddddd)), 0x28021008);
}
#[test]
fn proto_flags_intermediate_matches_reference() {
assert_eq!(proto_flags_for_tag(Some(0xeeeeeeee)), 0x20021008);
}
}
// ========== Health Monitor (Phase 4) ==========
pub async fn me_health_monitor(