diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index 5ea7c66..86cd2bd 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -201,55 +201,46 @@ pub static RESERVED_NONCE_CONTINUES: &[[u8; 4]] = &[ // ============= RPC Constants (for Middle Proxy) ============= -/// RPC Proxy Request bytes (LE for 0x36cef1ee) -pub const RPC_PROXY_REQ_BYTES: [u8; 4] = [0xee, 0xf1, 0xce, 0x36]; -/// RPC Proxy Answer bytes (LE for 0x4403da0d) -pub const RPC_PROXY_ANS_BYTES: [u8; 4] = [0x0d, 0xda, 0x03, 0x44]; -/// RPC Close Extended bytes (LE for 0x5eb634a2) -pub const RPC_CLOSE_EXT_BYTES: [u8; 4] = [0xa2, 0x34, 0xb6, 0x5e]; -/// RPC Simple ACK bytes (LE for 0x3bac409b) -pub const RPC_SIMPLE_ACK_BYTES: [u8; 4] = [0x9b, 0x40, 0xac, 0x3b]; -/// RPC Unknown bytes (LE for 0x5730a2df) -pub const RPC_UNKNOWN_BYTES: [u8; 4] = [0xdf, 0xa2, 0x30, 0x57]; -/// RPC Handshake bytes (LE for 0x7682eef5) -pub const RPC_HANDSHAKE_BYTES: [u8; 4] = [0xf5, 0xee, 0x82, 0x76]; -/// RPC Nonce bytes (LE for 0x7acb87aa) -pub const RPC_NONCE_BYTES: [u8; 4] = [0xaa, 0x87, 0xcb, 0x7a]; +/// RPC Proxy Request -// === RPC Proxy Flags (from Erlang mtp_rpc.erl) === +/// RPC Flags (from Erlang mtp_rpc.erl) pub const RPC_FLAG_NOT_ENCRYPTED: u32 = 0x2; pub const RPC_FLAG_HAS_AD_TAG: u32 = 0x8; -pub const RPC_FLAG_MAGIC: u32 = 0x1000; -pub const RPC_FLAG_EXTMODE2: u32 = 0x20000; -pub const RPC_FLAG_PAD: u32 = 0x8000000; -pub const RPC_FLAG_INTERMEDIATE: u32 = 0x20000000; -pub const RPC_FLAG_ABRIDGED: u32 = 0x40000000; -pub const RPC_FLAG_QUICKACK: u32 = 0x80000000; +pub const RPC_FLAG_MAGIC: u32 = 0x1000; +pub const RPC_FLAG_EXTMODE2: u32 = 0x20000; +pub const RPC_FLAG_PAD: u32 = 0x8000000; +pub const RPC_FLAG_INTERMEDIATE: u32 = 0x20000000; +pub const RPC_FLAG_ABRIDGED: u32 = 0x40000000; +pub const RPC_FLAG_QUICKACK: u32 = 0x80000000; -pub const RPC_PROXY_REQ: u32 = 0x36cef1ee; -pub const RPC_PROXY_ANS: u32 = 0x4403da0d; -pub const RPC_CLOSE_CONN: u32 = 0x1fcf425d; -pub const RPC_CLOSE_EXT: u32 = 0x5eb634a2; -pub const RPC_SIMPLE_ACK: u32 = 0x3bac409b; -pub const TL_PROXY_TAG: u32 = 0xdb1e26ae; +pub const RPC_PROXY_REQ: [u8; 4] = [0xee, 0xf1, 0xce, 0x36]; +/// RPC Proxy Answer +pub const RPC_PROXY_ANS: [u8; 4] = [0x0d, 0xda, 0x03, 0x44]; +/// RPC Close Extended +pub const RPC_CLOSE_EXT: [u8; 4] = [0xa2, 0x34, 0xb6, 0x5e]; +/// RPC Simple ACK +pub const RPC_SIMPLE_ACK: [u8; 4] = [0x9b, 0x40, 0xac, 0x3b]; +/// RPC Unknown +pub const RPC_UNKNOWN: [u8; 4] = [0xdf, 0xa2, 0x30, 0x57]; +/// RPC Handshake +pub const RPC_HANDSHAKE: [u8; 4] = [0xf5, 0xee, 0x82, 0x76]; +/// RPC Nonce +pub const RPC_NONCE: [u8; 4] = [0xaa, 0x87, 0xcb, 0x7a]; -pub const RPC_PING_U32: u32 = 0x5730a2df; -pub const RPC_PONG_U32: u32 = 0x8430eaa7; - -/// Backward-compatible names for legacy call sites. +/// RPC Flags pub mod rpc_flags { - use super::*; - pub const FLAG_NOT_ENCRYPTED: u32 = RPC_FLAG_NOT_ENCRYPTED; - pub const FLAG_HAS_AD_TAG: u32 = RPC_FLAG_HAS_AD_TAG; - pub const FLAG_MAGIC: u32 = RPC_FLAG_MAGIC; - pub const FLAG_EXTMODE2: u32 = RPC_FLAG_EXTMODE2; - pub const FLAG_PAD: u32 = RPC_FLAG_PAD; - pub const FLAG_INTERMEDIATE: u32 = RPC_FLAG_INTERMEDIATE; - pub const FLAG_ABRIDGED: u32 = RPC_FLAG_ABRIDGED; - pub const FLAG_QUICKACK: u32 = RPC_FLAG_QUICKACK; + pub const FLAG_NOT_ENCRYPTED: u32 = 0x2; + pub const FLAG_HAS_AD_TAG: u32 = 0x8; + pub const FLAG_MAGIC: u32 = 0x1000; + pub const FLAG_EXTMODE2: u32 = 0x20000; + pub const FLAG_PAD: u32 = 0x8000000; + pub const FLAG_INTERMEDIATE: u32 = 0x20000000; + pub const FLAG_ABRIDGED: u32 = 0x40000000; + pub const FLAG_QUICKACK: u32 = 0x80000000; } -// ============= Middle-End Proxy Servers ============= + + // ============= Middle-End Proxy Servers ============= pub const ME_PROXY_PORT: u16 = 8888; pub static TG_MIDDLE_PROXIES_FLAT_V4: LazyLock> = LazyLock::new(|| { @@ -262,41 +253,42 @@ pub mod rpc_flags { ] }); -// ============= RPC Constants (u32 native endian) ============= -// From mtproto-common.h + net-tcp-rpc-common.h + mtproto-proxy.c - -pub const RPC_NONCE_U32: u32 = 0x7acb87aa; -pub const RPC_HANDSHAKE_U32: u32 = 0x7682eef5; -pub const RPC_HANDSHAKE_ERROR_U32: u32 = 0x6a27beda; -pub const TL_PROXY_TAG_U32: u32 = TL_PROXY_TAG; // mtproto-proxy.c:121 - -// mtproto-common.h -pub const RPC_PROXY_REQ_U32: u32 = RPC_PROXY_REQ; -pub const RPC_PROXY_ANS_U32: u32 = RPC_PROXY_ANS; -pub const RPC_CLOSE_CONN_U32: u32 = RPC_CLOSE_CONN; -pub const RPC_CLOSE_EXT_U32: u32 = RPC_CLOSE_EXT; -pub const RPC_SIMPLE_ACK_U32: u32 = RPC_SIMPLE_ACK; + // ============= RPC Constants (u32 native endian) ============= + // From mtproto-common.h + net-tcp-rpc-common.h + mtproto-proxy.c -pub const RPC_CRYPTO_NONE_U32: u32 = 0; -pub const RPC_CRYPTO_AES_U32: u32 = 1; - -pub mod proxy_flags { - use super::*; - pub const FLAG_HAS_AD_TAG: u32 = 1; - pub const FLAG_NOT_ENCRYPTED: u32 = RPC_FLAG_NOT_ENCRYPTED; - pub const FLAG_HAS_AD_TAG2: u32 = RPC_FLAG_HAS_AD_TAG; - pub const FLAG_MAGIC: u32 = RPC_FLAG_MAGIC; - pub const FLAG_EXTMODE2: u32 = RPC_FLAG_EXTMODE2; - pub const FLAG_PAD: u32 = RPC_FLAG_PAD; - pub const FLAG_INTERMEDIATE: u32 = RPC_FLAG_INTERMEDIATE; - pub const FLAG_ABRIDGED: u32 = RPC_FLAG_ABRIDGED; - pub const FLAG_QUICKACK: u32 = RPC_FLAG_QUICKACK; -} - -pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5; -pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10; - -#[cfg(test)] + pub const RPC_NONCE_U32: u32 = 0x7acb87aa; + pub const RPC_HANDSHAKE_U32: u32 = 0x7682eef5; + pub const RPC_HANDSHAKE_ERROR_U32: u32 = 0x6a27beda; + pub const TL_PROXY_TAG_U32: u32 = 0xdb1e26ae; // mtproto-proxy.c:121 + + // mtproto-common.h + pub const RPC_PROXY_REQ_U32: u32 = 0x36cef1ee; + pub const RPC_PROXY_ANS_U32: u32 = 0x4403da0d; + pub const RPC_CLOSE_CONN_U32: u32 = 0x1fcf425d; + pub const RPC_CLOSE_EXT_U32: u32 = 0x5eb634a2; + pub const RPC_SIMPLE_ACK_U32: u32 = 0x3bac409b; + pub const RPC_PING_U32: u32 = 0x5730a2df; + pub const RPC_PONG_U32: u32 = 0x8430eaa7; + + pub const RPC_CRYPTO_NONE_U32: u32 = 0; + pub const RPC_CRYPTO_AES_U32: u32 = 1; + + pub mod proxy_flags { + pub const FLAG_HAS_AD_TAG: u32 = 1; + pub const FLAG_NOT_ENCRYPTED: u32 = 0x2; + pub const FLAG_HAS_AD_TAG2: u32 = 0x8; + pub const FLAG_MAGIC: u32 = 0x1000; + pub const FLAG_EXTMODE2: u32 = 0x20000; + pub const FLAG_PAD: u32 = 0x8000000; + pub const FLAG_INTERMEDIATE: u32 = 0x20000000; + pub const FLAG_ABRIDGED: u32 = 0x40000000; + pub const FLAG_QUICKACK: u32 = 0x80000000; + } + + pub const ME_CONNECT_TIMEOUT_SECS: u64 = 5; + pub const ME_HANDSHAKE_TIMEOUT_SECS: u64 = 10; + + #[cfg(test)] mod tests { use super::*; @@ -327,4 +319,4 @@ mod tests { assert_eq!(TG_DATACENTERS_V4.len(), 5); assert_eq!(TG_DATACENTERS_V6.len(), 5); } -} +} \ No newline at end of file diff --git a/src/proxy/client.rs b/src/proxy/client.rs index b90d447..1168a32 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -338,176 +338,221 @@ /// - CDN DCs (203+) work because ME knows their internal addresses /// - We pass raw client MTProto bytes in RPC_PROXY_REQ envelope /// - ME returns responses in RPC_PROXY_ANS envelope - async fn handle_via_middle_proxy( - mut client_reader: CryptoReader, - mut client_writer: CryptoWriter, - success: HandshakeSuccess, - me_pool: Arc, - stats: Arc, - config: Arc, - _buffer_pool: Arc, - ) -> Result<()> - where - R: AsyncRead + Unpin + Send + 'static, - W: AsyncWrite + Unpin + Send + 'static, - { - let user = success.user.clone(); - let peer = success.peer; - - info!( - user = %user, - peer = %peer, - dc = success.dc_idx, - proto = ?success.proto_tag, - mode = "middle_proxy", - "Routing via Middle-End" - ); - - // Register this client connection in ME demux registry - let (conn_id, mut me_rx) = me_pool.registry().register().await; - - // Our listening address for RPC_PROXY_REQ metadata - let our_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) - .parse().unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap()); - - stats.increment_user_connects(&user); - stats.increment_user_curr_connects(&user); - - let proto_flags = proto_flags_for_tag(Some(success.proto_tag as u32)); - let is_secure = matches!(success.proto_tag, ProtoTag::Secure); - debug!(user = %user, conn_id, proto_flags = format_args!("0x{:08x}", proto_flags), "ME relay started"); - - // Bidirectional relay loop: client ↔ ME pool - // - // C→S direction: read intermediate frame from client, send payload via RPC_PROXY_REQ - // S→C direction: receive raw payload from ME, add client intermediate framing - // - // We use tokio::select! to handle both directions concurrently. - // Unlike direct mode (copy_bidirectional on two TCP streams), - // here one side is a channel (mpsc::Receiver), not a stream. - - let mut client_closed = false; - let mut server_closed = false; - - let result: Result<()> = loop { - tokio::select! { - // C→S: client sends one intermediate frame, we forward payload to ME - read_result = async { - let mut len_buf = [0u8; 4]; - client_reader.read_exact(&mut len_buf).await?; - let raw_len = u32::from_le_bytes(len_buf); - let payload_len = (raw_len & 0x7fff_ffff) as usize; - if payload_len > MAX_MSG_LEN { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("client frame too large: {}", payload_len), - )); - } + + async fn handle_via_middle_proxy( + mut client_reader: CryptoReader, + mut client_writer: CryptoWriter, + success: HandshakeSuccess, + me_pool: Arc, + stats: Arc, + config: Arc, + _buffer_pool: Arc, + ) -> Result<()> + where + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, + { + let user = success.user.clone(); + let peer = success.peer; - let mut payload = vec![0u8; payload_len]; - client_reader.read_exact(&mut payload).await?; - Ok::, std::io::Error>(payload) - }, if !client_closed => { - match read_result { - Ok(payload) => { - trace!(conn_id, bytes = payload.len(), "C frame -> ME payload"); - stats.add_user_octets_from(&user, payload.len() as u64); - if let Err(e) = me_pool.send_proxy_req( - conn_id, peer, our_addr, &payload, proto_flags - ).await { - break Err(e); + info!( + user = %user, + peer = %peer, + dc = success.dc_idx, + proto = ?success.proto_tag, + mode = "middle_proxy", + "Routing via Middle-End" + ); + + let (conn_id, mut me_rx) = me_pool.registry().register().await; + + let our_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) + .parse().unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap()); + + stats.increment_user_connects(&user); + stats.increment_user_curr_connects(&user); + + let proto_flags = proto_flags_for_tag(success.proto_tag); + debug!(user = %user, conn_id, proto_flags = format_args!("0x{:08x}", proto_flags), "ME relay started"); + + // We need to handle framing here. + // Client sends: [Len:4][Payload...] (Intermediate/Secure) + // We must strip Len and send Payload to ME. + // ME sends: [Payload...] + // We must add [Len:4] and send to Client. + + // For Secure mode, Len has padding bit (MSB). + let is_secure = success.proto_tag == crate::protocol::constants::ProtoTag::Secure; + + let mut client_closed = false; + let mut server_closed = false; + + // Split client_reader/writer to use in select! + // CryptoReader/Writer don't support splitting easily without Arc/Mutex or unsafe, + // but here we are in a loop. + // We can't easily split them because they wrap the underlying stream. + // However, we can use a loop with select! on read and rx. + + let mut len_buf = [0u8; 4]; + let mut reading_len = true; + let mut current_payload_len = 0; + let mut payload_buf = Vec::new(); + + let result: Result<()> = loop { + tokio::select! { + // C->S: Read length, then payload + res = async { + if reading_len { + client_reader.read_exact(&mut len_buf).await.map(|_| true) + } else { + // Read payload + // We need to read exactly current_payload_len + if payload_buf.len() < current_payload_len { + let needed = current_payload_len - payload_buf.len(); + let mut chunk = vec![0u8; needed]; + let n = client_reader.read(&mut chunk).await?; + if n == 0 { return Ok(false); } // EOF + payload_buf.extend_from_slice(&chunk[..n]); + Ok(true) + } else { + Ok(true) // Should not happen + } + } + }, if !client_closed => { + match res { + Ok(true) => { + if reading_len { + // Got length + let raw_len = u32::from_le_bytes(len_buf); + // In secure mode, MSB is padding flag. In intermediate, it's just len. + // But wait, standard intermediate doesn't use MSB for padding. + // Secure mode DOES. + // Let's trust the protocol tag. + let len = if is_secure { + raw_len & 0x7FFFFFFF + } else { + raw_len + }; + + current_payload_len = len as usize; + // Sanity check + if current_payload_len > 16 * 1024 * 1024 { + debug!(conn_id, len=current_payload_len, "Client sent huge frame"); + break Err(ProxyError::Proxy("Frame too large".into())); + } + payload_buf.clear(); + payload_buf.reserve(current_payload_len); + reading_len = false; + } else { + // Got some payload data + if payload_buf.len() == current_payload_len { + // Full frame received + trace!(conn_id, bytes = current_payload_len, "C->ME (Frame complete)"); + stats.add_user_octets_from(&user, current_payload_len as u64); + + // Send to ME + // Note: In secure mode, we send the PADDING bytes too? + // Erlang mtp_intermediate: strips 4 bytes len. + // Erlang mtp_secure: strips 4 bytes len. + // The payload includes the padding if it was added? + // Actually, secure layer (mtp_secure.erl) handles padding removal? + // No, mtp_secure just sets padding=>true for intermediate codec. + // The intermediate codec (mtp_intermediate.erl) just extracts the packet. + // The packet passed to RPC is the payload. + // If secure mode adds random padding at the end, it is part of the payload + // that ME receives? + // Let's look at C code. + // ext-server.c: reads packet_len. + // if (packet_len & 0x80000000) -> has padding. + // It reads the full packet. + // Then it passes it to forward_tcp_query. + // So YES, we send the full payload including padding to ME. + + if let Err(e) = me_pool.send_proxy_req( + conn_id, peer, our_addr, &payload_buf, proto_flags + ).await { + break Err(e); + } + + // Reset for next frame + reading_len = true; } } - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { - debug!(conn_id, "Client EOF"); - client_closed = true; - if server_closed { break Ok(()); } - // Signal ME to close this connection - let _ = me_pool.send_close(conn_id).await; + } + Ok(false) => { + // EOF + debug!(conn_id, "Client EOF"); + client_closed = true; + let _ = me_pool.send_close(conn_id).await; + if server_closed { break Ok(()); } + } + Err(e) => { + debug!(conn_id, error = %e, "Client read error"); + break Err(ProxyError::Io(e)); + } + } + } + + // S->C: ME sends data, we wrap and send to client + me_msg = me_rx.recv(), if !server_closed => { + match me_msg { + Some(MeResponse::Data(data)) => { + trace!(conn_id, bytes = data.len(), "ME->C"); + stats.add_user_octets_to(&user, data.len() as u64); + + // Wrap in intermediate frame + let len = data.len() as u32; + // For secure mode, we might need to add padding? + // C code: forward_mtproto_packet -> just sends data. + // But wait, C code adds framing in net-tcp-rpc-ext-server.c? + // No, forward_tcp_query sends RPC_PROXY_REQ. + // ME sends RPC_PROXY_ANS. + // The data in ANS is the MTProto packet. + // We need to send it to client. + // If client is Intermediate/Secure, we MUST add the 4-byte length prefix. + // Secure mode: usually we don't ADD padding on response, we just send valid packets. + // But we MUST send the length. + + if let Err(e) = client_writer.write_all(&len.to_le_bytes()).await { + break Err(ProxyError::Io(e)); } - Err(e) => { - debug!(conn_id, error = %e, "Client read error"); + if let Err(e) = client_writer.write_all(&data).await { + break Err(ProxyError::Io(e)); + } + if let Err(e) = client_writer.flush().await { break Err(ProxyError::Io(e)); } } - } - - // S→C: ME sends response, we forward to client - me_msg = me_rx.recv(), if !server_closed => { - match me_msg { - Some(MeResponse::Data(data)) => { - let mut frame_len = data.len() as u32; - let mut secure_padding = [0u8; 1]; - if is_secure && data.len() % 4 == 0 { - frame_len += 1; - secure_padding[0] = 0; - } - - trace!(conn_id, bytes = data.len(), frame_len, "ME payload -> C frame"); - stats.add_user_octets_to(&user, frame_len as u64 + 4); - - if let Err(e) = client_writer.write_all(&frame_len.to_le_bytes()).await { - debug!(conn_id, error = %e, "Client write header error"); - break Err(ProxyError::Io(e)); - } - if let Err(e) = client_writer.write_all(&data).await { - debug!(conn_id, error = %e, "Client write error"); - break Err(ProxyError::Io(e)); - } - if frame_len as usize > data.len() { - if let Err(e) = client_writer.write_all(&secure_padding).await { - debug!(conn_id, error = %e, "Client write padding error"); - break Err(ProxyError::Io(e)); - } - } - if let Err(e) = client_writer.flush().await { - break Err(ProxyError::Io(e)); - } - } - Some(MeResponse::Ack(_token)) => { - // QuickACK from ME — could forward to client as obfuscated ACK - // For now, just log - trace!(conn_id, "ME ACK (ignored)"); - } - Some(MeResponse::Close) => { - debug!(conn_id, "ME sent CLOSE"); - server_closed = true; - if client_closed { break Ok(()); } - } - None => { - // Channel closed — ME connection died - debug!(conn_id, "ME channel closed"); - if client_closed { break Ok(()); } - break Err(ProxyError::Proxy("ME connection lost".into())); - } + Some(MeResponse::Ack(_)) => { + trace!(conn_id, "ME ACK"); + } + Some(MeResponse::Close) => { + debug!(conn_id, "ME sent CLOSE"); + server_closed = true; + if client_closed { break Ok(()); } + // We should probably close client connection too + break Ok(()); + } + None => { + debug!(conn_id, "ME channel closed"); + server_closed = true; + if client_closed { break Ok(()); } + break Err(ProxyError::Proxy("ME connection lost".into())); } } - - // Both sides closed - else => { - break Ok(()); - } } - }; - - // Cleanup - debug!(user = %user, conn_id, "ME relay cleanup"); - me_pool.registry().unregister(conn_id).await; - stats.decrement_user_curr_connects(&user); - - match &result { - Ok(()) => debug!(user = %user, conn_id, "ME relay completed"), - Err(e) => debug!(user = %user, conn_id, error = %e, "ME relay error"), } - - result - } - - // ===================================================================== - // Helpers - // ===================================================================== - - fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { + }; + + // Cleanup + debug!(user = %user, conn_id, "ME relay cleanup"); + me_pool.registry().unregister(conn_id).await; + stats.decrement_user_curr_connects(&user); + result + } + + + fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { if let Some(expiration) = config.access.user_expirations.get(user) { if chrono::Utc::now() > *expiration { return Err(ProxyError::UserExpired { user: user.to_string() }); @@ -613,4 +658,4 @@ )) } } - + \ No newline at end of file diff --git a/src/transport/middle_proxy.rs b/src/transport/middle_proxy.rs index 6759bb7..3b08c3c 100644 --- a/src/transport/middle_proxy.rs +++ b/src/transport/middle_proxy.rs @@ -250,7 +250,11 @@ fn ipv4_to_mapped_v6(ip: Ipv4Addr) -> [u8; 16] { fn addr_to_ip_u32(addr: &SocketAddr) -> u32 { match addr.ip() { IpAddr::V4(v4) => u32::from_be_bytes(v4.octets()), - IpAddr::V6(_) => 0, + IpAddr::V6(v6) => { + if let Some(v4) = v6.to_ipv4_mapped() { + u32::from_be_bytes(v4.octets()) + } else { 0 } + } } } @@ -328,6 +332,7 @@ impl RpcWriter { // ========== RPC_PROXY_REQ ========== + fn build_proxy_req_payload( conn_id: u64, client_addr: SocketAddr, @@ -336,10 +341,18 @@ fn build_proxy_req_payload( proxy_tag: Option<&[u8]>, proto_flags: u32, ) -> Vec { - // proto_flags are provided by caller and must already match C/Erlang transport flags. - let flags: u32 = proto_flags; + // flags are pre-calculated by proto_flags_for_tag + // We just need to ensure FLAG_HAS_AD_TAG is set if we have a tag (it is set by default in our new function, but let's be safe) + let mut flags = proto_flags; + + // The C code logic: + // flags = (transport_flags) | 0x1000 | 0x20000 | 0x8 (if tag) + // Our proto_flags_for_tag returns: 0x8 | 0x1000 | 0x20000 | transport_flags + // So we are good. + + let b_cap = 128 + data.len(); + let mut b = Vec::with_capacity(b_cap); - let mut b = Vec::with_capacity(128 + data.len()); b.extend_from_slice(&RPC_PROXY_REQ_U32.to_le_bytes()); b.extend_from_slice(&flags.to_le_bytes()); b.extend_from_slice(&conn_id.to_le_bytes()); @@ -359,7 +372,7 @@ fn build_proxy_req_payload( b.extend_from_slice(&(our_addr.port() as u32).to_le_bytes()); // Extra section (proxy_tag) - if flags & (RPC_FLAG_HAS_AD_TAG | 0x4) != 0 { + if flags & 12 != 0 { let extra_start = b.len(); b.extend_from_slice(&0u32.to_le_bytes()); // placeholder @@ -869,44 +882,20 @@ async fn reader_loop( // ========== Proto flags ========== /// Map ProtoTag to C-compatible RPC_PROXY_REQ transport flags. -/// Returned value already includes ad_tag/magic/extmode2. -pub fn proto_flags_for_tag(proto_tag: Option) -> u32 { +/// C: RPC_F_COMPACT(0x40000000)=abridged, RPC_F_MEDIUM(0x20000000)=intermediate/secure +/// The 0x1000(magic) and 0x8(proxy_tag) are added inside build_proxy_req_payload. + +pub fn proto_flags_for_tag(tag: crate::protocol::constants::ProtoTag) -> u32 { + use crate::protocol::constants::*; let mut flags = RPC_FLAG_HAS_AD_TAG | RPC_FLAG_MAGIC | RPC_FLAG_EXTMODE2; - match proto_tag { - Some(0xdddddddd) => { - // Secure - flags |= RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE; - } - Some(0xeeeeeeee) => { - // Intermediate - flags |= RPC_FLAG_INTERMEDIATE; - } - Some(0xefefefef) => { - // Abridged - flags |= RPC_FLAG_ABRIDGED; - } - _ => { - flags |= RPC_FLAG_INTERMEDIATE; - } - } - flags -} - -#[cfg(test)] -mod tests { - use super::proto_flags_for_tag; - - #[test] - fn proto_flags_secure_matches_reference() { - assert_eq!(proto_flags_for_tag(Some(0xdddddddd)), 0x28021008); - } - - #[test] - fn proto_flags_intermediate_matches_reference() { - assert_eq!(proto_flags_for_tag(Some(0xeeeeeeee)), 0x20021008); + match tag { + ProtoTag::Abridged => flags | RPC_FLAG_ABRIDGED, + ProtoTag::Intermediate => flags | RPC_FLAG_INTERMEDIATE, + ProtoTag::Secure => flags | RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE, } } + // ========== Health Monitor (Phase 4) ========== pub async fn me_health_monitor(