diff --git a/config.toml b/config.toml index 0c840c3..e876222 100644 --- a/config.toml +++ b/config.toml @@ -68,6 +68,9 @@ hello = "00000000000000000000000000000000" # [access.user_max_tcp_conns] # hello = 50 +# [access.user_max_unique_ips] +# hello = 5 + # [access.user_data_quota] # hello = 1073741824 # 1 GB @@ -84,4 +87,4 @@ weight = 10 # weight = 1 [dc_overrides] -"203" = "91.105.192.100:443" \ No newline at end of file +"203" = "91.105.192.100:443" diff --git a/src/config/mod.rs b/src/config/mod.rs index 7ecda14..a5ad659 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -319,6 +319,9 @@ pub struct AccessConfig { #[serde(default)] pub user_data_quota: HashMap, + #[serde(default)] + pub user_max_unique_ips: HashMap, + #[serde(default = "default_replay_check_len")] pub replay_check_len: usize, @@ -341,6 +344,7 @@ impl Default for AccessConfig { user_max_tcp_conns: HashMap::new(), user_expirations: HashMap::new(), user_data_quota: HashMap::new(), + user_max_unique_ips: HashMap::new(), replay_check_len: default_replay_check_len(), replay_window_secs: default_replay_window_secs(), ignore_time_skew: false, diff --git a/src/ip_tracker.rs b/src/ip_tracker.rs new file mode 100644 index 0000000..17cebc7 --- /dev/null +++ b/src/ip_tracker.rs @@ -0,0 +1,462 @@ +// src/ip_tracker.rs +// Модуль для отслеживания и ограничения уникальных IP-адресов пользователей + +use std::collections::{HashMap, HashSet}; +use std::net::IpAddr; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Трекер уникальных IP-адресов для каждого пользователя MTProxy +/// +/// Предоставляет thread-safe механизм для: +/// - Отслеживания активных IP-адресов каждого пользователя +/// - Ограничения количества уникальных IP на пользователя +/// - Автоматической очистки при отключении клиентов +#[derive(Debug, Clone)] +pub struct UserIpTracker { + /// Маппинг: Имя пользователя -> Множество активных IP-адресов + active_ips: Arc>>>, + + /// Маппинг: Имя пользователя -> Максимально разрешенное количество уникальных IP + max_ips: Arc>>, +} + +impl UserIpTracker { + /// Создать новый пустой трекер + pub fn new() -> Self { + Self { + active_ips: Arc::new(RwLock::new(HashMap::new())), + max_ips: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Установить лимит уникальных IP для конкретного пользователя + /// + /// # Arguments + /// * `username` - Имя пользователя + /// * `max_ips` - Максимальное количество одновременно активных IP-адресов + pub async fn set_user_limit(&self, username: &str, max_ips: usize) { + let mut limits = self.max_ips.write().await; + limits.insert(username.to_string(), max_ips); + } + + /// Загрузить лимиты из конфигурации + /// + /// # Arguments + /// * `limits` - HashMap с лимитами из config.toml + pub async fn load_limits(&self, limits: &HashMap) { + let mut max_ips = self.max_ips.write().await; + for (user, limit) in limits { + max_ips.insert(user.clone(), *limit); + } + } + + /// Проверить, может ли пользователь подключиться с данного IP-адреса + /// и добавить IP в список активных, если проверка успешна + /// + /// # Arguments + /// * `username` - Имя пользователя + /// * `ip` - IP-адрес клиента + /// + /// # Returns + /// * `Ok(())` - Подключение разрешено, IP добавлен в активные + /// * `Err(String)` - Подключение отклонено с описанием причины + pub async fn check_and_add(&self, username: &str, ip: IpAddr) -> Result<(), String> { + // Получаем лимит для пользователя + let max_ips = self.max_ips.read().await; + let limit = match max_ips.get(username) { + Some(limit) => *limit, + None => { + // Если лимит не задан - разрешаем безлимитный доступ + drop(max_ips); + let mut active_ips = self.active_ips.write().await; + let user_ips = active_ips + .entry(username.to_string()) + .or_insert_with(HashSet::new); + user_ips.insert(ip); + return Ok(()); + } + }; + drop(max_ips); + + // Проверяем и обновляем активные IP + let mut active_ips = self.active_ips.write().await; + let user_ips = active_ips + .entry(username.to_string()) + .or_insert_with(HashSet::new); + + // Если IP уже есть в списке - это повторное подключение, разрешаем + if user_ips.contains(&ip) { + return Ok(()); + } + + // Проверяем, не превышен ли лимит + if user_ips.len() >= limit { + return Err(format!( + "IP limit reached for user '{}': {}/{} unique IPs already connected", + username, + user_ips.len(), + limit + )); + } + + // Лимит не превышен - добавляем новый IP + user_ips.insert(ip); + Ok(()) + } + + /// Удалить IP-адрес из списка активных при отключении клиента + /// + /// # Arguments + /// * `username` - Имя пользователя + /// * `ip` - IP-адрес отключившегося клиента + pub async fn remove_ip(&self, username: &str, ip: IpAddr) { + let mut active_ips = self.active_ips.write().await; + + if let Some(user_ips) = active_ips.get_mut(username) { + user_ips.remove(&ip); + + // Если у пользователя не осталось активных IP - удаляем запись + // для экономии памяти + if user_ips.is_empty() { + active_ips.remove(username); + } + } + } + + /// Получить текущее количество активных IP-адресов для пользователя + /// + /// # Arguments + /// * `username` - Имя пользователя + /// + /// # Returns + /// Количество уникальных активных IP-адресов + pub async fn get_active_ip_count(&self, username: &str) -> usize { + let active_ips = self.active_ips.read().await; + active_ips + .get(username) + .map(|ips| ips.len()) + .unwrap_or(0) + } + + /// Получить список всех активных IP-адресов для пользователя + /// + /// # Arguments + /// * `username` - Имя пользователя + /// + /// # Returns + /// Вектор с активными IP-адресами + pub async fn get_active_ips(&self, username: &str) -> Vec { + let active_ips = self.active_ips.read().await; + active_ips + .get(username) + .map(|ips| ips.iter().copied().collect()) + .unwrap_or_else(Vec::new) + } + + /// Получить статистику по всем пользователям + /// + /// # Returns + /// Вектор кортежей: (имя_пользователя, количество_активных_IP, лимит) + pub async fn get_stats(&self) -> Vec<(String, usize, usize)> { + let active_ips = self.active_ips.read().await; + let max_ips = self.max_ips.read().await; + + let mut stats = Vec::new(); + + // Собираем статистику по пользователям с активными подключениями + for (username, user_ips) in active_ips.iter() { + let limit = max_ips.get(username).copied().unwrap_or(0); + stats.push((username.clone(), user_ips.len(), limit)); + } + + stats.sort_by(|a, b| a.0.cmp(&b.0)); // Сортируем по имени пользователя + stats + } + + /// Очистить все активные IP для пользователя (при необходимости) + /// + /// # Arguments + /// * `username` - Имя пользователя + pub async fn clear_user_ips(&self, username: &str) { + let mut active_ips = self.active_ips.write().await; + active_ips.remove(username); + } + + /// Очистить всю статистику (использовать с осторожностью!) + pub async fn clear_all(&self) { + let mut active_ips = self.active_ips.write().await; + active_ips.clear(); + } + + /// Проверить, подключен ли пользователь с данного IP + /// + /// # Arguments + /// * `username` - Имя пользователя + /// * `ip` - IP-адрес для проверки + /// + /// # Returns + /// `true` если IP активен, `false` если нет + pub async fn is_ip_active(&self, username: &str, ip: IpAddr) -> bool { + let active_ips = self.active_ips.read().await; + active_ips + .get(username) + .map(|ips| ips.contains(&ip)) + .unwrap_or(false) + } + + /// Получить лимит для пользователя + /// + /// # Arguments + /// * `username` - Имя пользователя + /// + /// # Returns + /// Лимит IP-адресов или None, если лимит не установлен + pub async fn get_user_limit(&self, username: &str) -> Option { + let max_ips = self.max_ips.read().await; + max_ips.get(username).copied() + } + + /// Форматировать статистику в читаемый текст + /// + /// # Returns + /// Строка со статистикой для логов или мониторинга + pub async fn format_stats(&self) -> String { + let stats = self.get_stats().await; + + if stats.is_empty() { + return String::from("No active users"); + } + + let mut output = String::from("User IP Statistics:\n"); + output.push_str("==================\n"); + + for (username, active_count, limit) in stats { + output.push_str(&format!( + "User: {:<20} Active IPs: {}/{}\n", + username, + active_count, + if limit > 0 { limit.to_string() } else { "unlimited".to_string() } + )); + + let ips = self.get_active_ips(&username).await; + for ip in ips { + output.push_str(&format!(" └─ {}\n", ip)); + } + } + + output + } +} + +impl Default for UserIpTracker { + fn default() -> Self { + Self::new() + } +} + +// ============================================================================ +// ТЕСТЫ +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + + fn test_ipv4(oct1: u8, oct2: u8, oct3: u8, oct4: u8) -> IpAddr { + IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4)) + } + + fn test_ipv6() -> IpAddr { + IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)) + } + + #[tokio::test] + async fn test_basic_ip_limit() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 2).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + let ip3 = test_ipv4(192, 168, 1, 3); + + // Первые два IP должны быть приняты + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + + // Третий IP должен быть отклонен + assert!(tracker.check_and_add("test_user", ip3).await.is_err()); + + // Проверяем счетчик + assert_eq!(tracker.get_active_ip_count("test_user").await, 2); + } + + #[tokio::test] + async fn test_reconnection_from_same_ip() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 2).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + + // Первое подключение + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + + // Повторное подключение с того же IP должно пройти + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + + // Счетчик не должен увеличиться + assert_eq!(tracker.get_active_ip_count("test_user").await, 1); + } + + #[tokio::test] + async fn test_ip_removal() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 2).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + let ip3 = test_ipv4(192, 168, 1, 3); + + // Добавляем два IP + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + + // Третий не должен пройти + assert!(tracker.check_and_add("test_user", ip3).await.is_err()); + + // Удаляем первый IP + tracker.remove_ip("test_user", ip1).await; + + // Теперь третий должен пройти + assert!(tracker.check_and_add("test_user", ip3).await.is_ok()); + + assert_eq!(tracker.get_active_ip_count("test_user").await, 2); + } + + #[tokio::test] + async fn test_no_limit() { + let tracker = UserIpTracker::new(); + // Не устанавливаем лимит для test_user + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + let ip3 = test_ipv4(192, 168, 1, 3); + + // Без лимита все IP должны проходить + assert!(tracker.check_and_add("test_user", ip1).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip2).await.is_ok()); + assert!(tracker.check_and_add("test_user", ip3).await.is_ok()); + + assert_eq!(tracker.get_active_ip_count("test_user").await, 3); + } + + #[tokio::test] + async fn test_multiple_users() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("user1", 2).await; + tracker.set_user_limit("user2", 1).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + + // user1 может использовать 2 IP + assert!(tracker.check_and_add("user1", ip1).await.is_ok()); + assert!(tracker.check_and_add("user1", ip2).await.is_ok()); + + // user2 может использовать только 1 IP + assert!(tracker.check_and_add("user2", ip1).await.is_ok()); + assert!(tracker.check_and_add("user2", ip2).await.is_err()); + } + + #[tokio::test] + async fn test_ipv6_support() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 2).await; + + let ipv4 = test_ipv4(192, 168, 1, 1); + let ipv6 = test_ipv6(); + + // Должны работать оба типа адресов + assert!(tracker.check_and_add("test_user", ipv4).await.is_ok()); + assert!(tracker.check_and_add("test_user", ipv6).await.is_ok()); + + assert_eq!(tracker.get_active_ip_count("test_user").await, 2); + } + + #[tokio::test] + async fn test_get_active_ips() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("test_user", 3).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + + tracker.check_and_add("test_user", ip1).await.unwrap(); + tracker.check_and_add("test_user", ip2).await.unwrap(); + + let active_ips = tracker.get_active_ips("test_user").await; + assert_eq!(active_ips.len(), 2); + assert!(active_ips.contains(&ip1)); + assert!(active_ips.contains(&ip2)); + } + + #[tokio::test] + async fn test_stats() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("user1", 3).await; + tracker.set_user_limit("user2", 2).await; + + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + + tracker.check_and_add("user1", ip1).await.unwrap(); + tracker.check_and_add("user2", ip2).await.unwrap(); + + let stats = tracker.get_stats().await; + assert_eq!(stats.len(), 2); + + // Проверяем наличие обоих пользователей в статистике + assert!(stats.iter().any(|(name, _, _)| name == "user1")); + assert!(stats.iter().any(|(name, _, _)| name == "user2")); + } + + #[tokio::test] + async fn test_clear_user_ips() { + let tracker = UserIpTracker::new(); + let ip1 = test_ipv4(192, 168, 1, 1); + + tracker.check_and_add("test_user", ip1).await.unwrap(); + assert_eq!(tracker.get_active_ip_count("test_user").await, 1); + + tracker.clear_user_ips("test_user").await; + assert_eq!(tracker.get_active_ip_count("test_user").await, 0); + } + + #[tokio::test] + async fn test_is_ip_active() { + let tracker = UserIpTracker::new(); + let ip1 = test_ipv4(192, 168, 1, 1); + let ip2 = test_ipv4(192, 168, 1, 2); + + tracker.check_and_add("test_user", ip1).await.unwrap(); + + assert!(tracker.is_ip_active("test_user", ip1).await); + assert!(!tracker.is_ip_active("test_user", ip2).await); + } + + #[tokio::test] + async fn test_load_limits_from_config() { + let tracker = UserIpTracker::new(); + + let mut config_limits = HashMap::new(); + config_limits.insert("user1".to_string(), 5); + config_limits.insert("user2".to_string(), 3); + + tracker.load_limits(&config_limits).await; + + assert_eq!(tracker.get_user_limit("user1").await, Some(5)); + assert_eq!(tracker.get_user_limit("user2").await, Some(3)); + assert_eq!(tracker.get_user_limit("user3").await, None); + } +} diff --git a/src/main.rs b/src/main.rs index f0a01e6..5ae4f66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ mod cli; mod config; mod crypto; mod error; +mod ip_tracker; mod protocol; mod proxy; mod stats; @@ -27,6 +28,8 @@ use crate::proxy::{ClientHandler, handle_client_stream}; #[cfg(unix)] use crate::transport::{create_unix_listener, cleanup_unix_socket}; use crate::crypto::SecureRandom; +use crate::ip_tracker::UserIpTracker; +use crate::proxy::ClientHandler; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::MePool; @@ -215,6 +218,14 @@ async fn main() -> std::result::Result<(), Box> { let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); + // IP Tracker initialization + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.load_limits(&config.access.user_max_unique_ips).await; + + if !config.access.user_max_unique_ips.is_empty() { + info!("IP limits configured for {} users", config.access.user_max_unique_ips.len()); + } + // Connection concurrency limit let _max_connections = Arc::new(Semaphore::new(10_000)); @@ -568,6 +579,7 @@ async fn main() -> std::result::Result<(), Box> { let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let ip_tracker = ip_tracker.clone(); tokio::spawn(async move { loop { @@ -580,6 +592,7 @@ async fn main() -> std::result::Result<(), Box> { let buffer_pool = buffer_pool.clone(); let rng = rng.clone(); let me_pool = me_pool.clone(); + let ip_tracker = ip_tracker.clone(); tokio::spawn(async move { if let Err(e) = ClientHandler::new( @@ -592,6 +605,7 @@ async fn main() -> std::result::Result<(), Box> { buffer_pool, rng, me_pool, + ip_tracker, ) .run() .await diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 4ea86d3..8b4e8bc 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -11,6 +11,7 @@ use tracing::{debug, warn}; use crate::config::ProxyConfig; use crate::crypto::SecureRandom; use crate::error::{HandshakeResult, ProxyError, Result}; +use crate::ip_tracker::UserIpTracker; use crate::protocol::constants::*; use crate::protocol::tls; use crate::stats::{ReplayChecker, Stats}; @@ -178,6 +179,7 @@ pub struct RunningClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + ip_tracker: Arc, } impl ClientHandler { @@ -191,6 +193,7 @@ impl ClientHandler { buffer_pool: Arc, rng: Arc, me_pool: Option>, + ip_tracker: Arc, ) -> RunningClientHandler { RunningClientHandler { stream, @@ -202,6 +205,7 @@ impl ClientHandler { buffer_pool, rng, me_pool, + ip_tracker, } } } @@ -346,6 +350,8 @@ impl RunningClientHandler { self.rng, self.me_pool, local_addr, + peer, + self.ip_tracker, ) .await } @@ -404,6 +410,8 @@ impl RunningClientHandler { self.rng, self.me_pool, local_addr, + peer, + self.ip_tracker, ) .await } @@ -423,6 +431,8 @@ impl RunningClientHandler { rng: Arc, me_pool: Option>, local_addr: SocketAddr, + peer_addr: SocketAddr, + ip_tracker: Arc, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -430,11 +440,36 @@ impl RunningClientHandler { { let user = &success.user; - if let Err(e) = Self::check_user_limits_static(user, &config, &stats) { + if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await { warn!(user = %user, error = %e, "User limit exceeded"); return Err(e); } + // IP Cleanup Guard: автоматически удаляет IP при выходе из scope + struct IpCleanupGuard { + tracker: Arc, + user: String, + ip: std::net::IpAddr, + } + + impl Drop for IpCleanupGuard { + fn drop(&mut self) { + let tracker = self.tracker.clone(); + let user = self.user.clone(); + let ip = self.ip; + tokio::spawn(async move { + tracker.remove_ip(&user, ip).await; + debug!(user = %user, ip = %ip, "IP cleaned up on disconnect"); + }); + } + } + + let _cleanup = IpCleanupGuard { + tracker: ip_tracker, + user: user.clone(), + ip: peer_addr.ip(), + }; + // Decide: middle proxy or direct if config.general.use_middle_proxy { if let Some(ref pool) = me_pool { @@ -467,7 +502,13 @@ impl RunningClientHandler { .await } - fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> { + async fn check_user_limits_static( + user: &str, + config: &ProxyConfig, + stats: &Stats, + peer_addr: SocketAddr, + ip_tracker: &UserIpTracker, + ) -> Result<()> { if let Some(expiration) = config.access.user_expirations.get(user) { if chrono::Utc::now() > *expiration { return Err(ProxyError::UserExpired { @@ -476,6 +517,19 @@ impl RunningClientHandler { } } + // IP limit check + if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await { + warn!( + user = %user, + ip = %peer_addr.ip(), + reason = %reason, + "IP limit exceeded" + ); + return Err(ProxyError::ConnectionLimitExceeded { + user: user.to_string(), + }); + } + if let Some(limit) = config.access.user_max_tcp_conns.get(user) { if stats.get_user_curr_connects(user) >= *limit as u64 { return Err(ProxyError::ConnectionLimitExceeded {