Middle Proxy läuft wie auf Schienen...
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
@@ -10,6 +10,7 @@ use crate::error::Result;
|
||||
|
||||
use super::MePool;
|
||||
use super::secret::download_proxy_secret;
|
||||
use crate::crypto::SecureRandom;
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ProxyConfigData {
|
||||
@@ -49,8 +50,10 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
|
||||
Ok(ProxyConfigData { map, default_dc })
|
||||
}
|
||||
|
||||
pub async fn me_config_updater(pool: Arc<MePool>, interval: Duration) {
|
||||
pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interval: Duration) {
|
||||
let mut tick = tokio::time::interval(interval);
|
||||
// skip immediate tick to avoid double-fetch right after startup
|
||||
tick.tick().await;
|
||||
loop {
|
||||
tick.tick().await;
|
||||
|
||||
@@ -61,7 +64,8 @@ pub async fn me_config_updater(pool: Arc<MePool>, interval: Duration) {
|
||||
pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
if changed {
|
||||
info!("ME config updated (v4)");
|
||||
info!("ME config updated (v4), reconciling connections");
|
||||
pool.reconcile_connections(&rng).await;
|
||||
} else {
|
||||
debug!("ME config v4 unchanged");
|
||||
}
|
||||
|
||||
@@ -3,32 +3,42 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
use crate::crypto::SecureRandom;
|
||||
|
||||
use super::MePool;
|
||||
|
||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, min_connections: usize) {
|
||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
let current = pool.connection_count();
|
||||
if current < min_connections {
|
||||
warn!(
|
||||
current,
|
||||
min = min_connections,
|
||||
"ME pool below minimum, reconnecting..."
|
||||
);
|
||||
let map = pool.proxy_map_v4.read().await.clone();
|
||||
for (_dc, addrs) in map.iter() {
|
||||
for &(ip, port) in addrs {
|
||||
let needed = min_connections.saturating_sub(pool.connection_count());
|
||||
if needed == 0 {
|
||||
break;
|
||||
}
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
// Per-DC coverage check
|
||||
let map = pool.proxy_map_v4.read().await.clone();
|
||||
let writer_addrs: std::collections::HashSet<SocketAddr> = pool
|
||||
.writers
|
||||
.read()
|
||||
.await
|
||||
.iter()
|
||||
.map(|(a, _)| *a)
|
||||
.collect();
|
||||
|
||||
for (dc, addrs) in map.iter() {
|
||||
let dc_addrs: Vec<SocketAddr> = addrs
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect();
|
||||
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
|
||||
if !has_coverage {
|
||||
warn!(dc = %dc, "DC has no ME coverage, reconnecting...");
|
||||
let mut shuffled = dc_addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
for addr in shuffled {
|
||||
match pool.connect_one(addr, &rng).await {
|
||||
Ok(()) => info!(%addr, "ME reconnected"),
|
||||
Err(e) => debug!(%addr, error = %e, "ME reconnect failed"),
|
||||
Ok(()) => {
|
||||
info!(%addr, dc = %dc, "ME reconnected for DC coverage");
|
||||
break;
|
||||
}
|
||||
Err(e) => debug!(%addr, dc = %dc, error = %e, "ME reconnect failed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::time::Duration;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use rand::Rng;
|
||||
use rand::seq::SliceRandom;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
@@ -89,6 +90,30 @@ impl MePool {
|
||||
self.writers.clone()
|
||||
}
|
||||
|
||||
pub async fn reconcile_connections(&self, rng: &SecureRandom) {
|
||||
use std::collections::HashSet;
|
||||
let map = self.proxy_map_v4.read().await.clone();
|
||||
let writers = self.writers.read().await;
|
||||
let current: HashSet<SocketAddr> = writers.iter().map(|(a, _)| *a).collect();
|
||||
drop(writers);
|
||||
|
||||
for (_dc, addrs) in map.iter() {
|
||||
let dc_addrs: Vec<SocketAddr> = addrs
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect();
|
||||
if !dc_addrs.iter().any(|a| current.contains(a)) {
|
||||
let mut shuffled = dc_addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
for addr in shuffled {
|
||||
if self.connect_one(addr, rng).await.is_ok() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_proxy_maps(
|
||||
&self,
|
||||
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
@@ -127,8 +152,9 @@ impl MePool {
|
||||
}
|
||||
|
||||
pub async fn reconnect_all(&self) {
|
||||
let mut ws = self.writers.write().await;
|
||||
ws.clear();
|
||||
// Graceful: do not drop all at once. New connections will use updated secret.
|
||||
// Existing writers remain until health monitor replaces them.
|
||||
// No-op here to avoid total outage.
|
||||
}
|
||||
|
||||
async fn key_selector(&self) -> u32 {
|
||||
@@ -151,30 +177,39 @@ impl MePool {
|
||||
"Initializing ME pool"
|
||||
);
|
||||
|
||||
// Ensure at least one connection per DC
|
||||
// Ensure at least one connection per DC with failover over all addresses
|
||||
for (dc, addrs) in map.iter() {
|
||||
if addrs.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// round-robin first address
|
||||
let &(ip, port) = addrs.get(0).unwrap();
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
match self.connect_one(addr, rng).await {
|
||||
Ok(()) => info!(%addr, dc = %dc, "ME connected"),
|
||||
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed"),
|
||||
let mut connected = false;
|
||||
let mut shuffled = addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
for (ip, port) in shuffled {
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
match self.connect_one(addr, rng).await {
|
||||
Ok(()) => {
|
||||
info!(%addr, dc = %dc, "ME connected");
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
Err(e) => warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next"),
|
||||
}
|
||||
}
|
||||
if !connected {
|
||||
warn!(dc = %dc, "All ME servers for DC failed at init");
|
||||
}
|
||||
}
|
||||
|
||||
// Additional connections up to pool_size total
|
||||
// Additional connections up to pool_size total (round-robin across DCs)
|
||||
for (dc, addrs) in map.iter() {
|
||||
for (i, (ip, port)) in addrs.iter().enumerate().skip(1) {
|
||||
for (ip, port) in addrs {
|
||||
if self.connection_count() >= pool_size {
|
||||
break;
|
||||
}
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
match self.connect_one(addr, rng).await {
|
||||
Ok(()) => info!(%addr, dc = %dc, idx = i, "ME connected"),
|
||||
Err(e) => warn!(%addr, dc = %dc, idx = i, error = %e, "ME connect failed"),
|
||||
if let Err(e) = self.connect_one(addr, rng).await {
|
||||
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed");
|
||||
}
|
||||
}
|
||||
if self.connection_count() >= pool_size {
|
||||
@@ -189,7 +224,7 @@ impl MePool {
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_one(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
rng: &SecureRandom,
|
||||
) -> Result<()> {
|
||||
|
||||
@@ -145,7 +145,7 @@ async fn fetch_stun_binding(stun_addr: &str) -> Result<Option<std::net::SocketAd
|
||||
req[0..2].copy_from_slice(&0x0001u16.to_be_bytes()); // Binding Request
|
||||
req[2..4].copy_from_slice(&0u16.to_be_bytes()); // length
|
||||
req[4..8].copy_from_slice(&0x2112A442u32.to_be_bytes()); // magic cookie
|
||||
rand::thread_rng().fill_bytes(&mut req[8..20]);
|
||||
rand::rng().fill_bytes(&mut req[8..20]);
|
||||
|
||||
socket
|
||||
.send(&req)
|
||||
|
||||
@@ -4,9 +4,13 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
|
||||
use super::MeResponse;
|
||||
use super::codec::RpcWriter;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub struct ConnRegistry {
|
||||
map: RwLock<HashMap<u64, mpsc::Sender<MeResponse>>>,
|
||||
writers: RwLock<HashMap<u64, Arc<Mutex<RpcWriter>>>>,
|
||||
next_id: AtomicU64,
|
||||
}
|
||||
|
||||
@@ -16,6 +20,7 @@ impl ConnRegistry {
|
||||
let start = rand::random::<u64>() | 1;
|
||||
Self {
|
||||
map: RwLock::new(HashMap::new()),
|
||||
writers: RwLock::new(HashMap::new()),
|
||||
next_id: AtomicU64::new(start),
|
||||
}
|
||||
}
|
||||
@@ -29,6 +34,7 @@ impl ConnRegistry {
|
||||
|
||||
pub async fn unregister(&self, id: u64) {
|
||||
self.map.write().await.remove(&id);
|
||||
self.writers.write().await.remove(&id);
|
||||
}
|
||||
|
||||
pub async fn route(&self, id: u64, resp: MeResponse) -> bool {
|
||||
@@ -39,4 +45,14 @@ impl ConnRegistry {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_writer(&self, id: u64, w: Arc<Mutex<RpcWriter>>) {
|
||||
let mut guard = self.writers.write().await;
|
||||
guard.entry(id).or_insert_with(|| w);
|
||||
}
|
||||
|
||||
pub async fn get_writer(&self, id: u64) -> Option<Arc<Mutex<RpcWriter>>> {
|
||||
let guard = self.writers.read().await;
|
||||
guard.get(&id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,11 +6,13 @@ use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::{RPC_CLOSE_EXT_U32};
|
||||
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
|
||||
|
||||
use super::MePool;
|
||||
use super::codec::RpcWriter;
|
||||
use super::wire::build_proxy_req_payload;
|
||||
use crate::crypto::SecureRandom;
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
impl MePool {
|
||||
pub async fn send_proxy_req(
|
||||
@@ -39,9 +41,28 @@ impl MePool {
|
||||
let writers: Vec<(SocketAddr, Arc<Mutex<RpcWriter>>)> = ws.iter().cloned().collect();
|
||||
drop(ws);
|
||||
|
||||
let candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await;
|
||||
let mut candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await;
|
||||
if candidate_indices.is_empty() {
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
// Emergency: try to connect to target DC addresses on the fly, then recompute writers
|
||||
let map = self.proxy_map_v4.read().await;
|
||||
if let Some(addrs) = map.get(&(target_dc as i32)) {
|
||||
let mut shuffled = addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
drop(map);
|
||||
for (ip, port) in shuffled {
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
if self.connect_one(addr, &SecureRandom::new()).await.is_ok() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let ws2 = self.writers.read().await;
|
||||
let writers: Vec<(SocketAddr, Arc<Mutex<RpcWriter>>)> = ws2.iter().cloned().collect();
|
||||
drop(ws2);
|
||||
candidate_indices = self.candidate_indices_for_dc(&writers, target_dc).await;
|
||||
}
|
||||
if candidate_indices.is_empty() {
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
}
|
||||
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
|
||||
|
||||
@@ -85,10 +106,7 @@ impl MePool {
|
||||
}
|
||||
|
||||
pub async fn send_close(&self, conn_id: u64) -> Result<()> {
|
||||
let ws = self.writers.read().await;
|
||||
if !ws.is_empty() {
|
||||
let w = ws[0].1.clone();
|
||||
drop(ws);
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||
@@ -97,6 +115,8 @@ impl MePool {
|
||||
let mut ws = self.writers.write().await;
|
||||
ws.retain(|(_, o)| !Arc::ptr_eq(o, &w));
|
||||
}
|
||||
} else {
|
||||
debug!(conn_id, "ME close skipped (writer missing)");
|
||||
}
|
||||
|
||||
self.registry.unregister(conn_id).await;
|
||||
|
||||
@@ -58,7 +58,7 @@ pub(crate) fn build_proxy_req_payload(
|
||||
append_mapped_addr_and_port(&mut b, client_addr);
|
||||
append_mapped_addr_and_port(&mut b, our_addr);
|
||||
|
||||
if proto_flags & 12 != 0 {
|
||||
if proto_flags & RPC_FLAG_HAS_AD_TAG != 0 {
|
||||
let extra_start = b.len();
|
||||
b.extend_from_slice(&0u32.to_le_bytes());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user