diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 726d238..7271e8c 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -11,6 +11,7 @@ use tracing::{debug, warn}; use crate::config::ProxyConfig; use crate::crypto::SecureRandom; use crate::error::{HandshakeResult, ProxyError, Result}; +use crate::ip_tracker::UserIpTracker; use crate::protocol::constants::*; use crate::protocol::tls; use crate::stats::{ReplayChecker, Stats}; @@ -35,6 +36,7 @@ pub struct RunningClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + ip_tracker: Arc, } impl ClientHandler { @@ -48,6 +50,7 @@ impl ClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + ip_tracker: Arc, ) -> RunningClientHandler { RunningClientHandler { stream, @@ -59,6 +62,7 @@ impl ClientHandler { buffer_pool, rng, me_pool, + ip_tracker, } } } @@ -203,6 +207,8 @@ impl RunningClientHandler { self.rng, self.me_pool, local_addr, + peer, + self.ip_tracker, ) .await } @@ -261,6 +267,8 @@ impl RunningClientHandler { self.rng, self.me_pool, local_addr, + peer, + self.ip_tracker, ) .await } @@ -280,6 +288,8 @@ impl RunningClientHandler { rng: Arc, me_pool: Option>, local_addr: SocketAddr, + peer_addr: SocketAddr, + ip_tracker: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -287,11 +297,36 @@ impl RunningClientHandler { { let user = &success.user; - if let Err(e) = Self::check_user_limits_static(user, &config, &stats) { + if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await { warn!(user = %user, error = %e, "User limit exceeded"); return Err(e); } + // IP Cleanup Guard: автоматически удаляет IP при выходе из scope + struct IpCleanupGuard { + tracker: Arc, + user: String, + ip: std::net::IpAddr, + } + + impl Drop for IpCleanupGuard { + fn drop(&mut self) { + let tracker = self.tracker.clone(); + let user = self.user.clone(); + let ip = self.ip; + tokio::spawn(async move { + tracker.remove_ip(&user, ip).await; + debug!(user = %user, ip = %ip, "IP cleaned up on disconnect"); + }); + } + } + + let _cleanup = IpCleanupGuard { + tracker: ip_tracker, + user: user.clone(), + ip: peer_addr.ip(), + }; + // Decide: middle proxy or direct if config.general.use_middle_proxy { if let Some(ref pool) = me_pool { @@ -324,7 +359,13 @@ impl RunningClientHandler { .await } - fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { + async fn check_user_limits_static( + user: &str, + config: &ProxyConfig, + stats: &Stats, + peer_addr: SocketAddr, + ip_tracker: &UserIpTracker, + ) -> Result<()> { if let Some(expiration) = config.access.user_expirations.get(user) { if chrono::Utc::now() > *expiration { return Err(ProxyError::UserExpired { @@ -333,6 +374,19 @@ impl RunningClientHandler { } } + // IP limit check + if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await { + warn!( + user = %user, + ip = %peer_addr.ip(), + reason = %reason, + "IP limit exceeded" + ); + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); + } + if let Some(limit) = config.access.user_max_tcp_conns.get(user) { if stats.get_user_curr_connects(user) >= *limit as u64 { return Err(ProxyError::ConnectionLimitExceeded {