diff --git a/src/config/load.rs b/src/config/load.rs index 512b734..4f00f77 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -208,6 +208,8 @@ impl ProxyConfig { upstream_type: UpstreamType::Direct { interface: None }, weight: 1, enabled: true, + scopes: String::new(), + selected_scope: String::new(), }); } diff --git a/src/config/types.rs b/src/config/types.rs index ef8fef7..766851b 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -403,6 +403,10 @@ pub struct UpstreamConfig { pub weight: u16, #[serde(default = "default_true")] pub enabled: bool, + #[serde(default)] + pub scopes: String, + #[serde(skip)] + pub selected_scope: String, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 537a93e..7b1ac1b 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -45,7 +45,7 @@ where ); let tg_stream = upstream_manager - .connect(dc_addr, Some(success.dc_idx)) + .connect(dc_addr, Some(success.dc_idx), user.strip_prefix("scope_").filter(|s| !s.is_empty())) .await?; debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected, performing TG handshake"); diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index bf73408..8fdd437 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -167,20 +167,44 @@ impl UpstreamManager { } /// Select upstream using latency-weighted random selection. - async fn select_upstream(&self, dc_idx: Option) -> Option { + async fn select_upstream(&self, dc_idx: Option, scope: Option<&str>) -> Option { let upstreams = self.upstreams.read().await; if upstreams.is_empty() { return None; } - - let healthy: Vec = upstreams.iter() + // Scope filter: + // If scope is set: only scoped and matched items + // If scope is not set: only unscoped items + let filtered_upstreams : Vec = upstreams.iter() .enumerate() - .filter(|(_, u)| u.healthy) + .filter(|(_, u)| { + scope.map_or( + u.config.scopes.is_empty(), + |req_scope| { + u.config.scopes + .split(',') + .map(str::trim) + .any(|s| s == req_scope) + } + ) + }) .map(|(i, _)| i) .collect(); + // Healthy filter + let healthy: Vec = filtered_upstreams.iter() + .filter(|&&i| upstreams[i].healthy) + .copied() + .collect(); + + if filtered_upstreams.is_empty() { + warn!(scope = scope, "No upstreams available! Using first (direct?)"); + return None; + } + if healthy.is_empty() { - return Some(rand::rng().gen_range(0..upstreams.len())); + warn!(scope = scope, "No healthy upstreams available! Using random."); + return Some(filtered_upstreams[rand::rng().gen_range(0..filtered_upstreams.len())]); } if healthy.len() == 1 { @@ -222,15 +246,20 @@ impl UpstreamManager { } /// Connect to target through a selected upstream. - pub async fn connect(&self, target: SocketAddr, dc_idx: Option) -> Result { - let idx = self.select_upstream(dc_idx).await + pub async fn connect(&self, target: SocketAddr, dc_idx: Option, scope: Option<&str>) -> Result { + let idx = self.select_upstream(dc_idx, scope).await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let upstream = { + let mut upstream = { let guard = self.upstreams.read().await; guard[idx].config.clone() }; + // Set scope for configuration copy + if let Some(s) = scope { + upstream.selected_scope = s.to_string(); + } + let start = Instant::now(); match self.connect_via_upstream(&upstream, target).await { @@ -313,8 +342,12 @@ impl UpstreamManager { if let Some(e) = stream.take_error()? { return Err(ProxyError::Io(e)); } + // replace socks user_id with config.selected_scope, if set + let scope: Option<&str> = Some(config.selected_scope.as_str()) + .filter(|s| !s.is_empty()); + let _user_id: Option<&str> = scope.or(user_id.as_deref()); - connect_socks4(&mut stream, target, user_id.as_deref()).await?; + connect_socks4(&mut stream, target, _user_id).await?; Ok(stream) }, UpstreamType::Socks5 { address, interface, username, password } => { @@ -341,7 +374,14 @@ impl UpstreamManager { return Err(ProxyError::Io(e)); } - connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?; + debug!(config = ?config, "Socks5 connection"); + // replace socks user:pass with config.selected_scope, if set + let scope: Option<&str> = Some(config.selected_scope.as_str()) + .filter(|s| !s.is_empty()); + let _username: Option<&str> = scope.or(username.as_deref()); + let _password: Option<&str> = scope.or(password.as_deref()); + + connect_socks5(&mut stream, target, _username, _password).await?; Ok(stream) }, }