Middle Proxy Minimal

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-15 13:14:50 +03:00
parent 3d2e996cea
commit 94a7058cc6
10 changed files with 319 additions and 84 deletions

View File

@@ -0,0 +1,87 @@
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use regex::Regex;
use tracing::{debug, info, warn};
use crate::error::Result;
use super::MePool;
use super::secret::download_proxy_secret;
#[derive(Debug, Clone, Default)]
pub struct ProxyConfigData {
pub map: HashMap<i32, Vec<(IpAddr, u16)>>,
pub default_dc: Option<i32>,
}
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
let text = reqwest::get(url)
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let re_proxy = Regex::new(r"proxy_for\s+(-?\d+)\s+([^\s:]+):(\d+)\s*;").unwrap();
let re_default = Regex::new(r"default\s+(-?\d+)\s*;").unwrap();
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
for cap in re_proxy.captures_iter(&text) {
if let (Some(dc), Some(host), Some(port)) = (cap.get(1), cap.get(2), cap.get(3)) {
if let Ok(dc_idx) = dc.as_str().parse::<i32>() {
if let Ok(ip) = host.as_str().parse::<IpAddr>() {
if let Ok(port_num) = port.as_str().parse::<u16>() {
map.entry(dc_idx).or_default().push((ip, port_num));
}
}
}
}
}
let default_dc = re_default
.captures(&text)
.and_then(|c| c.get(1))
.and_then(|m| m.as_str().parse::<i32>().ok());
Ok(ProxyConfigData { map, default_dc })
}
pub async fn me_config_updater(pool: Arc<MePool>, interval: Duration) {
let mut tick = tokio::time::interval(interval);
loop {
tick.tick().await;
// Update proxy config v4
if let Ok(cfg) = fetch_proxy_config("https://core.telegram.org/getProxyConfig").await {
let changed = pool.update_proxy_maps(cfg.map.clone(), None).await;
if let Some(dc) = cfg.default_dc {
pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
}
if changed {
info!("ME config updated (v4)");
} else {
debug!("ME config v4 unchanged");
}
} else {
warn!("getProxyConfig update failed");
}
// Update proxy config v6 (optional)
if let Ok(cfg_v6) = fetch_proxy_config("https://core.telegram.org/getProxyConfigV6").await {
let _ = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
}
// Update proxy-secret
match download_proxy_secret().await {
Ok(secret) => {
if pool.update_secret(secret).await {
info!("proxy-secret updated and pool reconnect scheduled");
}
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
}
}
}

View File

@@ -5,7 +5,6 @@ use std::time::Duration;
use tracing::{debug, info, warn};
use crate::crypto::SecureRandom;
use crate::protocol::constants::TG_MIDDLE_PROXIES_FLAT_V4;
use super::MePool;
@@ -19,13 +18,13 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, min_co
min = min_connections,
"ME pool below minimum, reconnecting..."
);
let addrs = TG_MIDDLE_PROXIES_FLAT_V4.clone();
for &(ip, port) in addrs.iter() {
let needed = min_connections.saturating_sub(pool.connection_count());
if needed == 0 {
break;
}
for _ in 0..needed {
let map = pool.proxy_map_v4.read().await.clone();
for (_dc, addrs) in map.iter() {
for &(ip, port) in addrs {
let needed = min_connections.saturating_sub(pool.connection_count());
if needed == 0 {
break;
}
let addr = SocketAddr::new(ip, port);
match pool.connect_one(addr, &rng).await {
Ok(()) => info!(%addr, "ME reconnected"),

View File

@@ -8,6 +8,7 @@ mod reader;
mod registry;
mod send;
mod secret;
mod config_updater;
mod wire;
use bytes::Bytes;
@@ -16,6 +17,7 @@ pub use health::me_health_monitor;
pub use pool::MePool;
pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret;
pub use config_updater::{fetch_proxy_config, me_config_updater};
pub use wire::proto_flags_for_tag;
#[derive(Debug)]

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicI32, AtomicU64};
use std::time::Duration;
use bytes::BytesMut;
@@ -32,11 +32,14 @@ pub struct MePool {
pub(super) writers: Arc<RwLock<Vec<(SocketAddr, Arc<Mutex<RpcWriter>>)>>> ,
pub(super) rr: AtomicU64,
pub(super) proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>,
pub(super) proxy_secret: Arc<RwLock<Vec<u8>>>,
pub(super) nat_ip_cfg: Option<IpAddr>,
pub(super) nat_ip_detected: OnceLock<IpAddr>,
pub(super) nat_ip_detected: Arc<RwLock<Option<IpAddr>>>,
pub(super) nat_probe: bool,
pub(super) nat_stun: Option<String>,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
pool_size: usize,
}
@@ -47,18 +50,24 @@ impl MePool {
nat_ip: Option<IpAddr>,
nat_probe: bool,
nat_stun: Option<String>,
proxy_map_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
proxy_map_v6: HashMap<i32, Vec<(IpAddr, u16)>>,
default_dc: Option<i32>,
) -> Arc<Self> {
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
writers: Arc::new(RwLock::new(Vec::new())),
rr: AtomicU64::new(0),
proxy_tag,
proxy_secret,
proxy_secret: Arc::new(RwLock::new(proxy_secret)),
nat_ip_cfg: nat_ip,
nat_ip_detected: OnceLock::new(),
nat_ip_detected: Arc::new(RwLock::new(None)),
nat_probe,
nat_stun,
pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
default_dc: AtomicI32::new(default_dc.unwrap_or(0)),
})
}
@@ -80,39 +89,95 @@ impl MePool {
self.writers.clone()
}
fn key_selector(&self) -> u32 {
if self.proxy_secret.len() >= 4 {
u32::from_le_bytes([
self.proxy_secret[0],
self.proxy_secret[1],
self.proxy_secret[2],
self.proxy_secret[3],
])
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
new_v6: Option<HashMap<i32, Vec<(IpAddr, u16)>>>,
) -> bool {
let mut changed = false;
{
let mut guard = self.proxy_map_v4.write().await;
if !new_v4.is_empty() && *guard != new_v4 {
*guard = new_v4;
changed = true;
}
}
if let Some(v6) = new_v6 {
let mut guard = self.proxy_map_v6.write().await;
if !v6.is_empty() && *guard != v6 {
*guard = v6;
}
}
changed
}
pub async fn update_secret(&self, new_secret: Vec<u8>) -> bool {
if new_secret.len() < 32 {
warn!(len = new_secret.len(), "proxy-secret update ignored (too short)");
return false;
}
let mut guard = self.proxy_secret.write().await;
if *guard != new_secret {
*guard = new_secret;
drop(guard);
self.reconnect_all().await;
return true;
}
false
}
pub async fn reconnect_all(&self) {
let mut ws = self.writers.write().await;
ws.clear();
}
async fn key_selector(&self) -> u32 {
let secret = self.proxy_secret.read().await;
if secret.len() >= 4 {
u32::from_le_bytes([secret[0], secret[1], secret[2], secret[3]])
} else {
0
}
}
pub async fn init(self: &Arc<Self>, pool_size: usize, rng: &SecureRandom) -> Result<()> {
let addrs = &*TG_MIDDLE_PROXIES_FLAT_V4;
let ks = self.key_selector();
let map = self.proxy_map_v4.read().await;
let ks = self.key_selector().await;
info!(
me_servers = addrs.len(),
me_servers = map.len(),
pool_size,
key_selector = format_args!("0x{ks:08x}"),
secret_len = self.proxy_secret.len(),
secret_len = self.proxy_secret.read().await.len(),
"Initializing ME pool"
);
for &(ip, port) in addrs.iter() {
for i in 0..pool_size {
let addr = SocketAddr::new(ip, port);
// Ensure at least one connection per DC
for (dc, addrs) in map.iter() {
if addrs.is_empty() {
continue;
}
// round-robin first address
let &(ip, port) = addrs.get(0).unwrap();
let addr = SocketAddr::new(ip, port);
match self.connect_one(addr, rng).await {
Ok(()) => info!(%addr, dc = %dc, "ME connected"),
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed"),
}
}
// Additional connections up to pool_size total
for (dc, addrs) in map.iter() {
for (i, (ip, port)) in addrs.iter().enumerate().skip(1) {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
match self.connect_one(addr, rng).await {
Ok(()) => info!(%addr, idx = i, "ME connected"),
Err(e) => warn!(%addr, idx = i, error = %e, "ME connect failed"),
Ok(()) => info!(%addr, dc = %dc, idx = i, "ME connected"),
Err(e) => warn!(%addr, dc = %dc, idx = i, error = %e, "ME connect failed"),
}
}
if self.writers.read().await.len() >= pool_size {
if self.connection_count() >= pool_size {
break;
}
}
@@ -128,7 +193,8 @@ impl MePool {
addr: SocketAddr,
rng: &SecureRandom,
) -> Result<()> {
let secret = &self.proxy_secret;
let secret_guard = self.proxy_secret.read().await;
let secret: Vec<u8> = secret_guard.clone();
if secret.len() < 32 {
return Err(ProxyError::Proxy(
"proxy-secret too short for ME auth".into(),
@@ -165,7 +231,7 @@ impl MePool {
.unwrap_or_default()
.as_secs() as u32;
let ks = self.key_selector();
let ks = self.key_selector().await;
let nonce_payload = build_nonce_payload(ks, crypto_ts, &my_nonce);
let nonce_frame = build_rpc_frame(-2, &nonce_payload);
let dump = hex_dump(&nonce_frame[..nonce_frame.len().min(44)]);
@@ -266,7 +332,7 @@ impl MePool {
b"CLIENT",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
secret,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
@@ -279,7 +345,7 @@ impl MePool {
b"SERVER",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
secret,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
@@ -293,7 +359,7 @@ impl MePool {
b"CLIENT",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
secret,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
@@ -306,7 +372,7 @@ impl MePool {
b"SERVER",
clt_ip_opt.as_ref().map(|x| &x[..]),
&server_port_bytes,
secret,
&secret,
clt_v6_opt.as_ref(),
srv_v6_opt.as_ref(),
);
@@ -330,7 +396,7 @@ impl MePool {
prekey_sha256_client = %hex_dump(&sha256(&prekey_client)),
prekey_sha256_server = %hex_dump(&sha256(&prekey_server)),
hs_plain = %hex_dump(&hs_frame),
proxy_secret_sha256 = %hex_dump(&sha256(secret)),
proxy_secret_sha256 = %hex_dump(&sha256(&secret)),
"ME diag: derived keys and handshake plaintext"
);
}

View File

@@ -10,7 +10,7 @@ impl MePool {
pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr {
let nat_ip = self
.nat_ip_cfg
.or_else(|| self.nat_ip_detected.get().copied());
.or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| (*g).clone()));
let Some(nat_ip) = nat_ip else {
return ip;
@@ -60,13 +60,16 @@ impl MePool {
return None;
}
if let Some(ip) = self.nat_ip_detected.get().copied() {
if let Some(ip) = self.nat_ip_detected.read().await.clone() {
return Some(ip);
}
match fetch_public_ipv4().await {
match fetch_public_ipv4_with_retry().await {
Ok(Some(ip)) => {
let _ = self.nat_ip_detected.set(IpAddr::V4(ip));
{
let mut guard = self.nat_ip_detected.write().await;
*guard = Some(IpAddr::V4(ip));
}
info!(public_ip = %ip, "Auto-detected public IP for NAT translation");
Some(IpAddr::V4(ip))
}
@@ -98,8 +101,22 @@ impl MePool {
}
}
async fn fetch_public_ipv4() -> Result<Option<Ipv4Addr>> {
let res = reqwest::get("https://checkip.amazonaws.com").await.map_err(|e| {
async fn fetch_public_ipv4_with_retry() -> Result<Option<Ipv4Addr>> {
let providers = [
"https://checkip.amazonaws.com",
"http://v4.ident.me",
"http://ipv4.icanhazip.com",
];
for url in providers {
if let Ok(Some(ip)) = fetch_public_ipv4_once(url).await {
return Ok(Some(ip));
}
}
Ok(None)
}
async fn fetch_public_ipv4_once(url: &str) -> Result<Option<Ipv4Addr>> {
let res = reqwest::get(url).await.map_err(|e| {
ProxyError::Proxy(format!("public IP detection request failed: {e}"))
})?;

View File

@@ -51,7 +51,7 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result<Vec<u8>> {
}
}
async fn download_proxy_secret() -> Result<Vec<u8>> {
pub async fn download_proxy_secret() -> Result<Vec<u8>> {
let resp = reqwest::get("https://core.telegram.org/getProxySecret")
.await
.map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?;

View File

@@ -6,7 +6,7 @@ use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, TG_MIDDLE_PROXIES_V4};
use crate::protocol::constants::{RPC_CLOSE_EXT_U32};
use super::MePool;
use super::codec::RpcWriter;
@@ -39,7 +39,7 @@ impl MePool {
let writers: Vec<(SocketAddr, Arc<Mutex<RpcWriter>>)> = ws.iter().cloned().collect();
drop(ws);
let candidate_indices = candidate_indices_for_dc(&writers, target_dc);
let candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await;
if candidate_indices.is_empty() {
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
}
@@ -106,41 +106,54 @@ impl MePool {
pub fn connection_count(&self) -> usize {
self.writers.try_read().map(|w| w.len()).unwrap_or(0)
}
}
pub(super) async fn candidate_indices_for_dc(
&self,
writers: &[(SocketAddr, Arc<Mutex<RpcWriter>>)],
target_dc: i16,
) -> Vec<usize> {
let mut preferred = Vec::<SocketAddr>::new();
let key = target_dc as i32;
let map = self.proxy_map_v4.read().await;
fn candidate_indices_for_dc(
writers: &[(SocketAddr, Arc<Mutex<RpcWriter>>)],
target_dc: i16,
) -> Vec<usize> {
let mut preferred = Vec::<SocketAddr>::new();
let key = target_dc as i32;
if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&key) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&abs) {
if let Some(v) = map.get(&key) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = TG_MIDDLE_PROXIES_V4.get(&-abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = map.get(&abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
}
if preferred.is_empty() {
return (0..writers.len()).collect();
if preferred.is_empty() {
let abs = key.abs();
if let Some(v) = map.get(&-abs) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
if preferred.is_empty() {
let def = self.default_dc.load(Ordering::Relaxed);
if def != 0 {
if let Some(v) = map.get(&def) {
preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
}
}
if preferred.is_empty() {
return (0..writers.len()).collect();
}
let mut out = Vec::new();
for (idx, (addr, _)) in writers.iter().enumerate() {
if preferred.iter().any(|p| p == addr) {
out.push(idx);
}
}
if out.is_empty() {
return (0..writers.len()).collect();
}
out
}
let mut out = Vec::new();
for (idx, (addr, _)) in writers.iter().enumerate() {
if preferred.iter().any(|p| p == addr) {
out.push(idx);
}
}
if out.is_empty() {
return (0..writers.len()).collect();
}
out
}

View File

@@ -28,9 +28,7 @@ fn ipv4_to_mapped_v6_c_compat(ip: Ipv4Addr) -> [u8; 16] {
buf[8..12].copy_from_slice(&(-0x10000i32).to_le_bytes());
// Matches tl_store_int(htonl(remote_ip_host_order)).
let host_order = u32::from_ne_bytes(ip.octets());
let network_order = host_order.to_be();
buf[12..16].copy_from_slice(&network_order.to_le_bytes());
buf[12..16].copy_from_slice(&ip.octets());
buf
}
@@ -104,3 +102,17 @@ pub fn proto_flags_for_tag(tag: crate::protocol::constants::ProtoTag, has_proxy_
ProtoTag::Secure => flags | RPC_FLAG_PAD | RPC_FLAG_INTERMEDIATE,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ipv4_mapped_encoding() {
let ip = Ipv4Addr::new(149, 154, 175, 50);
let buf = ipv4_to_mapped_v6_c_compat(ip);
assert_eq!(&buf[0..10], &[0u8; 10]);
assert_eq!(&buf[10..12], &[0xff, 0xff]);
assert_eq!(&buf[12..16], &[149, 154, 175, 50]);
}
}