Merge branch 'main' into unix-socket

This commit is contained in:
Alexey
2026-02-15 02:48:24 +03:00
committed by GitHub
5 changed files with 540 additions and 3 deletions

View File

@@ -11,6 +11,7 @@ use tracing::{debug, warn};
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{HandshakeResult, ProxyError, Result};
use crate::ip_tracker::UserIpTracker;
use crate::protocol::constants::*;
use crate::protocol::tls;
use crate::stats::{ReplayChecker, Stats};
@@ -178,6 +179,7 @@ pub struct RunningClientHandler {
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
ip_tracker: Arc<UserIpTracker>,
}
impl ClientHandler {
@@ -191,6 +193,7 @@ impl ClientHandler {
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
ip_tracker: Arc<UserIpTracker>,
) -> RunningClientHandler {
RunningClientHandler {
stream,
@@ -202,6 +205,7 @@ impl ClientHandler {
buffer_pool,
rng,
me_pool,
ip_tracker,
}
}
}
@@ -346,6 +350,8 @@ impl RunningClientHandler {
self.rng,
self.me_pool,
local_addr,
peer,
self.ip_tracker,
)
.await
}
@@ -404,6 +410,8 @@ impl RunningClientHandler {
self.rng,
self.me_pool,
local_addr,
peer,
self.ip_tracker,
)
.await
}
@@ -423,6 +431,8 @@ impl RunningClientHandler {
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
local_addr: SocketAddr,
peer_addr: SocketAddr,
ip_tracker: Arc<UserIpTracker>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@@ -430,11 +440,36 @@ impl RunningClientHandler {
{
let user = &success.user;
if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await {
warn!(user = %user, error = %e, "User limit exceeded");
return Err(e);
}
// IP Cleanup Guard: автоматически удаляет IP при выходе из scope
struct IpCleanupGuard {
tracker: Arc<UserIpTracker>,
user: String,
ip: std::net::IpAddr,
}
impl Drop for IpCleanupGuard {
fn drop(&mut self) {
let tracker = self.tracker.clone();
let user = self.user.clone();
let ip = self.ip;
tokio::spawn(async move {
tracker.remove_ip(&user, ip).await;
debug!(user = %user, ip = %ip, "IP cleaned up on disconnect");
});
}
}
let _cleanup = IpCleanupGuard {
tracker: ip_tracker,
user: user.clone(),
ip: peer_addr.ip(),
};
// Decide: middle proxy or direct
if config.general.use_middle_proxy {
if let Some(ref pool) = me_pool {
@@ -467,7 +502,13 @@ impl RunningClientHandler {
.await
}
fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> {
async fn check_user_limits_static(
user: &str,
config: &ProxyConfig,
stats: &Stats,
peer_addr: SocketAddr,
ip_tracker: &UserIpTracker,
) -> Result<()> {
if let Some(expiration) = config.access.user_expirations.get(user) {
if chrono::Utc::now() > *expiration {
return Err(ProxyError::UserExpired {
@@ -476,6 +517,19 @@ impl RunningClientHandler {
}
}
// IP limit check
if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await {
warn!(
user = %user,
ip = %peer_addr.ip(),
reason = %reason,
"IP limit exceeded"
);
return Err(ProxyError::ConnectionLimitExceeded {
user: user.to_string(),
});
}
if let Some(limit) = config.access.user_max_tcp_conns.get(user) {
if stats.get_user_curr_connects(user) >= *limit as u64 {
return Err(ProxyError::ConnectionLimitExceeded {