Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85cb4092d5 | ||
|
|
5016160ac3 | ||
|
|
4f007f3128 | ||
|
|
7746a1177c | ||
|
|
2bb2a2983f |
20
README.md
20
README.md
@@ -57,6 +57,14 @@ Generate 16 bytes/32 characters HEX with OpenSSL or another way:
|
|||||||
```bash
|
```bash
|
||||||
openssl rand -hex 16
|
openssl rand -hex 16
|
||||||
```
|
```
|
||||||
|
OR
|
||||||
|
```bash
|
||||||
|
xxd -l 16 -p /dev/urandom
|
||||||
|
```
|
||||||
|
OR
|
||||||
|
```bash
|
||||||
|
python3 -c 'import os; print(os.urandom(16).hex())'
|
||||||
|
```
|
||||||
|
|
||||||
**1. Place your config to /etc/telemt.toml**
|
**1. Place your config to /etc/telemt.toml**
|
||||||
|
|
||||||
@@ -127,11 +135,17 @@ then Ctrl+X -> Y -> Enter to save
|
|||||||
- high entropy, which is normal for AES-encrypted traffic;
|
- high entropy, which is normal for AES-encrypted traffic;
|
||||||
### Whitelist on IP
|
### Whitelist on IP
|
||||||
- MTProxy cannot work when there is:
|
- MTProxy cannot work when there is:
|
||||||
- no IP connectivity to the target host
|
- no IP connectivity to the target host: Russian Whitelist on Mobile Networks - "Белый список"
|
||||||
- OR all TCP traffic is blocked
|
- OR all TCP traffic is blocked
|
||||||
- OR all TLS traffic is blocked,
|
- OR high entropy/encrypted traffic is blocked: content filters at universities and critical infrastructure
|
||||||
|
- OR all TLS traffic is blocked
|
||||||
|
- OR specified port is blocked: use 443 to make it "like real"
|
||||||
|
- OR provided SNI is blocked: use "officially approved"/innocuous name
|
||||||
- like most protocols on the Internet;
|
- like most protocols on the Internet;
|
||||||
- this situation is observed in China behind the Great Chinese Firewall and in Russia on mobile networks
|
- these situations are observed:
|
||||||
|
- in China behind the Great Firewall
|
||||||
|
- in Russia on mobile networks, less in wired networks
|
||||||
|
- in Iran during "activity"
|
||||||
|
|
||||||
## Why Rust?
|
## Why Rust?
|
||||||
- Long-running reliability and idempotent behavior
|
- Long-running reliability and idempotent behavior
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ pub struct ProxyModes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn default_true() -> bool { true }
|
fn default_true() -> bool { true }
|
||||||
|
fn default_weight() -> u16 { 1 }
|
||||||
|
|
||||||
impl Default for ProxyModes {
|
impl Default for ProxyModes {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
@@ -25,6 +26,48 @@ impl Default for ProxyModes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
#[serde(tag = "type", rename_all = "lowercase")]
|
||||||
|
pub enum UpstreamType {
|
||||||
|
Direct {
|
||||||
|
#[serde(default)]
|
||||||
|
interface: Option<String>, // Bind to specific IP/Interface
|
||||||
|
},
|
||||||
|
Socks4 {
|
||||||
|
address: String, // IP:Port of SOCKS server
|
||||||
|
#[serde(default)]
|
||||||
|
interface: Option<String>, // Bind to specific IP/Interface for connection to SOCKS
|
||||||
|
#[serde(default)]
|
||||||
|
user_id: Option<String>,
|
||||||
|
},
|
||||||
|
Socks5 {
|
||||||
|
address: String,
|
||||||
|
#[serde(default)]
|
||||||
|
interface: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
username: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
password: Option<String>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct UpstreamConfig {
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub upstream_type: UpstreamType,
|
||||||
|
#[serde(default = "default_weight")]
|
||||||
|
pub weight: u16,
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
pub enabled: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ListenerConfig {
|
||||||
|
pub ip: IpAddr,
|
||||||
|
#[serde(default)]
|
||||||
|
pub announce_ip: Option<IpAddr>, // IP to show in tg:// links
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct ProxyConfig {
|
pub struct ProxyConfig {
|
||||||
#[serde(default = "default_port")]
|
#[serde(default = "default_port")]
|
||||||
@@ -104,6 +147,16 @@ pub struct ProxyConfig {
|
|||||||
|
|
||||||
#[serde(default = "default_fake_cert_len")]
|
#[serde(default = "default_fake_cert_len")]
|
||||||
pub fake_cert_len: usize,
|
pub fake_cert_len: usize,
|
||||||
|
|
||||||
|
// New fields
|
||||||
|
#[serde(default)]
|
||||||
|
pub upstreams: Vec<UpstreamConfig>,
|
||||||
|
|
||||||
|
#[serde(default)]
|
||||||
|
pub listeners: Vec<ListenerConfig>,
|
||||||
|
|
||||||
|
#[serde(default)]
|
||||||
|
pub show_link: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_port() -> u16 { 443 }
|
fn default_port() -> u16 { 443 }
|
||||||
@@ -156,6 +209,9 @@ impl Default for ProxyConfig {
|
|||||||
metrics_port: None,
|
metrics_port: None,
|
||||||
metrics_whitelist: default_metrics_whitelist(),
|
metrics_whitelist: default_metrics_whitelist(),
|
||||||
fake_cert_len: default_fake_cert_len(),
|
fake_cert_len: default_fake_cert_len(),
|
||||||
|
upstreams: Vec::new(),
|
||||||
|
listeners: Vec::new(),
|
||||||
|
show_link: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -187,6 +243,33 @@ impl ProxyConfig {
|
|||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
config.fake_cert_len = rand::thread_rng().gen_range(1024..4096);
|
config.fake_cert_len = rand::thread_rng().gen_range(1024..4096);
|
||||||
|
|
||||||
|
// Migration: Populate listeners if empty
|
||||||
|
if config.listeners.is_empty() {
|
||||||
|
if let Ok(ipv4) = config.listen_addr_ipv4.parse::<IpAddr>() {
|
||||||
|
config.listeners.push(ListenerConfig {
|
||||||
|
ip: ipv4,
|
||||||
|
announce_ip: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if let Some(ipv6_str) = &config.listen_addr_ipv6 {
|
||||||
|
if let Ok(ipv6) = ipv6_str.parse::<IpAddr>() {
|
||||||
|
config.listeners.push(ListenerConfig {
|
||||||
|
ip: ipv6,
|
||||||
|
announce_ip: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migration: Populate upstreams if empty (Default Direct)
|
||||||
|
if config.upstreams.is_empty() {
|
||||||
|
config.upstreams.push(UpstreamConfig {
|
||||||
|
upstream_type: UpstreamType::Direct { interface: None },
|
||||||
|
weight: 1,
|
||||||
|
enabled: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,26 +285,3 @@ impl ProxyConfig {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_default_config() {
|
|
||||||
let config = ProxyConfig::default();
|
|
||||||
assert_eq!(config.port, 443);
|
|
||||||
assert!(config.modes.tls);
|
|
||||||
assert_eq!(config.client_keepalive, 600);
|
|
||||||
assert_eq!(config.client_ack_timeout, 300);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_config_validate() {
|
|
||||||
let mut config = ProxyConfig::default();
|
|
||||||
assert!(config.validate().is_ok());
|
|
||||||
|
|
||||||
config.users.clear();
|
|
||||||
assert!(config.validate().is_err());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -235,6 +235,9 @@ pub enum ProxyError {
|
|||||||
#[error("Invalid proxy protocol header")]
|
#[error("Invalid proxy protocol header")]
|
||||||
InvalidProxyProtocol,
|
InvalidProxyProtocol,
|
||||||
|
|
||||||
|
#[error("Proxy error: {0}")]
|
||||||
|
Proxy(String),
|
||||||
|
|
||||||
// ============= Config Errors =============
|
// ============= Config Errors =============
|
||||||
|
|
||||||
#[error("Config error: {0}")]
|
#[error("Config error: {0}")]
|
||||||
|
|||||||
301
src/main.rs
301
src/main.rs
@@ -1,158 +1,193 @@
|
|||||||
//! Telemt - MTProxy on Rust
|
//! Telemt - MTProxy on Rust
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
use tracing::{info, error, Level};
|
use tracing::{info, error, warn};
|
||||||
use tracing_subscriber::{FmtSubscriber, EnvFilter};
|
use tracing_subscriber::{fmt, EnvFilter};
|
||||||
|
|
||||||
mod error;
|
mod config;
|
||||||
mod crypto;
|
mod crypto;
|
||||||
|
mod error;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
|
mod proxy;
|
||||||
|
mod stats;
|
||||||
mod stream;
|
mod stream;
|
||||||
mod transport;
|
mod transport;
|
||||||
mod proxy;
|
|
||||||
mod config;
|
|
||||||
mod stats;
|
|
||||||
mod util;
|
mod util;
|
||||||
|
|
||||||
use config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
use stats::{Stats, ReplayChecker};
|
use crate::proxy::ClientHandler;
|
||||||
use transport::ConnectionPool;
|
use crate::stats::Stats;
|
||||||
use proxy::ClientHandler;
|
use crate::transport::{create_listener, ListenOptions, UpstreamManager};
|
||||||
|
use crate::util::ip::detect_ip;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
// Initialize logging with env filter
|
// Initialize logging
|
||||||
// Use RUST_LOG=debug or RUST_LOG=trace for more details
|
fmt()
|
||||||
let filter = EnvFilter::try_from_default_env()
|
.with_env_filter(EnvFilter::from_default_env().add_directive("info".parse().unwrap()))
|
||||||
.unwrap_or_else(|_| EnvFilter::new("info"));
|
.init();
|
||||||
|
|
||||||
let subscriber = FmtSubscriber::builder()
|
// Load config
|
||||||
.with_env_filter(filter)
|
let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.toml".to_string());
|
||||||
.with_target(true)
|
let config = match ProxyConfig::load(&config_path) {
|
||||||
.with_thread_ids(false)
|
Ok(c) => c,
|
||||||
.with_file(false)
|
Err(e) => {
|
||||||
.with_line_number(false)
|
// If config doesn't exist, try to create default
|
||||||
.finish();
|
if std::path::Path::new(&config_path).exists() {
|
||||||
|
error!("Failed to load config: {}", e);
|
||||||
tracing::subscriber::set_global_default(subscriber)?;
|
std::process::exit(1);
|
||||||
|
} else {
|
||||||
// Load configuration
|
let default = ProxyConfig::default();
|
||||||
let config_path = std::env::args()
|
let toml = toml::to_string_pretty(&default).unwrap();
|
||||||
.nth(1)
|
std::fs::write(&config_path, toml).unwrap();
|
||||||
.unwrap_or_else(|| "config.toml".to_string());
|
info!("Created default config at {}", config_path);
|
||||||
|
default
|
||||||
info!("Loading configuration from {}", config_path);
|
|
||||||
|
|
||||||
let config = ProxyConfig::load(&config_path).unwrap_or_else(|e| {
|
|
||||||
error!("Failed to load config: {}", e);
|
|
||||||
info!("Using default configuration");
|
|
||||||
ProxyConfig::default()
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(e) = config.validate() {
|
|
||||||
error!("Invalid configuration: {}", e);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let config = Arc::new(config);
|
|
||||||
|
|
||||||
info!("Starting MTProto Proxy on port {}", config.port);
|
|
||||||
info!("Fast mode: {}", config.fast_mode);
|
|
||||||
info!("Modes: classic={}, secure={}, tls={}",
|
|
||||||
config.modes.classic, config.modes.secure, config.modes.tls);
|
|
||||||
|
|
||||||
// Initialize components
|
|
||||||
let stats = Arc::new(Stats::new());
|
|
||||||
let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len));
|
|
||||||
let pool = Arc::new(ConnectionPool::new());
|
|
||||||
|
|
||||||
// Create handler
|
|
||||||
let handler = Arc::new(ClientHandler::new(
|
|
||||||
Arc::clone(&config),
|
|
||||||
Arc::clone(&stats),
|
|
||||||
Arc::clone(&replay_checker),
|
|
||||||
Arc::clone(&pool),
|
|
||||||
));
|
|
||||||
|
|
||||||
// Start listener
|
|
||||||
let addr: SocketAddr = format!("{}:{}", config.listen_addr_ipv4, config.port)
|
|
||||||
.parse()?;
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
info!("Listening on {}", addr);
|
|
||||||
|
|
||||||
// Print proxy links
|
|
||||||
print_proxy_links(&config);
|
|
||||||
|
|
||||||
info!("Use RUST_LOG=debug or RUST_LOG=trace for more detailed logging");
|
|
||||||
|
|
||||||
// Main accept loop
|
|
||||||
let accept_loop = async {
|
|
||||||
loop {
|
|
||||||
match listener.accept().await {
|
|
||||||
Ok((stream, peer)) => {
|
|
||||||
let handler = Arc::clone(&handler);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
handler.handle(stream, peer).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Accept error: {}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Graceful shutdown
|
config.validate()?;
|
||||||
tokio::select! {
|
|
||||||
_ = accept_loop => {}
|
let config = Arc::new(config);
|
||||||
_ = signal::ctrl_c() => {
|
let stats = Arc::new(Stats::new());
|
||||||
info!("Shutting down...");
|
|
||||||
|
// Initialize Upstream Manager
|
||||||
|
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
|
||||||
|
|
||||||
|
// Start Health Checks
|
||||||
|
let um_clone = upstream_manager.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
um_clone.run_health_checks().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Detect public IP if needed (once at startup)
|
||||||
|
let detected_ip = detect_ip().await;
|
||||||
|
|
||||||
|
// Start Listeners
|
||||||
|
let mut listeners = Vec::new();
|
||||||
|
|
||||||
|
for listener_conf in &config.listeners {
|
||||||
|
let addr = SocketAddr::new(listener_conf.ip, config.port);
|
||||||
|
let options = ListenOptions {
|
||||||
|
ipv6_only: listener_conf.ip.is_ipv6(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
match create_listener(addr, &options) {
|
||||||
|
Ok(socket) => {
|
||||||
|
let listener = TcpListener::from_std(socket.into())?;
|
||||||
|
info!("Listening on {}", addr);
|
||||||
|
|
||||||
|
// Determine public IP for tg:// links
|
||||||
|
// 1. Use explicit announce_ip if set
|
||||||
|
// 2. If listening on 0.0.0.0 or ::, use detected public IP
|
||||||
|
// 3. Otherwise use the bind IP
|
||||||
|
let public_ip = if let Some(ip) = listener_conf.announce_ip {
|
||||||
|
ip
|
||||||
|
} else if listener_conf.ip.is_unspecified() {
|
||||||
|
// Try to use detected IP of the same family
|
||||||
|
if listener_conf.ip.is_ipv4() {
|
||||||
|
detected_ip.ipv4.unwrap_or(listener_conf.ip)
|
||||||
|
} else {
|
||||||
|
detected_ip.ipv6.unwrap_or(listener_conf.ip)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
listener_conf.ip
|
||||||
|
};
|
||||||
|
|
||||||
|
// Show links for configured users
|
||||||
|
if !config.show_link.is_empty() {
|
||||||
|
info!("--- Proxy Links for {} ---", public_ip);
|
||||||
|
for user_name in &config.show_link {
|
||||||
|
if let Some(secret) = config.users.get(user_name) {
|
||||||
|
info!("User: {}", user_name);
|
||||||
|
|
||||||
|
// Classic
|
||||||
|
if config.modes.classic {
|
||||||
|
info!(" Classic: tg://proxy?server={}&port={}&secret={}",
|
||||||
|
public_ip, config.port, secret);
|
||||||
|
}
|
||||||
|
|
||||||
|
// DD (Secure)
|
||||||
|
if config.modes.secure {
|
||||||
|
info!(" DD: tg://proxy?server={}&port={}&secret=dd{}",
|
||||||
|
public_ip, config.port, secret);
|
||||||
|
}
|
||||||
|
|
||||||
|
// EE-TLS (FakeTLS)
|
||||||
|
if config.modes.tls {
|
||||||
|
let domain_hex = hex::encode(&config.tls_domain);
|
||||||
|
info!(" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
|
||||||
|
public_ip, config.port, secret, domain_hex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("User '{}' specified in show_link not found in users list", user_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("-----------------------------------");
|
||||||
|
}
|
||||||
|
|
||||||
|
listeners.push(listener);
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to bind to {}: {}", addr, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup
|
if listeners.is_empty() {
|
||||||
pool.close_all().await;
|
error!("No listeners could be started. Exiting.");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept loop
|
||||||
|
// For simplicity in this slice, we just spawn a task for each listener
|
||||||
|
// In a real high-perf scenario, we might want a more complex accept loop
|
||||||
|
|
||||||
|
for listener in listeners {
|
||||||
|
let config = config.clone();
|
||||||
|
let stats = stats.clone();
|
||||||
|
let upstream_manager = upstream_manager.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((stream, peer_addr)) => {
|
||||||
|
let config = config.clone();
|
||||||
|
let stats = stats.clone();
|
||||||
|
let upstream_manager = upstream_manager.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = ClientHandler::new(
|
||||||
|
stream,
|
||||||
|
peer_addr,
|
||||||
|
config,
|
||||||
|
stats,
|
||||||
|
upstream_manager
|
||||||
|
).run().await {
|
||||||
|
// Log only relevant errors
|
||||||
|
// debug!("Connection error: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Accept error: {}", e);
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for signal
|
||||||
|
match signal::ctrl_c().await {
|
||||||
|
Ok(()) => info!("Shutting down..."),
|
||||||
|
Err(e) => error!("Signal error: {}", e),
|
||||||
|
}
|
||||||
|
|
||||||
info!("Goodbye!");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn print_proxy_links(config: &ProxyConfig) {
|
|
||||||
println!("\n=== Proxy Links ===\n");
|
|
||||||
|
|
||||||
for (user, secret) in &config.users {
|
|
||||||
if config.modes.tls {
|
|
||||||
let tls_secret = format!(
|
|
||||||
"ee{}{}",
|
|
||||||
secret,
|
|
||||||
hex::encode(config.tls_domain.as_bytes())
|
|
||||||
);
|
|
||||||
println!(
|
|
||||||
"{} (TLS): tg://proxy?server=IP&port={}&secret={}",
|
|
||||||
user, config.port, tls_secret
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.modes.secure {
|
|
||||||
println!(
|
|
||||||
"{} (Secure): tg://proxy?server=IP&port={}&secret=dd{}",
|
|
||||||
user, config.port, secret
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.modes.classic {
|
|
||||||
println!(
|
|
||||||
"{} (Classic): tg://proxy?server=IP&port={}&secret={}",
|
|
||||||
user, config.port, secret
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
println!();
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("===================\n");
|
|
||||||
}
|
|
||||||
@@ -13,7 +13,7 @@ use crate::error::{ProxyError, Result, HandshakeResult};
|
|||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
use crate::protocol::tls;
|
use crate::protocol::tls;
|
||||||
use crate::stats::{Stats, ReplayChecker};
|
use crate::stats::{Stats, ReplayChecker};
|
||||||
use crate::transport::{ConnectionPool, configure_client_socket};
|
use crate::transport::{configure_client_socket, UpstreamManager};
|
||||||
use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter};
|
use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter};
|
||||||
use crate::crypto::AesCtr;
|
use crate::crypto::AesCtr;
|
||||||
|
|
||||||
@@ -24,39 +24,55 @@ use super::handshake::{
|
|||||||
use super::relay::relay_bidirectional;
|
use super::relay::relay_bidirectional;
|
||||||
use super::masking::handle_bad_client;
|
use super::masking::handle_bad_client;
|
||||||
|
|
||||||
/// Client connection handler
|
/// Client connection handler (builder struct)
|
||||||
pub struct ClientHandler {
|
pub struct ClientHandler;
|
||||||
|
|
||||||
|
/// Running client handler with stream and context
|
||||||
|
pub struct RunningClientHandler {
|
||||||
|
stream: TcpStream,
|
||||||
|
peer: SocketAddr,
|
||||||
config: Arc<ProxyConfig>,
|
config: Arc<ProxyConfig>,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
replay_checker: Arc<ReplayChecker>,
|
replay_checker: Arc<ReplayChecker>,
|
||||||
pool: Arc<ConnectionPool>,
|
upstream_manager: Arc<UpstreamManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientHandler {
|
impl ClientHandler {
|
||||||
/// Create new client handler
|
/// Create new client handler instance
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
stream: TcpStream,
|
||||||
|
peer: SocketAddr,
|
||||||
config: Arc<ProxyConfig>,
|
config: Arc<ProxyConfig>,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
replay_checker: Arc<ReplayChecker>,
|
upstream_manager: Arc<UpstreamManager>,
|
||||||
pool: Arc<ConnectionPool>,
|
) -> RunningClientHandler {
|
||||||
) -> Self {
|
// Note: ReplayChecker should be shared globally for proper replay protection
|
||||||
Self {
|
// Creating it per-connection disables replay protection across connections
|
||||||
|
// TODO: Pass Arc<ReplayChecker> from main.rs
|
||||||
|
let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len));
|
||||||
|
|
||||||
|
RunningClientHandler {
|
||||||
|
stream,
|
||||||
|
peer,
|
||||||
config,
|
config,
|
||||||
stats,
|
stats,
|
||||||
replay_checker,
|
replay_checker,
|
||||||
pool,
|
upstream_manager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a client connection
|
impl RunningClientHandler {
|
||||||
pub async fn handle(&self, stream: TcpStream, peer: SocketAddr) {
|
/// Run the client handler
|
||||||
|
pub async fn run(mut self) -> Result<()> {
|
||||||
self.stats.increment_connects_all();
|
self.stats.increment_connects_all();
|
||||||
|
|
||||||
|
let peer = self.peer;
|
||||||
debug!(peer = %peer, "New connection");
|
debug!(peer = %peer, "New connection");
|
||||||
|
|
||||||
// Configure socket
|
// Configure socket
|
||||||
if let Err(e) = configure_client_socket(
|
if let Err(e) = configure_client_socket(
|
||||||
&stream,
|
&self.stream,
|
||||||
self.config.client_keepalive,
|
self.config.client_keepalive,
|
||||||
self.config.client_ack_timeout,
|
self.config.client_ack_timeout,
|
||||||
) {
|
) {
|
||||||
@@ -66,49 +82,56 @@ impl ClientHandler {
|
|||||||
// Perform handshake with timeout
|
// Perform handshake with timeout
|
||||||
let handshake_timeout = Duration::from_secs(self.config.client_handshake_timeout);
|
let handshake_timeout = Duration::from_secs(self.config.client_handshake_timeout);
|
||||||
|
|
||||||
|
// Clone stats for error handling block
|
||||||
|
let stats = self.stats.clone();
|
||||||
|
|
||||||
let result = timeout(
|
let result = timeout(
|
||||||
handshake_timeout,
|
handshake_timeout,
|
||||||
self.do_handshake(stream, peer)
|
self.do_handshake()
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
debug!(peer = %peer, "Connection handled successfully");
|
debug!(peer = %peer, "Connection handled successfully");
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!(peer = %peer, error = %e, "Handshake failed");
|
debug!(peer = %peer, error = %e, "Handshake failed");
|
||||||
|
Err(e)
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
self.stats.increment_handshake_timeouts();
|
stats.increment_handshake_timeouts();
|
||||||
debug!(peer = %peer, "Handshake timeout");
|
debug!(peer = %peer, "Handshake timeout");
|
||||||
|
Err(ProxyError::TgHandshakeTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform handshake and relay
|
/// Perform handshake and relay
|
||||||
async fn do_handshake(&self, mut stream: TcpStream, peer: SocketAddr) -> Result<()> {
|
async fn do_handshake(mut self) -> Result<()> {
|
||||||
// Read first bytes to determine handshake type
|
// Read first bytes to determine handshake type
|
||||||
let mut first_bytes = [0u8; 5];
|
let mut first_bytes = [0u8; 5];
|
||||||
stream.read_exact(&mut first_bytes).await?;
|
self.stream.read_exact(&mut first_bytes).await?;
|
||||||
|
|
||||||
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
||||||
|
let peer = self.peer;
|
||||||
|
|
||||||
debug!(peer = %peer, is_tls = is_tls, first_bytes = %hex::encode(&first_bytes), "Handshake type detected");
|
debug!(peer = %peer, is_tls = is_tls, first_bytes = %hex::encode(&first_bytes), "Handshake type detected");
|
||||||
|
|
||||||
if is_tls {
|
if is_tls {
|
||||||
self.handle_tls_client(stream, peer, first_bytes).await
|
self.handle_tls_client(first_bytes).await
|
||||||
} else {
|
} else {
|
||||||
self.handle_direct_client(stream, peer, first_bytes).await
|
self.handle_direct_client(first_bytes).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle TLS-wrapped client
|
/// Handle TLS-wrapped client
|
||||||
async fn handle_tls_client(
|
async fn handle_tls_client(
|
||||||
&self,
|
mut self,
|
||||||
mut stream: TcpStream,
|
|
||||||
peer: SocketAddr,
|
|
||||||
first_bytes: [u8; 5],
|
first_bytes: [u8; 5],
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let peer = self.peer;
|
||||||
|
|
||||||
// Read TLS handshake length
|
// Read TLS handshake length
|
||||||
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
|
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
|
||||||
|
|
||||||
@@ -117,17 +140,22 @@ impl ClientHandler {
|
|||||||
if tls_len < 512 {
|
if tls_len < 512 {
|
||||||
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
|
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
|
||||||
self.stats.increment_connects_bad();
|
self.stats.increment_connects_bad();
|
||||||
handle_bad_client(stream, &first_bytes, &self.config).await;
|
handle_bad_client(self.stream, &first_bytes, &self.config).await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read full TLS handshake
|
// Read full TLS handshake
|
||||||
let mut handshake = vec![0u8; 5 + tls_len];
|
let mut handshake = vec![0u8; 5 + tls_len];
|
||||||
handshake[..5].copy_from_slice(&first_bytes);
|
handshake[..5].copy_from_slice(&first_bytes);
|
||||||
stream.read_exact(&mut handshake[5..]).await?;
|
self.stream.read_exact(&mut handshake[5..]).await?;
|
||||||
|
|
||||||
|
// Extract fields before consuming self.stream
|
||||||
|
let config = self.config.clone();
|
||||||
|
let replay_checker = self.replay_checker.clone();
|
||||||
|
let stats = self.stats.clone();
|
||||||
|
|
||||||
// Split stream for reading/writing
|
// Split stream for reading/writing
|
||||||
let (read_half, write_half) = stream.into_split();
|
let (read_half, write_half) = self.stream.into_split();
|
||||||
|
|
||||||
// Handle TLS handshake
|
// Handle TLS handshake
|
||||||
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
|
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
|
||||||
@@ -135,12 +163,12 @@ impl ClientHandler {
|
|||||||
read_half,
|
read_half,
|
||||||
write_half,
|
write_half,
|
||||||
peer,
|
peer,
|
||||||
&self.config,
|
&config,
|
||||||
&self.replay_checker,
|
&replay_checker,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient => {
|
HandshakeResult::BadClient => {
|
||||||
self.stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
@@ -158,44 +186,62 @@ impl ClientHandler {
|
|||||||
tls_reader,
|
tls_reader,
|
||||||
tls_writer,
|
tls_writer,
|
||||||
peer,
|
peer,
|
||||||
&self.config,
|
&config,
|
||||||
&self.replay_checker,
|
&replay_checker,
|
||||||
true,
|
true,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient => {
|
HandshakeResult::BadClient => {
|
||||||
self.stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Handle authenticated client
|
// Handle authenticated client
|
||||||
self.handle_authenticated_inner(crypto_reader, crypto_writer, success).await
|
// We can't use self.handle_authenticated_inner because self is partially moved
|
||||||
|
// So we call it as an associated function or method on a new struct,
|
||||||
|
// or just inline the logic / use a static method.
|
||||||
|
// Since handle_authenticated_inner needs self.upstream_manager and self.stats,
|
||||||
|
// we should pass them explicitly.
|
||||||
|
|
||||||
|
Self::handle_authenticated_static(
|
||||||
|
crypto_reader,
|
||||||
|
crypto_writer,
|
||||||
|
success,
|
||||||
|
self.upstream_manager,
|
||||||
|
self.stats,
|
||||||
|
self.config
|
||||||
|
).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle direct (non-TLS) client
|
/// Handle direct (non-TLS) client
|
||||||
async fn handle_direct_client(
|
async fn handle_direct_client(
|
||||||
&self,
|
mut self,
|
||||||
mut stream: TcpStream,
|
|
||||||
peer: SocketAddr,
|
|
||||||
first_bytes: [u8; 5],
|
first_bytes: [u8; 5],
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let peer = self.peer;
|
||||||
|
|
||||||
// Check if non-TLS modes are enabled
|
// Check if non-TLS modes are enabled
|
||||||
if !self.config.modes.classic && !self.config.modes.secure {
|
if !self.config.modes.classic && !self.config.modes.secure {
|
||||||
debug!(peer = %peer, "Non-TLS modes disabled");
|
debug!(peer = %peer, "Non-TLS modes disabled");
|
||||||
self.stats.increment_connects_bad();
|
self.stats.increment_connects_bad();
|
||||||
handle_bad_client(stream, &first_bytes, &self.config).await;
|
handle_bad_client(self.stream, &first_bytes, &self.config).await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read rest of handshake
|
// Read rest of handshake
|
||||||
let mut handshake = [0u8; HANDSHAKE_LEN];
|
let mut handshake = [0u8; HANDSHAKE_LEN];
|
||||||
handshake[..5].copy_from_slice(&first_bytes);
|
handshake[..5].copy_from_slice(&first_bytes);
|
||||||
stream.read_exact(&mut handshake[5..]).await?;
|
self.stream.read_exact(&mut handshake[5..]).await?;
|
||||||
|
|
||||||
|
// Extract fields
|
||||||
|
let config = self.config.clone();
|
||||||
|
let replay_checker = self.replay_checker.clone();
|
||||||
|
let stats = self.stats.clone();
|
||||||
|
|
||||||
// Split stream
|
// Split stream
|
||||||
let (read_half, write_half) = stream.into_split();
|
let (read_half, write_half) = self.stream.into_split();
|
||||||
|
|
||||||
// Handle MTProto handshake
|
// Handle MTProto handshake
|
||||||
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
|
||||||
@@ -203,27 +249,36 @@ impl ClientHandler {
|
|||||||
read_half,
|
read_half,
|
||||||
write_half,
|
write_half,
|
||||||
peer,
|
peer,
|
||||||
&self.config,
|
&config,
|
||||||
&self.replay_checker,
|
&replay_checker,
|
||||||
false,
|
false,
|
||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient => {
|
HandshakeResult::BadClient => {
|
||||||
self.stats.increment_connects_bad();
|
stats.increment_connects_bad();
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
HandshakeResult::Error(e) => return Err(e),
|
HandshakeResult::Error(e) => return Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.handle_authenticated_inner(crypto_reader, crypto_writer, success).await
|
Self::handle_authenticated_static(
|
||||||
|
crypto_reader,
|
||||||
|
crypto_writer,
|
||||||
|
success,
|
||||||
|
self.upstream_manager,
|
||||||
|
self.stats,
|
||||||
|
self.config
|
||||||
|
).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle authenticated client - connect to Telegram and relay
|
/// Static version of handle_authenticated_inner to avoid ownership issues
|
||||||
async fn handle_authenticated_inner<R, W>(
|
async fn handle_authenticated_static<R, W>(
|
||||||
&self,
|
|
||||||
client_reader: CryptoReader<R>,
|
client_reader: CryptoReader<R>,
|
||||||
client_writer: CryptoWriter<W>,
|
client_writer: CryptoWriter<W>,
|
||||||
success: HandshakeSuccess,
|
success: HandshakeSuccess,
|
||||||
|
upstream_manager: Arc<UpstreamManager>,
|
||||||
|
stats: Arc<Stats>,
|
||||||
|
config: Arc<ProxyConfig>,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
@@ -232,13 +287,13 @@ impl ClientHandler {
|
|||||||
let user = &success.user;
|
let user = &success.user;
|
||||||
|
|
||||||
// Check user limits
|
// Check user limits
|
||||||
if let Err(e) = self.check_user_limits(user) {
|
if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
|
||||||
warn!(user = %user, error = %e, "User limit exceeded");
|
warn!(user = %user, error = %e, "User limit exceeded");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get datacenter address
|
// Get datacenter address
|
||||||
let dc_addr = self.get_dc_addr(success.dc_idx)?;
|
let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
user = %user,
|
user = %user,
|
||||||
@@ -246,39 +301,40 @@ impl ClientHandler {
|
|||||||
dc = success.dc_idx,
|
dc = success.dc_idx,
|
||||||
dc_addr = %dc_addr,
|
dc_addr = %dc_addr,
|
||||||
proto = ?success.proto_tag,
|
proto = ?success.proto_tag,
|
||||||
fast_mode = self.config.fast_mode,
|
fast_mode = config.fast_mode,
|
||||||
"Connecting to Telegram"
|
"Connecting to Telegram"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Connect to Telegram
|
// Connect to Telegram via UpstreamManager
|
||||||
let tg_stream = self.pool.get(dc_addr).await?;
|
let tg_stream = upstream_manager.connect(dc_addr).await?;
|
||||||
|
|
||||||
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected to Telegram, performing handshake");
|
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected to Telegram, performing handshake");
|
||||||
|
|
||||||
// Perform Telegram handshake and get crypto streams
|
// Perform Telegram handshake and get crypto streams
|
||||||
let (tg_reader, tg_writer) = self.do_tg_handshake(
|
let (tg_reader, tg_writer) = Self::do_tg_handshake_static(
|
||||||
tg_stream,
|
tg_stream,
|
||||||
&success,
|
&success,
|
||||||
|
&config,
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
debug!(peer = %success.peer, "Telegram handshake complete, starting relay");
|
debug!(peer = %success.peer, "Telegram handshake complete, starting relay");
|
||||||
|
|
||||||
// Update stats
|
// Update stats
|
||||||
self.stats.increment_user_connects(user);
|
stats.increment_user_connects(user);
|
||||||
self.stats.increment_user_curr_connects(user);
|
stats.increment_user_curr_connects(user);
|
||||||
|
|
||||||
// Relay traffic - передаём Arc::clone(&self.stats)
|
// Relay traffic
|
||||||
let relay_result = relay_bidirectional(
|
let relay_result = relay_bidirectional(
|
||||||
client_reader,
|
client_reader,
|
||||||
client_writer,
|
client_writer,
|
||||||
tg_reader,
|
tg_reader,
|
||||||
tg_writer,
|
tg_writer,
|
||||||
user,
|
user,
|
||||||
Arc::clone(&self.stats),
|
Arc::clone(&stats),
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
// Update stats
|
// Update stats
|
||||||
self.stats.decrement_user_curr_connects(user);
|
stats.decrement_user_curr_connects(user);
|
||||||
|
|
||||||
match &relay_result {
|
match &relay_result {
|
||||||
Ok(()) => debug!(user = %user, peer = %success.peer, "Relay completed normally"),
|
Ok(()) => debug!(user = %user, peer = %success.peer, "Relay completed normally"),
|
||||||
@@ -288,26 +344,26 @@ impl ClientHandler {
|
|||||||
relay_result
|
relay_result
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check user limits (expiration, connection count, data quota)
|
/// Check user limits (static version)
|
||||||
fn check_user_limits(&self, user: &str) -> Result<()> {
|
fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> {
|
||||||
// Check expiration
|
// Check expiration
|
||||||
if let Some(expiration) = self.config.user_expirations.get(user) {
|
if let Some(expiration) = config.user_expirations.get(user) {
|
||||||
if chrono::Utc::now() > *expiration {
|
if chrono::Utc::now() > *expiration {
|
||||||
return Err(ProxyError::UserExpired { user: user.to_string() });
|
return Err(ProxyError::UserExpired { user: user.to_string() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check connection limit
|
// Check connection limit
|
||||||
if let Some(limit) = self.config.user_max_tcp_conns.get(user) {
|
if let Some(limit) = config.user_max_tcp_conns.get(user) {
|
||||||
let current = self.stats.get_user_curr_connects(user);
|
let current = stats.get_user_curr_connects(user);
|
||||||
if current >= *limit as u64 {
|
if current >= *limit as u64 {
|
||||||
return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() });
|
return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check data quota
|
// Check data quota
|
||||||
if let Some(quota) = self.config.user_data_quota.get(user) {
|
if let Some(quota) = config.user_data_quota.get(user) {
|
||||||
let used = self.stats.get_user_total_octets(user);
|
let used = stats.get_user_total_octets(user);
|
||||||
if used >= *quota {
|
if used >= *quota {
|
||||||
return Err(ProxyError::DataQuotaExceeded { user: user.to_string() });
|
return Err(ProxyError::DataQuotaExceeded { user: user.to_string() });
|
||||||
}
|
}
|
||||||
@@ -316,11 +372,11 @@ impl ClientHandler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get datacenter address by index
|
/// Get datacenter address by index (static version)
|
||||||
fn get_dc_addr(&self, dc_idx: i16) -> Result<SocketAddr> {
|
fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
|
||||||
let idx = (dc_idx.abs() - 1) as usize;
|
let idx = (dc_idx.abs() - 1) as usize;
|
||||||
|
|
||||||
let datacenters = if self.config.prefer_ipv6 {
|
let datacenters = if config.prefer_ipv6 {
|
||||||
&*TG_DATACENTERS_V6
|
&*TG_DATACENTERS_V6
|
||||||
} else {
|
} else {
|
||||||
&*TG_DATACENTERS_V4
|
&*TG_DATACENTERS_V4
|
||||||
@@ -333,19 +389,18 @@ impl ClientHandler {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform handshake with Telegram server
|
/// Perform handshake with Telegram server (static version)
|
||||||
/// Returns crypto reader and writer for TG connection
|
async fn do_tg_handshake_static(
|
||||||
async fn do_tg_handshake(
|
|
||||||
&self,
|
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
success: &HandshakeSuccess,
|
success: &HandshakeSuccess,
|
||||||
|
config: &ProxyConfig,
|
||||||
) -> Result<(CryptoReader<tokio::net::tcp::OwnedReadHalf>, CryptoWriter<tokio::net::tcp::OwnedWriteHalf>)> {
|
) -> Result<(CryptoReader<tokio::net::tcp::OwnedReadHalf>, CryptoWriter<tokio::net::tcp::OwnedWriteHalf>)> {
|
||||||
// Generate nonce with keys for TG
|
// Generate nonce with keys for TG
|
||||||
let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce(
|
let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce(
|
||||||
success.proto_tag,
|
success.proto_tag,
|
||||||
&success.dec_key, // Client's dec key
|
&success.dec_key, // Client's dec key
|
||||||
success.dec_iv,
|
success.dec_iv,
|
||||||
self.config.fast_mode,
|
config.fast_mode,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Encrypt nonce
|
// Encrypt nonce
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ const MASK_BUFFER_SIZE: usize = 8192;
|
|||||||
|
|
||||||
/// Handle a bad client by forwarding to mask host
|
/// Handle a bad client by forwarding to mask host
|
||||||
pub async fn handle_bad_client(
|
pub async fn handle_bad_client(
|
||||||
mut client: TcpStream,
|
client: TcpStream,
|
||||||
initial_data: &[u8],
|
initial_data: &[u8],
|
||||||
config: &ProxyConfig,
|
config: &ProxyConfig,
|
||||||
) {
|
) {
|
||||||
|
|||||||
@@ -149,9 +149,9 @@ pub trait FrameCodec: Send + Sync {
|
|||||||
/// Create a frame codec for the given protocol tag
|
/// Create a frame codec for the given protocol tag
|
||||||
pub fn create_codec(proto_tag: ProtoTag) -> Box<dyn FrameCodec> {
|
pub fn create_codec(proto_tag: ProtoTag) -> Box<dyn FrameCodec> {
|
||||||
match proto_tag {
|
match proto_tag {
|
||||||
ProtoTag::Abridged => Box::new(super::frame_codec::AbridgedCodec::new()),
|
ProtoTag::Abridged => Box::new(crate::stream::frame_codec::AbridgedCodec::new()),
|
||||||
ProtoTag::Intermediate => Box::new(super::frame_codec::IntermediateCodec::new()),
|
ProtoTag::Intermediate => Box::new(crate::stream::frame_codec::IntermediateCodec::new()),
|
||||||
ProtoTag::Secure => Box::new(super::frame_codec::SecureCodec::new()),
|
ProtoTag::Secure => Box::new(crate::stream::frame_codec::SecureCodec::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,11 @@
|
|||||||
pub mod pool;
|
pub mod pool;
|
||||||
pub mod proxy_protocol;
|
pub mod proxy_protocol;
|
||||||
pub mod socket;
|
pub mod socket;
|
||||||
|
pub mod socks;
|
||||||
|
pub mod upstream;
|
||||||
|
|
||||||
pub use pool::ConnectionPool;
|
pub use pool::ConnectionPool;
|
||||||
pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol};
|
pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol};
|
||||||
pub use socket::*;
|
pub use socket::*;
|
||||||
|
pub use socks::*;
|
||||||
|
pub use upstream::UpstreamManager;
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
//! TCP Socket Configuration
|
//! TCP Socket Configuration
|
||||||
|
|
||||||
use std::io::Result;
|
use std::io::Result;
|
||||||
use std::net::SocketAddr;
|
use std::net::{SocketAddr, IpAddr};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
|
use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
|
||||||
@@ -93,6 +93,11 @@ pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
|
|||||||
|
|
||||||
/// Create a new TCP socket for outgoing connections
|
/// Create a new TCP socket for outgoing connections
|
||||||
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
|
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
|
||||||
|
create_outgoing_socket_bound(addr, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new TCP socket for outgoing connections, optionally bound to a specific interface
|
||||||
|
pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>) -> Result<Socket> {
|
||||||
let domain = if addr.is_ipv4() {
|
let domain = if addr.is_ipv4() {
|
||||||
Domain::IPV4
|
Domain::IPV4
|
||||||
} else {
|
} else {
|
||||||
@@ -107,9 +112,16 @@ pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
|
|||||||
// Disable Nagle
|
// Disable Nagle
|
||||||
socket.set_nodelay(true)?;
|
socket.set_nodelay(true)?;
|
||||||
|
|
||||||
|
if let Some(bind_ip) = bind_addr {
|
||||||
|
let bind_sock_addr = SocketAddr::new(bind_ip, 0);
|
||||||
|
socket.bind(&bind_sock_addr.into())?;
|
||||||
|
debug!("Bound outgoing socket to {}", bind_ip);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(socket)
|
Ok(socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Get local address of a socket
|
/// Get local address of a socket
|
||||||
pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
|
pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
|
||||||
stream.local_addr().ok()
|
stream.local_addr().ok()
|
||||||
|
|||||||
145
src/transport/socks.rs
Normal file
145
src/transport/socks.rs
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
//! SOCKS4/5 Client Implementation
|
||||||
|
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use crate::error::{ProxyError, Result};
|
||||||
|
|
||||||
|
pub async fn connect_socks4(
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
target: SocketAddr,
|
||||||
|
user_id: Option<&str>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let ip = match target.ip() {
|
||||||
|
IpAddr::V4(ip) => ip,
|
||||||
|
IpAddr::V6(_) => return Err(ProxyError::Proxy("SOCKS4 does not support IPv6".to_string())),
|
||||||
|
};
|
||||||
|
|
||||||
|
let port = target.port();
|
||||||
|
let user = user_id.unwrap_or("").as_bytes();
|
||||||
|
|
||||||
|
// VN (4) | CD (1) | DSTPORT (2) | DSTIP (4) | USERID (variable) | NULL (1)
|
||||||
|
let mut buf = Vec::with_capacity(9 + user.len());
|
||||||
|
buf.push(4); // VN
|
||||||
|
buf.push(1); // CD (CONNECT)
|
||||||
|
buf.extend_from_slice(&port.to_be_bytes());
|
||||||
|
buf.extend_from_slice(&ip.octets());
|
||||||
|
buf.extend_from_slice(user);
|
||||||
|
buf.push(0); // NULL
|
||||||
|
|
||||||
|
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
// Response: VN (1) | CD (1) | DSTPORT (2) | DSTIP (4)
|
||||||
|
let mut resp = [0u8; 8];
|
||||||
|
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
if resp[1] != 90 {
|
||||||
|
return Err(ProxyError::Proxy(format!("SOCKS4 request rejected: code {}", resp[1])));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect_socks5(
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
target: SocketAddr,
|
||||||
|
username: Option<&str>,
|
||||||
|
password: Option<&str>,
|
||||||
|
) -> Result<()> {
|
||||||
|
// 1. Auth negotiation
|
||||||
|
// VER (1) | NMETHODS (1) | METHODS (variable)
|
||||||
|
let mut methods = vec![0u8]; // No auth
|
||||||
|
if username.is_some() {
|
||||||
|
methods.push(2u8); // Username/Password
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buf = vec![5u8, methods.len() as u8];
|
||||||
|
buf.extend_from_slice(&methods);
|
||||||
|
|
||||||
|
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
let mut resp = [0u8; 2];
|
||||||
|
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
if resp[0] != 5 {
|
||||||
|
return Err(ProxyError::Proxy("Invalid SOCKS5 version".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
match resp[1] {
|
||||||
|
0 => {}, // No auth
|
||||||
|
2 => {
|
||||||
|
// Username/Password auth
|
||||||
|
if let (Some(u), Some(p)) = (username, password) {
|
||||||
|
let u_bytes = u.as_bytes();
|
||||||
|
let p_bytes = p.as_bytes();
|
||||||
|
|
||||||
|
let mut auth_buf = Vec::with_capacity(3 + u_bytes.len() + p_bytes.len());
|
||||||
|
auth_buf.push(1); // VER
|
||||||
|
auth_buf.push(u_bytes.len() as u8);
|
||||||
|
auth_buf.extend_from_slice(u_bytes);
|
||||||
|
auth_buf.push(p_bytes.len() as u8);
|
||||||
|
auth_buf.extend_from_slice(p_bytes);
|
||||||
|
|
||||||
|
stream.write_all(&auth_buf).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
let mut auth_resp = [0u8; 2];
|
||||||
|
stream.read_exact(&mut auth_resp).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
if auth_resp[1] != 0 {
|
||||||
|
return Err(ProxyError::Proxy("SOCKS5 authentication failed".to_string()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(ProxyError::Proxy("SOCKS5 server requires authentication".to_string()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => return Err(ProxyError::Proxy("Unsupported SOCKS5 auth method".to_string())),
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Connection request
|
||||||
|
// VER (1) | CMD (1) | RSV (1) | ATYP (1) | DST.ADDR (variable) | DST.PORT (2)
|
||||||
|
let mut req = vec![5u8, 1u8, 0u8]; // CONNECT
|
||||||
|
|
||||||
|
match target {
|
||||||
|
SocketAddr::V4(v4) => {
|
||||||
|
req.push(1u8); // IPv4
|
||||||
|
req.extend_from_slice(&v4.ip().octets());
|
||||||
|
},
|
||||||
|
SocketAddr::V6(v6) => {
|
||||||
|
req.push(4u8); // IPv6
|
||||||
|
req.extend_from_slice(&v6.ip().octets());
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
req.extend_from_slice(&target.port().to_be_bytes());
|
||||||
|
|
||||||
|
stream.write_all(&req).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
// Response
|
||||||
|
let mut head = [0u8; 4];
|
||||||
|
stream.read_exact(&mut head).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
|
||||||
|
if head[1] != 0 {
|
||||||
|
return Err(ProxyError::Proxy(format!("SOCKS5 request failed: code {}", head[1])));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip address part of response
|
||||||
|
match head[3] {
|
||||||
|
1 => { // IPv4
|
||||||
|
let mut addr = [0u8; 4 + 2];
|
||||||
|
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
},
|
||||||
|
3 => { // Domain
|
||||||
|
let mut len = [0u8; 1];
|
||||||
|
stream.read_exact(&mut len).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
let mut addr = vec![0u8; len[0] as usize + 2];
|
||||||
|
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
},
|
||||||
|
4 => { // IPv6
|
||||||
|
let mut addr = [0u8; 16 + 2];
|
||||||
|
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
|
||||||
|
},
|
||||||
|
_ => return Err(ProxyError::Proxy("Invalid address type in SOCKS5 response".to_string())),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
259
src/transport/upstream.rs
Normal file
259
src/transport/upstream.rs
Normal file
@@ -0,0 +1,259 @@
|
|||||||
|
//! Upstream Management
|
||||||
|
|
||||||
|
use std::net::{SocketAddr, IpAddr};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
|
use rand::Rng;
|
||||||
|
use tracing::{debug, warn, error, info};
|
||||||
|
|
||||||
|
use crate::config::{UpstreamConfig, UpstreamType};
|
||||||
|
use crate::error::{Result, ProxyError};
|
||||||
|
use crate::transport::socket::create_outgoing_socket_bound;
|
||||||
|
use crate::transport::socks::{connect_socks4, connect_socks5};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct UpstreamState {
|
||||||
|
config: UpstreamConfig,
|
||||||
|
healthy: bool,
|
||||||
|
fails: u32,
|
||||||
|
last_check: std::time::Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct UpstreamManager {
|
||||||
|
upstreams: Arc<RwLock<Vec<UpstreamState>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpstreamManager {
|
||||||
|
pub fn new(configs: Vec<UpstreamConfig>) -> Self {
|
||||||
|
let states = configs.into_iter()
|
||||||
|
.filter(|c| c.enabled)
|
||||||
|
.map(|c| UpstreamState {
|
||||||
|
config: c,
|
||||||
|
healthy: true, // Optimistic start
|
||||||
|
fails: 0,
|
||||||
|
last_check: std::time::Instant::now(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
upstreams: Arc::new(RwLock::new(states)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Select an upstream using Weighted Round Robin (simplified)
|
||||||
|
async fn select_upstream(&self) -> Option<usize> {
|
||||||
|
let upstreams = self.upstreams.read().await;
|
||||||
|
if upstreams.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let healthy_indices: Vec<usize> = upstreams.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, u)| u.healthy)
|
||||||
|
.map(|(i, _)| i)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if healthy_indices.is_empty() {
|
||||||
|
// If all unhealthy, try any random one
|
||||||
|
return Some(rand::thread_rng().gen_range(0..upstreams.len()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Weighted selection
|
||||||
|
let total_weight: u32 = healthy_indices.iter()
|
||||||
|
.map(|&i| upstreams[i].config.weight as u32)
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
if total_weight == 0 {
|
||||||
|
return Some(healthy_indices[rand::thread_rng().gen_range(0..healthy_indices.len())]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut choice = rand::thread_rng().gen_range(0..total_weight);
|
||||||
|
|
||||||
|
for &idx in &healthy_indices {
|
||||||
|
let weight = upstreams[idx].config.weight as u32;
|
||||||
|
if choice < weight {
|
||||||
|
return Some(idx);
|
||||||
|
}
|
||||||
|
choice -= weight;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(healthy_indices[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect(&self, target: SocketAddr) -> Result<TcpStream> {
|
||||||
|
let idx = self.select_upstream().await
|
||||||
|
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
|
||||||
|
|
||||||
|
let upstream = {
|
||||||
|
let guard = self.upstreams.read().await;
|
||||||
|
guard[idx].config.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
match self.connect_via_upstream(&upstream, target).await {
|
||||||
|
Ok(stream) => {
|
||||||
|
// Mark success
|
||||||
|
let mut guard = self.upstreams.write().await;
|
||||||
|
if let Some(u) = guard.get_mut(idx) {
|
||||||
|
if !u.healthy {
|
||||||
|
debug!("Upstream recovered: {:?}", u.config);
|
||||||
|
}
|
||||||
|
u.healthy = true;
|
||||||
|
u.fails = 0;
|
||||||
|
}
|
||||||
|
Ok(stream)
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// Mark failure
|
||||||
|
let mut guard = self.upstreams.write().await;
|
||||||
|
if let Some(u) = guard.get_mut(idx) {
|
||||||
|
u.fails += 1;
|
||||||
|
warn!("Failed to connect via upstream {:?}: {}. Fails: {}", u.config, e, u.fails);
|
||||||
|
if u.fails > 3 {
|
||||||
|
u.healthy = false;
|
||||||
|
warn!("Upstream disabled due to failures: {:?}", u.config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect_via_upstream(&self, config: &UpstreamConfig, target: SocketAddr) -> Result<TcpStream> {
|
||||||
|
match &config.upstream_type {
|
||||||
|
UpstreamType::Direct { interface } => {
|
||||||
|
let bind_ip = interface.as_ref()
|
||||||
|
.and_then(|s| s.parse::<IpAddr>().ok());
|
||||||
|
|
||||||
|
let socket = create_outgoing_socket_bound(target, bind_ip)?;
|
||||||
|
|
||||||
|
// Non-blocking connect logic
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
match socket.connect(&target.into()) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
|
||||||
|
Err(err) => return Err(ProxyError::Io(err)),
|
||||||
|
}
|
||||||
|
|
||||||
|
let std_stream: std::net::TcpStream = socket.into();
|
||||||
|
let stream = TcpStream::from_std(std_stream)?;
|
||||||
|
|
||||||
|
// Wait for connection to complete
|
||||||
|
stream.writable().await?;
|
||||||
|
if let Some(e) = stream.take_error()? {
|
||||||
|
return Err(ProxyError::Io(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(stream)
|
||||||
|
},
|
||||||
|
UpstreamType::Socks4 { address, interface, user_id } => {
|
||||||
|
info!("Connecting to target {} via SOCKS4 proxy {}", target, address);
|
||||||
|
|
||||||
|
let proxy_addr: SocketAddr = address.parse()
|
||||||
|
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
|
||||||
|
|
||||||
|
let bind_ip = interface.as_ref()
|
||||||
|
.and_then(|s| s.parse::<IpAddr>().ok());
|
||||||
|
|
||||||
|
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
||||||
|
|
||||||
|
// Non-blocking connect logic
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
match socket.connect(&proxy_addr.into()) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
|
||||||
|
Err(err) => return Err(ProxyError::Io(err)),
|
||||||
|
}
|
||||||
|
|
||||||
|
let std_stream: std::net::TcpStream = socket.into();
|
||||||
|
let mut stream = TcpStream::from_std(std_stream)?;
|
||||||
|
|
||||||
|
// Wait for connection to complete
|
||||||
|
stream.writable().await?;
|
||||||
|
if let Some(e) = stream.take_error()? {
|
||||||
|
return Err(ProxyError::Io(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
connect_socks4(&mut stream, target, user_id.as_deref()).await?;
|
||||||
|
Ok(stream)
|
||||||
|
},
|
||||||
|
UpstreamType::Socks5 { address, interface, username, password } => {
|
||||||
|
info!("Connecting to target {} via SOCKS5 proxy {}", target, address);
|
||||||
|
|
||||||
|
let proxy_addr: SocketAddr = address.parse()
|
||||||
|
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
|
||||||
|
|
||||||
|
let bind_ip = interface.as_ref()
|
||||||
|
.and_then(|s| s.parse::<IpAddr>().ok());
|
||||||
|
|
||||||
|
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
|
||||||
|
|
||||||
|
// Non-blocking connect logic
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
match socket.connect(&proxy_addr.into()) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
|
||||||
|
Err(err) => return Err(ProxyError::Io(err)),
|
||||||
|
}
|
||||||
|
|
||||||
|
let std_stream: std::net::TcpStream = socket.into();
|
||||||
|
let mut stream = TcpStream::from_std(std_stream)?;
|
||||||
|
|
||||||
|
// Wait for connection to complete
|
||||||
|
stream.writable().await?;
|
||||||
|
if let Some(e) = stream.take_error()? {
|
||||||
|
return Err(ProxyError::Io(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?;
|
||||||
|
Ok(stream)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Background task to check health
|
||||||
|
pub async fn run_health_checks(&self) {
|
||||||
|
// Simple TCP connect check to a known stable DC (e.g. 149.154.167.50:443 - DC2)
|
||||||
|
let check_target: SocketAddr = "149.154.167.50:443".parse().unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||||
|
|
||||||
|
let count = self.upstreams.read().await.len();
|
||||||
|
for i in 0..count {
|
||||||
|
let config = {
|
||||||
|
let guard = self.upstreams.read().await;
|
||||||
|
guard[i].config.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = tokio::time::timeout(
|
||||||
|
Duration::from_secs(10),
|
||||||
|
self.connect_via_upstream(&config, check_target)
|
||||||
|
).await;
|
||||||
|
|
||||||
|
let mut guard = self.upstreams.write().await;
|
||||||
|
let u = &mut guard[i];
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(Ok(_stream)) => {
|
||||||
|
if !u.healthy {
|
||||||
|
debug!("Upstream recovered: {:?}", u.config);
|
||||||
|
}
|
||||||
|
u.healthy = true;
|
||||||
|
u.fails = 0;
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!("Health check failed for {:?}: {}", u.config, e);
|
||||||
|
// Don't mark unhealthy immediately in background check
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!("Health check timeout for {:?}", u.config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
u.last_check = std::time::Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
//! IP Addr Detect
|
//! IP Addr Detect
|
||||||
|
|
||||||
use std::net::IpAddr;
|
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
@@ -40,28 +40,74 @@ const IPV6_URLS: &[&str] = &[
|
|||||||
"http://api6.ipify.org/",
|
"http://api6.ipify.org/",
|
||||||
];
|
];
|
||||||
|
|
||||||
|
/// Detect local IP address by connecting to a public DNS
|
||||||
|
/// This does not actually send any packets
|
||||||
|
fn get_local_ip(target: &str) -> Option<IpAddr> {
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").ok()?;
|
||||||
|
socket.connect(target).ok()?;
|
||||||
|
socket.local_addr().ok().map(|addr| addr.ip())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_local_ipv6(target: &str) -> Option<IpAddr> {
|
||||||
|
let socket = UdpSocket::bind("[::]:0").ok()?;
|
||||||
|
socket.connect(target).ok()?;
|
||||||
|
socket.local_addr().ok().map(|addr| addr.ip())
|
||||||
|
}
|
||||||
|
|
||||||
/// Detect public IP addresses
|
/// Detect public IP addresses
|
||||||
pub async fn detect_ip() -> IpInfo {
|
pub async fn detect_ip() -> IpInfo {
|
||||||
let mut info = IpInfo::default();
|
let mut info = IpInfo::default();
|
||||||
|
|
||||||
// Detect IPv4
|
// Try to get local interface IP first (default gateway interface)
|
||||||
for url in IPV4_URLS {
|
// We connect to Google DNS to find out which interface is used for routing
|
||||||
if let Some(ip) = fetch_ip(url).await {
|
if let Some(ip) = get_local_ip("8.8.8.8:80") {
|
||||||
if ip.is_ipv4() {
|
if ip.is_ipv4() && !ip.is_loopback() {
|
||||||
info.ipv4 = Some(ip);
|
info.ipv4 = Some(ip);
|
||||||
debug!(ip = %ip, "Detected IPv4 address");
|
debug!(ip = %ip, "Detected local IPv4 address via routing");
|
||||||
break;
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80") {
|
||||||
|
if ip.is_ipv6() && !ip.is_loopback() {
|
||||||
|
info.ipv6 = Some(ip);
|
||||||
|
debug!(ip = %ip, "Detected local IPv6 address via routing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If local detection failed or returned private IP (and we want public),
|
||||||
|
// or just as a fallback/verification, we might want to check external services.
|
||||||
|
// However, the requirement is: "if IP for listening is not set... it should be IP from interface...
|
||||||
|
// if impossible - request external resources".
|
||||||
|
|
||||||
|
// So if we found a local IP, we might be good. But often servers are behind NAT.
|
||||||
|
// If the local IP is private, we probably want the public IP for the tg:// link.
|
||||||
|
// Let's check if the detected IPs are private.
|
||||||
|
|
||||||
|
let need_external_v4 = info.ipv4.map_or(true, |ip| is_private_ip(ip));
|
||||||
|
let need_external_v6 = info.ipv6.map_or(true, |ip| is_private_ip(ip));
|
||||||
|
|
||||||
|
if need_external_v4 {
|
||||||
|
debug!("Local IPv4 is private or missing, checking external services...");
|
||||||
|
for url in IPV4_URLS {
|
||||||
|
if let Some(ip) = fetch_ip(url).await {
|
||||||
|
if ip.is_ipv4() {
|
||||||
|
info.ipv4 = Some(ip);
|
||||||
|
debug!(ip = %ip, "Detected public IPv4 address");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Detect IPv6
|
if need_external_v6 {
|
||||||
for url in IPV6_URLS {
|
debug!("Local IPv6 is private or missing, checking external services...");
|
||||||
if let Some(ip) = fetch_ip(url).await {
|
for url in IPV6_URLS {
|
||||||
if ip.is_ipv6() {
|
if let Some(ip) = fetch_ip(url).await {
|
||||||
info.ipv6 = Some(ip);
|
if ip.is_ipv6() {
|
||||||
debug!(ip = %ip, "Detected IPv6 address");
|
info.ipv6 = Some(ip);
|
||||||
break;
|
debug!(ip = %ip, "Detected public IPv6 address");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -73,6 +119,17 @@ pub async fn detect_ip() -> IpInfo {
|
|||||||
info
|
info
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_private_ip(ip: IpAddr) -> bool {
|
||||||
|
match ip {
|
||||||
|
IpAddr::V4(ipv4) => {
|
||||||
|
ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local()
|
||||||
|
}
|
||||||
|
IpAddr::V6(ipv6) => {
|
||||||
|
ipv6.is_loopback() || (ipv6.segments()[0] & 0xfe00) == 0xfc00 // Unique Local
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Fetch IP from URL
|
/// Fetch IP from URL
|
||||||
async fn fetch_ip(url: &str) -> Option<IpAddr> {
|
async fn fetch_ip(url: &str) -> Option<IpAddr> {
|
||||||
let client = reqwest::Client::builder()
|
let client = reqwest::Client::builder()
|
||||||
|
|||||||
Reference in New Issue
Block a user