15 Commits

Author SHA1 Message Date
Alexey
be1b2db867 Update README.md 2026-01-08 02:10:34 +03:00
Alexey
8fbee8701b Update README.md 2026-01-08 02:10:02 +03:00
Alexey
952d160870 Update README.md 2026-01-08 02:03:30 +03:00
Alexey
91ae6becde Update README.md 2026-01-08 02:01:50 +03:00
Alexey
e1f576e4fe Update README.md 2026-01-08 02:00:27 +03:00
Alexey
a7556cabdc Update README.md 2026-01-07 19:12:16 +03:00
Alexey
b2e8d16bb1 Update README.md 2026-01-07 19:10:04 +03:00
Alexey
d95e762812 Update README.md 2026-01-07 19:07:08 +03:00
Alexey
384f927fc3 Update README.md 2026-01-07 19:06:28 +03:00
Alexey
1b7c09ae18 Update README.md 2026-01-07 18:54:44 +03:00
Alexey
85cb4092d5 1.0.2.0 2026-01-07 18:16:01 +03:00
Alexey
5016160ac3 1.0.1.2 2026-01-07 17:42:30 +03:00
Alexey
4f007f3128 1.0.1.1
Drafting Upstreams and SOCKS
2026-01-07 17:22:10 +03:00
Alexey
7746a1177c Update README.md 2026-01-06 15:10:14 +03:00
Alexey
2bb2a2983f Update README.md 2026-01-06 14:57:52 +03:00
12 changed files with 1008 additions and 287 deletions

171
README.md
View File

@@ -5,13 +5,21 @@
# GOTO # GOTO
- [Features](#features) - [Features](#features)
- [Quick Start Guide](#quick-start-guide) - [Quick Start Guide](#quick-start-guide)
- [Build](#build)
- [How to use?](#how-to-use) - [How to use?](#how-to-use)
- [Systemd Method](#telemt-via-systemd) - [Systemd Method](#telemt-via-systemd)
- [Configuration](#configuration)
- [Minimal Configuration](#minimal-configuration-for-first-start)
- [Advanced](#advanced)
- [Adtag](#adtag)
- [Listening and Announce IPs](#listening-and-announce-ips)
- [Upstream Manager](#upstream-manager)
- [IP](#bind-on-ip)
- [SOCKS](#socks45-as-upstream)
- [FAQ](#faq) - [FAQ](#faq)
- [Telegram Calls](#telegram-calls-via-mtproxy) - [Telegram Calls](#telegram-calls-via-mtproxy)
- [DPI](#how-does-dpi-see-mtproxy-tls) - [DPI](#how-does-dpi-see-mtproxy-tls)
- [Whitelist on Network Level](#whitelist-on-ip) - [Whitelist on Network Level](#whitelist-on-ip)
- [Build](#build)
- [Why Rust?](#why-rust) - [Why Rust?](#why-rust)
## Features ## Features
@@ -27,25 +35,27 @@
- Extensive logging via `trace` and `debug` with `RUST_LOG` method - Extensive logging via `trace` and `debug` with `RUST_LOG` method
## Quick Start Guide ## Quick Start Guide
**This software is designed for Debian-based OS: in addition to Debian, these are Ubuntu, Mint, Kali, MX and many other Linux**
### Build 1. Download release
```bash ```bash
# Cloning repo wget https://github.com/telemt/telemt/releases/latest/download/telemt
git clone https://github.com/telemt/telemt
# Changing Directory to telemt
cd telemt
# Starting Release Build
cargo build --release
# Move to /bin
mv ./target/release/telemt /bin
# Make executable
chmod +x /bin/telemt
# Lets go!
telemt config.toml
``` ```
2. Move to Bin Folder
```bash
mv telemt /bin
```
4. Make Executable
```bash
chmod +x /bin/telemt
```
5. Go to [How to use?](#how-to-use) section for for further steps
## How to use? ## How to use?
### Telemt via Systemd ### Telemt via Systemd
**This instruction "assume" that you:**
- logged in as root or executed `su -` / `sudo su`
- you already have an assembled and executable `telemt` in /bin folder as a result of the [Quick Start Guide](#quick-start-guide) or [Build](#build)
**0. Check port and generate secrets** **0. Check port and generate secrets**
The port you have selected for use should be MISSING from the list, when: The port you have selected for use should be MISSING from the list, when:
@@ -57,6 +67,14 @@ Generate 16 bytes/32 characters HEX with OpenSSL or another way:
```bash ```bash
openssl rand -hex 16 openssl rand -hex 16
``` ```
OR
```bash
xxd -l 16 -p /dev/urandom
```
OR
```bash
python3 -c 'import os; print(os.urandom(16).hex())'
```
**1. Place your config to /etc/telemt.toml** **1. Place your config to /etc/telemt.toml**
@@ -64,28 +82,8 @@ Open nano
```bash ```bash
nano /etc/telemt.toml nano /etc/telemt.toml
``` ```
```bash paste your config from [Configuration](#configuration) section
port = 443 # Listening port
[users]
hello = "00000000000000000000000000000000" # Replace the secret with one generated before
[modes]
classic = false # Plain obfuscated mode
secure = false # dd-prefix mode
tls = true # Fake TLS - ee-prefix
tls_domain = "petrovich.ru" # Domain for ee-secret and masking
mask = true # Enable masking of bad traffic
mask_host = "petrovich.ru" # Optional override for mask destination
mask_port = 443 # Port for masking
prefer_ipv6 = false # Try IPv6 DCs first if true
fast_mode = true # Use "fast" obfuscation variant
client_keepalive = 600 # Seconds
client_ack_timeout = 300 # Seconds
```
then Ctrl+X -> Y -> Enter to save then Ctrl+X -> Y -> Enter to save
**2. Create service on /etc/systemd/system/telemt.service** **2. Create service on /etc/systemd/system/telemt.service**
@@ -117,9 +115,79 @@ then Ctrl+X -> Y -> Enter to save
**5.** In Shell type `systemctl enable telemt` - then telemt will start with system startup, after the network is up **5.** In Shell type `systemctl enable telemt` - then telemt will start with system startup, after the network is up
## Configuration
### Minimal Configuration for First Start
```toml
port = 443 # Listening port
show_links = ["tele", "hello"] # Specify users, for whom will be displayed the links
[users]
tele = "00000000000000000000000000000000" # Replace the secret with one generated before
hello = "00000000000000000000000000000000" # Replace the secret with one generated before
[modes]
classic = false # Plain obfuscated mode
secure = false # dd-prefix mode
tls = true # Fake TLS - ee-prefix
tls_domain = "petrovich.ru" # Domain for ee-secret and masking
mask = true # Enable masking of bad traffic
mask_host = "petrovich.ru" # Optional override for mask destination
mask_port = 443 # Port for masking
prefer_ipv6 = false # Try IPv6 DCs first if true
fast_mode = true # Use "fast" obfuscation variant
client_keepalive = 600 # Seconds
client_ack_timeout = 300 # Seconds
```
### Advanced
#### Adtag
To use channel advertising and usage statistics from Telegram, get Adtag from [@mtproxybot](https://t.me/mtproxybot), add this parameter to the end of config.toml and specify it
```toml
ad_tag = "00000000000000000000000000000000" # Replace zeros to your adtag from @mtproxybot
```
#### Listening and Announce IPs
To specify listening address and/or address in links, add to the end of config.toml:
```toml
[[listeners]]
ip = "0.0.0.0" # 0.0.0.0 = all IPs; your IP = specific listening
announce_ip = "1.2.3.4" # IP in links; comment with # if not used
```
#### Upstream Manager
To specify upstream, add to the end of config.toml:
##### Bind on IP
```toml
[[upstreams]]
type = "direct"
weight = 1
enabled = true
interface = "192.168.1.100" # Change to your outgoing IP
```
##### SOCKS4/5 as Upstream
- Without Auth:
```toml
[[upstreams]]
type = "socks5" # Specify SOCKS4 or SOCKS5
address = "1.2.3.4:1234" # SOCKS-server Address
weight = 1 # Set Weight for Scenarios
enabled = true
```
- With Auth:
```toml
[[upstreams]]
type = "socks5" # Specify SOCKS4 or SOCKS5
address = "1.2.3.4:1234" # SOCKS-server Address
username = "user" # Username for Auth on SOCKS-server
password = "pass" # Password for Auth on SOCKS-server
weight = 1 # Set Weight for Scenarios
enabled = true
```
## FAQ ## FAQ
### Telegram Calls via MTProxy ### Telegram Calls via MTProxy
- Telegram architecture does **NOT allow calls via MTProxy**, but only via SOCKS5, which cannot be obfuscated - Telegram architecture **does NOT allow calls via MTProxy**, but only via SOCKS5, which cannot be obfuscated
### How does DPI see MTProxy TLS? ### How does DPI see MTProxy TLS?
- DPI sees MTProxy in Fake TLS (ee) mode as TLS 1.3 - DPI sees MTProxy in Fake TLS (ee) mode as TLS 1.3
- the SNI you specify sends both the client and the server; - the SNI you specify sends both the client and the server;
@@ -127,11 +195,34 @@ then Ctrl+X -> Y -> Enter to save
- high entropy, which is normal for AES-encrypted traffic; - high entropy, which is normal for AES-encrypted traffic;
### Whitelist on IP ### Whitelist on IP
- MTProxy cannot work when there is: - MTProxy cannot work when there is:
- no IP connectivity to the target host - no IP connectivity to the target host: Russian Whitelist on Mobile Networks - "Белый список"
- OR all TCP traffic is blocked - OR all TCP traffic is blocked
- OR all TLS traffic is blocked, - OR high entropy/encrypted traffic is blocked: content filters at universities and critical infrastructure
- OR all TLS traffic is blocked
- OR specified port is blocked: use 443 to make it "like real"
- OR provided SNI is blocked: use "officially approved"/innocuous name
- like most protocols on the Internet; - like most protocols on the Internet;
- this situation is observed in China behind the Great Chinese Firewall and in Russia on mobile networks - these situations are observed:
- in China behind the Great Firewall
- in Russia on mobile networks, less in wired networks
- in Iran during "activity"
## Build
```bash
# Cloning repo
git clone https://github.com/telemt/telemt
# Changing Directory to telemt
cd telemt
# Starting Release Build
cargo build --release
# Move to /bin
mv ./target/release/telemt /bin
# Make executable
chmod +x /bin/telemt
# Lets go!
telemt config.toml
```
## Why Rust? ## Why Rust?
- Long-running reliability and idempotent behavior - Long-running reliability and idempotent behavior

View File

@@ -18,6 +18,7 @@ pub struct ProxyModes {
} }
fn default_true() -> bool { true } fn default_true() -> bool { true }
fn default_weight() -> u16 { 1 }
impl Default for ProxyModes { impl Default for ProxyModes {
fn default() -> Self { fn default() -> Self {
@@ -25,6 +26,48 @@ impl Default for ProxyModes {
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum UpstreamType {
Direct {
#[serde(default)]
interface: Option<String>, // Bind to specific IP/Interface
},
Socks4 {
address: String, // IP:Port of SOCKS server
#[serde(default)]
interface: Option<String>, // Bind to specific IP/Interface for connection to SOCKS
#[serde(default)]
user_id: Option<String>,
},
Socks5 {
address: String,
#[serde(default)]
interface: Option<String>,
#[serde(default)]
username: Option<String>,
#[serde(default)]
password: Option<String>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpstreamConfig {
#[serde(flatten)]
pub upstream_type: UpstreamType,
#[serde(default = "default_weight")]
pub weight: u16,
#[serde(default = "default_true")]
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenerConfig {
pub ip: IpAddr,
#[serde(default)]
pub announce_ip: Option<IpAddr>, // IP to show in tg:// links
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyConfig { pub struct ProxyConfig {
#[serde(default = "default_port")] #[serde(default = "default_port")]
@@ -104,6 +147,16 @@ pub struct ProxyConfig {
#[serde(default = "default_fake_cert_len")] #[serde(default = "default_fake_cert_len")]
pub fake_cert_len: usize, pub fake_cert_len: usize,
// New fields
#[serde(default)]
pub upstreams: Vec<UpstreamConfig>,
#[serde(default)]
pub listeners: Vec<ListenerConfig>,
#[serde(default)]
pub show_link: Vec<String>,
} }
fn default_port() -> u16 { 443 } fn default_port() -> u16 { 443 }
@@ -156,6 +209,9 @@ impl Default for ProxyConfig {
metrics_port: None, metrics_port: None,
metrics_whitelist: default_metrics_whitelist(), metrics_whitelist: default_metrics_whitelist(),
fake_cert_len: default_fake_cert_len(), fake_cert_len: default_fake_cert_len(),
upstreams: Vec::new(),
listeners: Vec::new(),
show_link: Vec::new(),
} }
} }
} }
@@ -187,6 +243,33 @@ impl ProxyConfig {
use rand::Rng; use rand::Rng;
config.fake_cert_len = rand::thread_rng().gen_range(1024..4096); config.fake_cert_len = rand::thread_rng().gen_range(1024..4096);
// Migration: Populate listeners if empty
if config.listeners.is_empty() {
if let Ok(ipv4) = config.listen_addr_ipv4.parse::<IpAddr>() {
config.listeners.push(ListenerConfig {
ip: ipv4,
announce_ip: None,
});
}
if let Some(ipv6_str) = &config.listen_addr_ipv6 {
if let Ok(ipv6) = ipv6_str.parse::<IpAddr>() {
config.listeners.push(ListenerConfig {
ip: ipv6,
announce_ip: None,
});
}
}
}
// Migration: Populate upstreams if empty (Default Direct)
if config.upstreams.is_empty() {
config.upstreams.push(UpstreamConfig {
upstream_type: UpstreamType::Direct { interface: None },
weight: 1,
enabled: true,
});
}
Ok(config) Ok(config)
} }
@@ -202,26 +285,3 @@ impl ProxyConfig {
Ok(()) Ok(())
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = ProxyConfig::default();
assert_eq!(config.port, 443);
assert!(config.modes.tls);
assert_eq!(config.client_keepalive, 600);
assert_eq!(config.client_ack_timeout, 300);
}
#[test]
fn test_config_validate() {
let mut config = ProxyConfig::default();
assert!(config.validate().is_ok());
config.users.clear();
assert!(config.validate().is_err());
}
}

View File

@@ -235,6 +235,9 @@ pub enum ProxyError {
#[error("Invalid proxy protocol header")] #[error("Invalid proxy protocol header")]
InvalidProxyProtocol, InvalidProxyProtocol,
#[error("Proxy error: {0}")]
Proxy(String),
// ============= Config Errors ============= // ============= Config Errors =============
#[error("Config error: {0}")] #[error("Config error: {0}")]

View File

@@ -1,158 +1,193 @@
//! Telemt - MTProxy on Rust //! Telemt - MTProxy on Rust
use std::sync::Arc;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
use tracing::{info, error, Level}; use tracing::{info, error, warn};
use tracing_subscriber::{FmtSubscriber, EnvFilter}; use tracing_subscriber::{fmt, EnvFilter};
mod error; mod config;
mod crypto; mod crypto;
mod error;
mod protocol; mod protocol;
mod proxy;
mod stats;
mod stream; mod stream;
mod transport; mod transport;
mod proxy;
mod config;
mod stats;
mod util; mod util;
use config::ProxyConfig; use crate::config::ProxyConfig;
use stats::{Stats, ReplayChecker}; use crate::proxy::ClientHandler;
use transport::ConnectionPool; use crate::stats::Stats;
use proxy::ClientHandler; use crate::transport::{create_listener, ListenOptions, UpstreamManager};
use crate::util::ip::detect_ip;
#[tokio::main] #[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging with env filter // Initialize logging
// Use RUST_LOG=debug or RUST_LOG=trace for more details fmt()
let filter = EnvFilter::try_from_default_env() .with_env_filter(EnvFilter::from_default_env().add_directive("info".parse().unwrap()))
.unwrap_or_else(|_| EnvFilter::new("info")); .init();
let subscriber = FmtSubscriber::builder() // Load config
.with_env_filter(filter) let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.toml".to_string());
.with_target(true) let config = match ProxyConfig::load(&config_path) {
.with_thread_ids(false) Ok(c) => c,
.with_file(false) Err(e) => {
.with_line_number(false) // If config doesn't exist, try to create default
.finish(); if std::path::Path::new(&config_path).exists() {
error!("Failed to load config: {}", e);
tracing::subscriber::set_global_default(subscriber)?; std::process::exit(1);
} else {
// Load configuration let default = ProxyConfig::default();
let config_path = std::env::args() let toml = toml::to_string_pretty(&default).unwrap();
.nth(1) std::fs::write(&config_path, toml).unwrap();
.unwrap_or_else(|| "config.toml".to_string()); info!("Created default config at {}", config_path);
default
info!("Loading configuration from {}", config_path);
let config = ProxyConfig::load(&config_path).unwrap_or_else(|e| {
error!("Failed to load config: {}", e);
info!("Using default configuration");
ProxyConfig::default()
});
if let Err(e) = config.validate() {
error!("Invalid configuration: {}", e);
std::process::exit(1);
}
let config = Arc::new(config);
info!("Starting MTProto Proxy on port {}", config.port);
info!("Fast mode: {}", config.fast_mode);
info!("Modes: classic={}, secure={}, tls={}",
config.modes.classic, config.modes.secure, config.modes.tls);
// Initialize components
let stats = Arc::new(Stats::new());
let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len));
let pool = Arc::new(ConnectionPool::new());
// Create handler
let handler = Arc::new(ClientHandler::new(
Arc::clone(&config),
Arc::clone(&stats),
Arc::clone(&replay_checker),
Arc::clone(&pool),
));
// Start listener
let addr: SocketAddr = format!("{}:{}", config.listen_addr_ipv4, config.port)
.parse()?;
let listener = TcpListener::bind(addr).await?;
info!("Listening on {}", addr);
// Print proxy links
print_proxy_links(&config);
info!("Use RUST_LOG=debug or RUST_LOG=trace for more detailed logging");
// Main accept loop
let accept_loop = async {
loop {
match listener.accept().await {
Ok((stream, peer)) => {
let handler = Arc::clone(&handler);
tokio::spawn(async move {
handler.handle(stream, peer).await;
});
}
Err(e) => {
error!("Accept error: {}", e);
}
} }
} }
}; };
// Graceful shutdown config.validate()?;
tokio::select! {
_ = accept_loop => {} let config = Arc::new(config);
_ = signal::ctrl_c() => { let stats = Arc::new(Stats::new());
info!("Shutting down...");
// Initialize Upstream Manager
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
// Start Health Checks
let um_clone = upstream_manager.clone();
tokio::spawn(async move {
um_clone.run_health_checks().await;
});
// Detect public IP if needed (once at startup)
let detected_ip = detect_ip().await;
// Start Listeners
let mut listeners = Vec::new();
for listener_conf in &config.listeners {
let addr = SocketAddr::new(listener_conf.ip, config.port);
let options = ListenOptions {
ipv6_only: listener_conf.ip.is_ipv6(),
..Default::default()
};
match create_listener(addr, &options) {
Ok(socket) => {
let listener = TcpListener::from_std(socket.into())?;
info!("Listening on {}", addr);
// Determine public IP for tg:// links
// 1. Use explicit announce_ip if set
// 2. If listening on 0.0.0.0 or ::, use detected public IP
// 3. Otherwise use the bind IP
let public_ip = if let Some(ip) = listener_conf.announce_ip {
ip
} else if listener_conf.ip.is_unspecified() {
// Try to use detected IP of the same family
if listener_conf.ip.is_ipv4() {
detected_ip.ipv4.unwrap_or(listener_conf.ip)
} else {
detected_ip.ipv6.unwrap_or(listener_conf.ip)
}
} else {
listener_conf.ip
};
// Show links for configured users
if !config.show_link.is_empty() {
info!("--- Proxy Links for {} ---", public_ip);
for user_name in &config.show_link {
if let Some(secret) = config.users.get(user_name) {
info!("User: {}", user_name);
// Classic
if config.modes.classic {
info!(" Classic: tg://proxy?server={}&port={}&secret={}",
public_ip, config.port, secret);
}
// DD (Secure)
if config.modes.secure {
info!(" DD: tg://proxy?server={}&port={}&secret=dd{}",
public_ip, config.port, secret);
}
// EE-TLS (FakeTLS)
if config.modes.tls {
let domain_hex = hex::encode(&config.tls_domain);
info!(" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
public_ip, config.port, secret, domain_hex);
}
} else {
warn!("User '{}' specified in show_link not found in users list", user_name);
}
}
info!("-----------------------------------");
}
listeners.push(listener);
},
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
}
} }
} }
// Cleanup if listeners.is_empty() {
pool.close_all().await; error!("No listeners could be started. Exiting.");
std::process::exit(1);
}
// Accept loop
// For simplicity in this slice, we just spawn a task for each listener
// In a real high-perf scenario, we might want a more complex accept loop
for listener in listeners {
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
tokio::spawn(async move {
if let Err(e) = ClientHandler::new(
stream,
peer_addr,
config,
stats,
upstream_manager
).run().await {
// Log only relevant errors
// debug!("Connection error: {}", e);
}
});
}
Err(e) => {
error!("Accept error: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
});
}
// Wait for signal
match signal::ctrl_c().await {
Ok(()) => info!("Shutting down..."),
Err(e) => error!("Signal error: {}", e),
}
info!("Goodbye!");
Ok(()) Ok(())
} }
fn print_proxy_links(config: &ProxyConfig) {
println!("\n=== Proxy Links ===\n");
for (user, secret) in &config.users {
if config.modes.tls {
let tls_secret = format!(
"ee{}{}",
secret,
hex::encode(config.tls_domain.as_bytes())
);
println!(
"{} (TLS): tg://proxy?server=IP&port={}&secret={}",
user, config.port, tls_secret
);
}
if config.modes.secure {
println!(
"{} (Secure): tg://proxy?server=IP&port={}&secret=dd{}",
user, config.port, secret
);
}
if config.modes.classic {
println!(
"{} (Classic): tg://proxy?server=IP&port={}&secret={}",
user, config.port, secret
);
}
println!();
}
println!("===================\n");
}

View File

@@ -13,7 +13,7 @@ use crate::error::{ProxyError, Result, HandshakeResult};
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::protocol::tls; use crate::protocol::tls;
use crate::stats::{Stats, ReplayChecker}; use crate::stats::{Stats, ReplayChecker};
use crate::transport::{ConnectionPool, configure_client_socket}; use crate::transport::{configure_client_socket, UpstreamManager};
use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter}; use crate::stream::{CryptoReader, CryptoWriter, FakeTlsReader, FakeTlsWriter};
use crate::crypto::AesCtr; use crate::crypto::AesCtr;
@@ -24,39 +24,55 @@ use super::handshake::{
use super::relay::relay_bidirectional; use super::relay::relay_bidirectional;
use super::masking::handle_bad_client; use super::masking::handle_bad_client;
/// Client connection handler /// Client connection handler (builder struct)
pub struct ClientHandler { pub struct ClientHandler;
/// Running client handler with stream and context
pub struct RunningClientHandler {
stream: TcpStream,
peer: SocketAddr,
config: Arc<ProxyConfig>, config: Arc<ProxyConfig>,
stats: Arc<Stats>, stats: Arc<Stats>,
replay_checker: Arc<ReplayChecker>, replay_checker: Arc<ReplayChecker>,
pool: Arc<ConnectionPool>, upstream_manager: Arc<UpstreamManager>,
} }
impl ClientHandler { impl ClientHandler {
/// Create new client handler /// Create new client handler instance
pub fn new( pub fn new(
stream: TcpStream,
peer: SocketAddr,
config: Arc<ProxyConfig>, config: Arc<ProxyConfig>,
stats: Arc<Stats>, stats: Arc<Stats>,
replay_checker: Arc<ReplayChecker>, upstream_manager: Arc<UpstreamManager>,
pool: Arc<ConnectionPool>, ) -> RunningClientHandler {
) -> Self { // Note: ReplayChecker should be shared globally for proper replay protection
Self { // Creating it per-connection disables replay protection across connections
// TODO: Pass Arc<ReplayChecker> from main.rs
let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len));
RunningClientHandler {
stream,
peer,
config, config,
stats, stats,
replay_checker, replay_checker,
pool, upstream_manager,
} }
} }
}
/// Handle a client connection impl RunningClientHandler {
pub async fn handle(&self, stream: TcpStream, peer: SocketAddr) { /// Run the client handler
pub async fn run(mut self) -> Result<()> {
self.stats.increment_connects_all(); self.stats.increment_connects_all();
let peer = self.peer;
debug!(peer = %peer, "New connection"); debug!(peer = %peer, "New connection");
// Configure socket // Configure socket
if let Err(e) = configure_client_socket( if let Err(e) = configure_client_socket(
&stream, &self.stream,
self.config.client_keepalive, self.config.client_keepalive,
self.config.client_ack_timeout, self.config.client_ack_timeout,
) { ) {
@@ -66,49 +82,56 @@ impl ClientHandler {
// Perform handshake with timeout // Perform handshake with timeout
let handshake_timeout = Duration::from_secs(self.config.client_handshake_timeout); let handshake_timeout = Duration::from_secs(self.config.client_handshake_timeout);
// Clone stats for error handling block
let stats = self.stats.clone();
let result = timeout( let result = timeout(
handshake_timeout, handshake_timeout,
self.do_handshake(stream, peer) self.do_handshake()
).await; ).await;
match result { match result {
Ok(Ok(())) => { Ok(Ok(())) => {
debug!(peer = %peer, "Connection handled successfully"); debug!(peer = %peer, "Connection handled successfully");
Ok(())
} }
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed"); debug!(peer = %peer, error = %e, "Handshake failed");
Err(e)
} }
Err(_) => { Err(_) => {
self.stats.increment_handshake_timeouts(); stats.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout"); debug!(peer = %peer, "Handshake timeout");
Err(ProxyError::TgHandshakeTimeout)
} }
} }
} }
/// Perform handshake and relay /// Perform handshake and relay
async fn do_handshake(&self, mut stream: TcpStream, peer: SocketAddr) -> Result<()> { async fn do_handshake(mut self) -> Result<()> {
// Read first bytes to determine handshake type // Read first bytes to determine handshake type
let mut first_bytes = [0u8; 5]; let mut first_bytes = [0u8; 5];
stream.read_exact(&mut first_bytes).await?; self.stream.read_exact(&mut first_bytes).await?;
let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
let peer = self.peer;
debug!(peer = %peer, is_tls = is_tls, first_bytes = %hex::encode(&first_bytes), "Handshake type detected"); debug!(peer = %peer, is_tls = is_tls, first_bytes = %hex::encode(&first_bytes), "Handshake type detected");
if is_tls { if is_tls {
self.handle_tls_client(stream, peer, first_bytes).await self.handle_tls_client(first_bytes).await
} else { } else {
self.handle_direct_client(stream, peer, first_bytes).await self.handle_direct_client(first_bytes).await
} }
} }
/// Handle TLS-wrapped client /// Handle TLS-wrapped client
async fn handle_tls_client( async fn handle_tls_client(
&self, mut self,
mut stream: TcpStream,
peer: SocketAddr,
first_bytes: [u8; 5], first_bytes: [u8; 5],
) -> Result<()> { ) -> Result<()> {
let peer = self.peer;
// Read TLS handshake length // Read TLS handshake length
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
@@ -117,17 +140,22 @@ impl ClientHandler {
if tls_len < 512 { if tls_len < 512 {
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();
handle_bad_client(stream, &first_bytes, &self.config).await; handle_bad_client(self.stream, &first_bytes, &self.config).await;
return Ok(()); return Ok(());
} }
// Read full TLS handshake // Read full TLS handshake
let mut handshake = vec![0u8; 5 + tls_len]; let mut handshake = vec![0u8; 5 + tls_len];
handshake[..5].copy_from_slice(&first_bytes); handshake[..5].copy_from_slice(&first_bytes);
stream.read_exact(&mut handshake[5..]).await?; self.stream.read_exact(&mut handshake[5..]).await?;
// Extract fields before consuming self.stream
let config = self.config.clone();
let replay_checker = self.replay_checker.clone();
let stats = self.stats.clone();
// Split stream for reading/writing // Split stream for reading/writing
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = self.stream.into_split();
// Handle TLS handshake // Handle TLS handshake
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake( let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
@@ -135,12 +163,12 @@ impl ClientHandler {
read_half, read_half,
write_half, write_half,
peer, peer,
&self.config, &config,
&self.replay_checker, &replay_checker,
).await { ).await {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient => { HandshakeResult::BadClient => {
self.stats.increment_connects_bad(); stats.increment_connects_bad();
return Ok(()); return Ok(());
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
@@ -158,44 +186,62 @@ impl ClientHandler {
tls_reader, tls_reader,
tls_writer, tls_writer,
peer, peer,
&self.config, &config,
&self.replay_checker, &replay_checker,
true, true,
).await { ).await {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient => { HandshakeResult::BadClient => {
self.stats.increment_connects_bad(); stats.increment_connects_bad();
return Ok(()); return Ok(());
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
}; };
// Handle authenticated client // Handle authenticated client
self.handle_authenticated_inner(crypto_reader, crypto_writer, success).await // We can't use self.handle_authenticated_inner because self is partially moved
// So we call it as an associated function or method on a new struct,
// or just inline the logic / use a static method.
// Since handle_authenticated_inner needs self.upstream_manager and self.stats,
// we should pass them explicitly.
Self::handle_authenticated_static(
crypto_reader,
crypto_writer,
success,
self.upstream_manager,
self.stats,
self.config
).await
} }
/// Handle direct (non-TLS) client /// Handle direct (non-TLS) client
async fn handle_direct_client( async fn handle_direct_client(
&self, mut self,
mut stream: TcpStream,
peer: SocketAddr,
first_bytes: [u8; 5], first_bytes: [u8; 5],
) -> Result<()> { ) -> Result<()> {
let peer = self.peer;
// Check if non-TLS modes are enabled // Check if non-TLS modes are enabled
if !self.config.modes.classic && !self.config.modes.secure { if !self.config.modes.classic && !self.config.modes.secure {
debug!(peer = %peer, "Non-TLS modes disabled"); debug!(peer = %peer, "Non-TLS modes disabled");
self.stats.increment_connects_bad(); self.stats.increment_connects_bad();
handle_bad_client(stream, &first_bytes, &self.config).await; handle_bad_client(self.stream, &first_bytes, &self.config).await;
return Ok(()); return Ok(());
} }
// Read rest of handshake // Read rest of handshake
let mut handshake = [0u8; HANDSHAKE_LEN]; let mut handshake = [0u8; HANDSHAKE_LEN];
handshake[..5].copy_from_slice(&first_bytes); handshake[..5].copy_from_slice(&first_bytes);
stream.read_exact(&mut handshake[5..]).await?; self.stream.read_exact(&mut handshake[5..]).await?;
// Extract fields
let config = self.config.clone();
let replay_checker = self.replay_checker.clone();
let stats = self.stats.clone();
// Split stream // Split stream
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = self.stream.into_split();
// Handle MTProto handshake // Handle MTProto handshake
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
@@ -203,27 +249,36 @@ impl ClientHandler {
read_half, read_half,
write_half, write_half,
peer, peer,
&self.config, &config,
&self.replay_checker, &replay_checker,
false, false,
).await { ).await {
HandshakeResult::Success(result) => result, HandshakeResult::Success(result) => result,
HandshakeResult::BadClient => { HandshakeResult::BadClient => {
self.stats.increment_connects_bad(); stats.increment_connects_bad();
return Ok(()); return Ok(());
} }
HandshakeResult::Error(e) => return Err(e), HandshakeResult::Error(e) => return Err(e),
}; };
self.handle_authenticated_inner(crypto_reader, crypto_writer, success).await Self::handle_authenticated_static(
crypto_reader,
crypto_writer,
success,
self.upstream_manager,
self.stats,
self.config
).await
} }
/// Handle authenticated client - connect to Telegram and relay /// Static version of handle_authenticated_inner to avoid ownership issues
async fn handle_authenticated_inner<R, W>( async fn handle_authenticated_static<R, W>(
&self,
client_reader: CryptoReader<R>, client_reader: CryptoReader<R>,
client_writer: CryptoWriter<W>, client_writer: CryptoWriter<W>,
success: HandshakeSuccess, success: HandshakeSuccess,
upstream_manager: Arc<UpstreamManager>,
stats: Arc<Stats>,
config: Arc<ProxyConfig>,
) -> Result<()> ) -> Result<()>
where where
R: AsyncRead + Unpin + Send + 'static, R: AsyncRead + Unpin + Send + 'static,
@@ -232,13 +287,13 @@ impl ClientHandler {
let user = &success.user; let user = &success.user;
// Check user limits // Check user limits
if let Err(e) = self.check_user_limits(user) { if let Err(e) = Self::check_user_limits_static(user, &config, &stats) {
warn!(user = %user, error = %e, "User limit exceeded"); warn!(user = %user, error = %e, "User limit exceeded");
return Err(e); return Err(e);
} }
// Get datacenter address // Get datacenter address
let dc_addr = self.get_dc_addr(success.dc_idx)?; let dc_addr = Self::get_dc_addr_static(success.dc_idx, &config)?;
info!( info!(
user = %user, user = %user,
@@ -246,39 +301,40 @@ impl ClientHandler {
dc = success.dc_idx, dc = success.dc_idx,
dc_addr = %dc_addr, dc_addr = %dc_addr,
proto = ?success.proto_tag, proto = ?success.proto_tag,
fast_mode = self.config.fast_mode, fast_mode = config.fast_mode,
"Connecting to Telegram" "Connecting to Telegram"
); );
// Connect to Telegram // Connect to Telegram via UpstreamManager
let tg_stream = self.pool.get(dc_addr).await?; let tg_stream = upstream_manager.connect(dc_addr).await?;
debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected to Telegram, performing handshake"); debug!(peer = %success.peer, dc_addr = %dc_addr, "Connected to Telegram, performing handshake");
// Perform Telegram handshake and get crypto streams // Perform Telegram handshake and get crypto streams
let (tg_reader, tg_writer) = self.do_tg_handshake( let (tg_reader, tg_writer) = Self::do_tg_handshake_static(
tg_stream, tg_stream,
&success, &success,
&config,
).await?; ).await?;
debug!(peer = %success.peer, "Telegram handshake complete, starting relay"); debug!(peer = %success.peer, "Telegram handshake complete, starting relay");
// Update stats // Update stats
self.stats.increment_user_connects(user); stats.increment_user_connects(user);
self.stats.increment_user_curr_connects(user); stats.increment_user_curr_connects(user);
// Relay traffic - передаём Arc::clone(&self.stats) // Relay traffic
let relay_result = relay_bidirectional( let relay_result = relay_bidirectional(
client_reader, client_reader,
client_writer, client_writer,
tg_reader, tg_reader,
tg_writer, tg_writer,
user, user,
Arc::clone(&self.stats), Arc::clone(&stats),
).await; ).await;
// Update stats // Update stats
self.stats.decrement_user_curr_connects(user); stats.decrement_user_curr_connects(user);
match &relay_result { match &relay_result {
Ok(()) => debug!(user = %user, peer = %success.peer, "Relay completed normally"), Ok(()) => debug!(user = %user, peer = %success.peer, "Relay completed normally"),
@@ -288,26 +344,26 @@ impl ClientHandler {
relay_result relay_result
} }
/// Check user limits (expiration, connection count, data quota) /// Check user limits (static version)
fn check_user_limits(&self, user: &str) -> Result<()> { fn check_user_limits_static(user: &str, config: &ProxyConfig, stats: &Stats) -> Result<()> {
// Check expiration // Check expiration
if let Some(expiration) = self.config.user_expirations.get(user) { if let Some(expiration) = config.user_expirations.get(user) {
if chrono::Utc::now() > *expiration { if chrono::Utc::now() > *expiration {
return Err(ProxyError::UserExpired { user: user.to_string() }); return Err(ProxyError::UserExpired { user: user.to_string() });
} }
} }
// Check connection limit // Check connection limit
if let Some(limit) = self.config.user_max_tcp_conns.get(user) { if let Some(limit) = config.user_max_tcp_conns.get(user) {
let current = self.stats.get_user_curr_connects(user); let current = stats.get_user_curr_connects(user);
if current >= *limit as u64 { if current >= *limit as u64 {
return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() }); return Err(ProxyError::ConnectionLimitExceeded { user: user.to_string() });
} }
} }
// Check data quota // Check data quota
if let Some(quota) = self.config.user_data_quota.get(user) { if let Some(quota) = config.user_data_quota.get(user) {
let used = self.stats.get_user_total_octets(user); let used = stats.get_user_total_octets(user);
if used >= *quota { if used >= *quota {
return Err(ProxyError::DataQuotaExceeded { user: user.to_string() }); return Err(ProxyError::DataQuotaExceeded { user: user.to_string() });
} }
@@ -316,11 +372,11 @@ impl ClientHandler {
Ok(()) Ok(())
} }
/// Get datacenter address by index /// Get datacenter address by index (static version)
fn get_dc_addr(&self, dc_idx: i16) -> Result<SocketAddr> { fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let idx = (dc_idx.abs() - 1) as usize; let idx = (dc_idx.abs() - 1) as usize;
let datacenters = if self.config.prefer_ipv6 { let datacenters = if config.prefer_ipv6 {
&*TG_DATACENTERS_V6 &*TG_DATACENTERS_V6
} else { } else {
&*TG_DATACENTERS_V4 &*TG_DATACENTERS_V4
@@ -333,19 +389,18 @@ impl ClientHandler {
)) ))
} }
/// Perform handshake with Telegram server /// Perform handshake with Telegram server (static version)
/// Returns crypto reader and writer for TG connection async fn do_tg_handshake_static(
async fn do_tg_handshake(
&self,
mut stream: TcpStream, mut stream: TcpStream,
success: &HandshakeSuccess, success: &HandshakeSuccess,
config: &ProxyConfig,
) -> Result<(CryptoReader<tokio::net::tcp::OwnedReadHalf>, CryptoWriter<tokio::net::tcp::OwnedWriteHalf>)> { ) -> Result<(CryptoReader<tokio::net::tcp::OwnedReadHalf>, CryptoWriter<tokio::net::tcp::OwnedWriteHalf>)> {
// Generate nonce with keys for TG // Generate nonce with keys for TG
let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce( let (nonce, tg_enc_key, tg_enc_iv, tg_dec_key, tg_dec_iv) = generate_tg_nonce(
success.proto_tag, success.proto_tag,
&success.dec_key, // Client's dec key &success.dec_key, // Client's dec key
success.dec_iv, success.dec_iv,
self.config.fast_mode, config.fast_mode,
); );
// Encrypt nonce // Encrypt nonce

View File

@@ -13,7 +13,7 @@ const MASK_BUFFER_SIZE: usize = 8192;
/// Handle a bad client by forwarding to mask host /// Handle a bad client by forwarding to mask host
pub async fn handle_bad_client( pub async fn handle_bad_client(
mut client: TcpStream, client: TcpStream,
initial_data: &[u8], initial_data: &[u8],
config: &ProxyConfig, config: &ProxyConfig,
) { ) {

View File

@@ -149,9 +149,9 @@ pub trait FrameCodec: Send + Sync {
/// Create a frame codec for the given protocol tag /// Create a frame codec for the given protocol tag
pub fn create_codec(proto_tag: ProtoTag) -> Box<dyn FrameCodec> { pub fn create_codec(proto_tag: ProtoTag) -> Box<dyn FrameCodec> {
match proto_tag { match proto_tag {
ProtoTag::Abridged => Box::new(super::frame_codec::AbridgedCodec::new()), ProtoTag::Abridged => Box::new(crate::stream::frame_codec::AbridgedCodec::new()),
ProtoTag::Intermediate => Box::new(super::frame_codec::IntermediateCodec::new()), ProtoTag::Intermediate => Box::new(crate::stream::frame_codec::IntermediateCodec::new()),
ProtoTag::Secure => Box::new(super::frame_codec::SecureCodec::new()), ProtoTag::Secure => Box::new(crate::stream::frame_codec::SecureCodec::new()),
} }
} }

View File

@@ -3,7 +3,11 @@
pub mod pool; pub mod pool;
pub mod proxy_protocol; pub mod proxy_protocol;
pub mod socket; pub mod socket;
pub mod socks;
pub mod upstream;
pub use pool::ConnectionPool; pub use pool::ConnectionPool;
pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol}; pub use proxy_protocol::{ProxyProtocolInfo, parse_proxy_protocol};
pub use socket::*; pub use socket::*;
pub use socks::*;
pub use upstream::UpstreamManager;

View File

@@ -1,7 +1,7 @@
//! TCP Socket Configuration //! TCP Socket Configuration
use std::io::Result; use std::io::Result;
use std::net::SocketAddr; use std::net::{SocketAddr, IpAddr};
use std::time::Duration; use std::time::Duration;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol}; use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
@@ -93,6 +93,11 @@ pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
/// Create a new TCP socket for outgoing connections /// Create a new TCP socket for outgoing connections
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> { pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
create_outgoing_socket_bound(addr, None)
}
/// Create a new TCP socket for outgoing connections, optionally bound to a specific interface
pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>) -> Result<Socket> {
let domain = if addr.is_ipv4() { let domain = if addr.is_ipv4() {
Domain::IPV4 Domain::IPV4
} else { } else {
@@ -107,9 +112,16 @@ pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
// Disable Nagle // Disable Nagle
socket.set_nodelay(true)?; socket.set_nodelay(true)?;
if let Some(bind_ip) = bind_addr {
let bind_sock_addr = SocketAddr::new(bind_ip, 0);
socket.bind(&bind_sock_addr.into())?;
debug!("Bound outgoing socket to {}", bind_ip);
}
Ok(socket) Ok(socket)
} }
/// Get local address of a socket /// Get local address of a socket
pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> { pub fn get_local_addr(stream: &TcpStream) -> Option<SocketAddr> {
stream.local_addr().ok() stream.local_addr().ok()

145
src/transport/socks.rs Normal file
View File

@@ -0,0 +1,145 @@
//! SOCKS4/5 Client Implementation
use std::net::{IpAddr, SocketAddr};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::error::{ProxyError, Result};
pub async fn connect_socks4(
stream: &mut TcpStream,
target: SocketAddr,
user_id: Option<&str>,
) -> Result<()> {
let ip = match target.ip() {
IpAddr::V4(ip) => ip,
IpAddr::V6(_) => return Err(ProxyError::Proxy("SOCKS4 does not support IPv6".to_string())),
};
let port = target.port();
let user = user_id.unwrap_or("").as_bytes();
// VN (4) | CD (1) | DSTPORT (2) | DSTIP (4) | USERID (variable) | NULL (1)
let mut buf = Vec::with_capacity(9 + user.len());
buf.push(4); // VN
buf.push(1); // CD (CONNECT)
buf.extend_from_slice(&port.to_be_bytes());
buf.extend_from_slice(&ip.octets());
buf.extend_from_slice(user);
buf.push(0); // NULL
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
// Response: VN (1) | CD (1) | DSTPORT (2) | DSTIP (4)
let mut resp = [0u8; 8];
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
if resp[1] != 90 {
return Err(ProxyError::Proxy(format!("SOCKS4 request rejected: code {}", resp[1])));
}
Ok(())
}
pub async fn connect_socks5(
stream: &mut TcpStream,
target: SocketAddr,
username: Option<&str>,
password: Option<&str>,
) -> Result<()> {
// 1. Auth negotiation
// VER (1) | NMETHODS (1) | METHODS (variable)
let mut methods = vec![0u8]; // No auth
if username.is_some() {
methods.push(2u8); // Username/Password
}
let mut buf = vec![5u8, methods.len() as u8];
buf.extend_from_slice(&methods);
stream.write_all(&buf).await.map_err(|e| ProxyError::Io(e))?;
let mut resp = [0u8; 2];
stream.read_exact(&mut resp).await.map_err(|e| ProxyError::Io(e))?;
if resp[0] != 5 {
return Err(ProxyError::Proxy("Invalid SOCKS5 version".to_string()));
}
match resp[1] {
0 => {}, // No auth
2 => {
// Username/Password auth
if let (Some(u), Some(p)) = (username, password) {
let u_bytes = u.as_bytes();
let p_bytes = p.as_bytes();
let mut auth_buf = Vec::with_capacity(3 + u_bytes.len() + p_bytes.len());
auth_buf.push(1); // VER
auth_buf.push(u_bytes.len() as u8);
auth_buf.extend_from_slice(u_bytes);
auth_buf.push(p_bytes.len() as u8);
auth_buf.extend_from_slice(p_bytes);
stream.write_all(&auth_buf).await.map_err(|e| ProxyError::Io(e))?;
let mut auth_resp = [0u8; 2];
stream.read_exact(&mut auth_resp).await.map_err(|e| ProxyError::Io(e))?;
if auth_resp[1] != 0 {
return Err(ProxyError::Proxy("SOCKS5 authentication failed".to_string()));
}
} else {
return Err(ProxyError::Proxy("SOCKS5 server requires authentication".to_string()));
}
},
_ => return Err(ProxyError::Proxy("Unsupported SOCKS5 auth method".to_string())),
}
// 2. Connection request
// VER (1) | CMD (1) | RSV (1) | ATYP (1) | DST.ADDR (variable) | DST.PORT (2)
let mut req = vec![5u8, 1u8, 0u8]; // CONNECT
match target {
SocketAddr::V4(v4) => {
req.push(1u8); // IPv4
req.extend_from_slice(&v4.ip().octets());
},
SocketAddr::V6(v6) => {
req.push(4u8); // IPv6
req.extend_from_slice(&v6.ip().octets());
},
}
req.extend_from_slice(&target.port().to_be_bytes());
stream.write_all(&req).await.map_err(|e| ProxyError::Io(e))?;
// Response
let mut head = [0u8; 4];
stream.read_exact(&mut head).await.map_err(|e| ProxyError::Io(e))?;
if head[1] != 0 {
return Err(ProxyError::Proxy(format!("SOCKS5 request failed: code {}", head[1])));
}
// Skip address part of response
match head[3] {
1 => { // IPv4
let mut addr = [0u8; 4 + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
},
3 => { // Domain
let mut len = [0u8; 1];
stream.read_exact(&mut len).await.map_err(|e| ProxyError::Io(e))?;
let mut addr = vec![0u8; len[0] as usize + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
},
4 => { // IPv6
let mut addr = [0u8; 16 + 2];
stream.read_exact(&mut addr).await.map_err(|e| ProxyError::Io(e))?;
},
_ => return Err(ProxyError::Proxy("Invalid address type in SOCKS5 response".to_string())),
}
Ok(())
}

259
src/transport/upstream.rs Normal file
View File

@@ -0,0 +1,259 @@
//! Upstream Management
use std::net::{SocketAddr, IpAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::RwLock;
use rand::Rng;
use tracing::{debug, warn, error, info};
use crate::config::{UpstreamConfig, UpstreamType};
use crate::error::{Result, ProxyError};
use crate::transport::socket::create_outgoing_socket_bound;
use crate::transport::socks::{connect_socks4, connect_socks5};
#[derive(Debug)]
struct UpstreamState {
config: UpstreamConfig,
healthy: bool,
fails: u32,
last_check: std::time::Instant,
}
#[derive(Clone)]
pub struct UpstreamManager {
upstreams: Arc<RwLock<Vec<UpstreamState>>>,
}
impl UpstreamManager {
pub fn new(configs: Vec<UpstreamConfig>) -> Self {
let states = configs.into_iter()
.filter(|c| c.enabled)
.map(|c| UpstreamState {
config: c,
healthy: true, // Optimistic start
fails: 0,
last_check: std::time::Instant::now(),
})
.collect();
Self {
upstreams: Arc::new(RwLock::new(states)),
}
}
/// Select an upstream using Weighted Round Robin (simplified)
async fn select_upstream(&self) -> Option<usize> {
let upstreams = self.upstreams.read().await;
if upstreams.is_empty() {
return None;
}
let healthy_indices: Vec<usize> = upstreams.iter()
.enumerate()
.filter(|(_, u)| u.healthy)
.map(|(i, _)| i)
.collect();
if healthy_indices.is_empty() {
// If all unhealthy, try any random one
return Some(rand::thread_rng().gen_range(0..upstreams.len()));
}
// Weighted selection
let total_weight: u32 = healthy_indices.iter()
.map(|&i| upstreams[i].config.weight as u32)
.sum();
if total_weight == 0 {
return Some(healthy_indices[rand::thread_rng().gen_range(0..healthy_indices.len())]);
}
let mut choice = rand::thread_rng().gen_range(0..total_weight);
for &idx in &healthy_indices {
let weight = upstreams[idx].config.weight as u32;
if choice < weight {
return Some(idx);
}
choice -= weight;
}
Some(healthy_indices[0])
}
pub async fn connect(&self, target: SocketAddr) -> Result<TcpStream> {
let idx = self.select_upstream().await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let upstream = {
let guard = self.upstreams.read().await;
guard[idx].config.clone()
};
match self.connect_via_upstream(&upstream, target).await {
Ok(stream) => {
// Mark success
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
if !u.healthy {
debug!("Upstream recovered: {:?}", u.config);
}
u.healthy = true;
u.fails = 0;
}
Ok(stream)
},
Err(e) => {
// Mark failure
let mut guard = self.upstreams.write().await;
if let Some(u) = guard.get_mut(idx) {
u.fails += 1;
warn!("Failed to connect via upstream {:?}: {}. Fails: {}", u.config, e, u.fails);
if u.fails > 3 {
u.healthy = false;
warn!("Upstream disabled due to failures: {:?}", u.config);
}
}
Err(e)
}
}
}
async fn connect_via_upstream(&self, config: &UpstreamConfig, target: SocketAddr) -> Result<TcpStream> {
match &config.upstream_type {
UpstreamType::Direct { interface } => {
let bind_ip = interface.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok());
let socket = create_outgoing_socket_bound(target, bind_ip)?;
// Non-blocking connect logic
socket.set_nonblocking(true)?;
match socket.connect(&target.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let stream = TcpStream::from_std(std_stream)?;
// Wait for connection to complete
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
Ok(stream)
},
UpstreamType::Socks4 { address, interface, user_id } => {
info!("Connecting to target {} via SOCKS4 proxy {}", target, address);
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS4 address".to_string()))?;
let bind_ip = interface.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok());
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
// Non-blocking connect logic
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
// Wait for connection to complete
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
connect_socks4(&mut stream, target, user_id.as_deref()).await?;
Ok(stream)
},
UpstreamType::Socks5 { address, interface, username, password } => {
info!("Connecting to target {} via SOCKS5 proxy {}", target, address);
let proxy_addr: SocketAddr = address.parse()
.map_err(|_| ProxyError::Config("Invalid SOCKS5 address".to_string()))?;
let bind_ip = interface.as_ref()
.and_then(|s| s.parse::<IpAddr>().ok());
let socket = create_outgoing_socket_bound(proxy_addr, bind_ip)?;
// Non-blocking connect logic
socket.set_nonblocking(true)?;
match socket.connect(&proxy_addr.into()) {
Ok(()) => {},
Err(err) if err.raw_os_error() == Some(115) || err.kind() == std::io::ErrorKind::WouldBlock => {},
Err(err) => return Err(ProxyError::Io(err)),
}
let std_stream: std::net::TcpStream = socket.into();
let mut stream = TcpStream::from_std(std_stream)?;
// Wait for connection to complete
stream.writable().await?;
if let Some(e) = stream.take_error()? {
return Err(ProxyError::Io(e));
}
connect_socks5(&mut stream, target, username.as_deref(), password.as_deref()).await?;
Ok(stream)
},
}
}
/// Background task to check health
pub async fn run_health_checks(&self) {
// Simple TCP connect check to a known stable DC (e.g. 149.154.167.50:443 - DC2)
let check_target: SocketAddr = "149.154.167.50:443".parse().unwrap();
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let count = self.upstreams.read().await.len();
for i in 0..count {
let config = {
let guard = self.upstreams.read().await;
guard[i].config.clone()
};
let result = tokio::time::timeout(
Duration::from_secs(10),
self.connect_via_upstream(&config, check_target)
).await;
let mut guard = self.upstreams.write().await;
let u = &mut guard[i];
match result {
Ok(Ok(_stream)) => {
if !u.healthy {
debug!("Upstream recovered: {:?}", u.config);
}
u.healthy = true;
u.fails = 0;
}
Ok(Err(e)) => {
debug!("Health check failed for {:?}: {}", u.config, e);
// Don't mark unhealthy immediately in background check
}
Err(_) => {
debug!("Health check timeout for {:?}", u.config);
}
}
u.last_check = std::time::Instant::now();
}
}
}
}

View File

@@ -1,6 +1,6 @@
//! IP Addr Detect //! IP Addr Detect
use std::net::IpAddr; use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::time::Duration; use std::time::Duration;
use tracing::{debug, warn}; use tracing::{debug, warn};
@@ -40,28 +40,74 @@ const IPV6_URLS: &[&str] = &[
"http://api6.ipify.org/", "http://api6.ipify.org/",
]; ];
/// Detect local IP address by connecting to a public DNS
/// This does not actually send any packets
fn get_local_ip(target: &str) -> Option<IpAddr> {
let socket = UdpSocket::bind("0.0.0.0:0").ok()?;
socket.connect(target).ok()?;
socket.local_addr().ok().map(|addr| addr.ip())
}
fn get_local_ipv6(target: &str) -> Option<IpAddr> {
let socket = UdpSocket::bind("[::]:0").ok()?;
socket.connect(target).ok()?;
socket.local_addr().ok().map(|addr| addr.ip())
}
/// Detect public IP addresses /// Detect public IP addresses
pub async fn detect_ip() -> IpInfo { pub async fn detect_ip() -> IpInfo {
let mut info = IpInfo::default(); let mut info = IpInfo::default();
// Detect IPv4 // Try to get local interface IP first (default gateway interface)
for url in IPV4_URLS { // We connect to Google DNS to find out which interface is used for routing
if let Some(ip) = fetch_ip(url).await { if let Some(ip) = get_local_ip("8.8.8.8:80") {
if ip.is_ipv4() { if ip.is_ipv4() && !ip.is_loopback() {
info.ipv4 = Some(ip); info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected IPv4 address"); debug!(ip = %ip, "Detected local IPv4 address via routing");
break; }
}
if let Some(ip) = get_local_ipv6("[2001:4860:4860::8888]:80") {
if ip.is_ipv6() && !ip.is_loopback() {
info.ipv6 = Some(ip);
debug!(ip = %ip, "Detected local IPv6 address via routing");
}
}
// If local detection failed or returned private IP (and we want public),
// or just as a fallback/verification, we might want to check external services.
// However, the requirement is: "if IP for listening is not set... it should be IP from interface...
// if impossible - request external resources".
// So if we found a local IP, we might be good. But often servers are behind NAT.
// If the local IP is private, we probably want the public IP for the tg:// link.
// Let's check if the detected IPs are private.
let need_external_v4 = info.ipv4.map_or(true, |ip| is_private_ip(ip));
let need_external_v6 = info.ipv6.map_or(true, |ip| is_private_ip(ip));
if need_external_v4 {
debug!("Local IPv4 is private or missing, checking external services...");
for url in IPV4_URLS {
if let Some(ip) = fetch_ip(url).await {
if ip.is_ipv4() {
info.ipv4 = Some(ip);
debug!(ip = %ip, "Detected public IPv4 address");
break;
}
} }
} }
} }
// Detect IPv6 if need_external_v6 {
for url in IPV6_URLS { debug!("Local IPv6 is private or missing, checking external services...");
if let Some(ip) = fetch_ip(url).await { for url in IPV6_URLS {
if ip.is_ipv6() { if let Some(ip) = fetch_ip(url).await {
info.ipv6 = Some(ip); if ip.is_ipv6() {
debug!(ip = %ip, "Detected IPv6 address"); info.ipv6 = Some(ip);
break; debug!(ip = %ip, "Detected public IPv6 address");
break;
}
} }
} }
} }
@@ -73,6 +119,17 @@ pub async fn detect_ip() -> IpInfo {
info info
} }
fn is_private_ip(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(ipv4) => {
ipv4.is_private() || ipv4.is_loopback() || ipv4.is_link_local()
}
IpAddr::V6(ipv6) => {
ipv6.is_loopback() || (ipv6.segments()[0] & 0xfe00) == 0xfc00 // Unique Local
}
}
}
/// Fetch IP from URL /// Fetch IP from URL
async fn fetch_ip(url: &str) -> Option<IpAddr> { async fn fetch_ip(url: &str) -> Option<IpAddr> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()