Drafting Upstreams and SOCKS
This commit is contained in:
Alexey
2026-01-07 17:22:10 +03:00
parent 7746a1177c
commit 4f007f3128
10 changed files with 839 additions and 244 deletions

View File

@@ -1,158 +1,166 @@
//! Telemt - MTProxy on Rust
use std::sync::Arc;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::signal;
use tracing::{info, error, Level};
use tracing_subscriber::{FmtSubscriber, EnvFilter};
use tracing::{info, error, warn};
use tracing_subscriber::{fmt, EnvFilter};
mod error;
mod config;
mod crypto;
mod error;
mod protocol;
mod proxy;
mod stats;
mod stream;
mod transport;
mod proxy;
mod config;
mod stats;
mod util;
use config::ProxyConfig;
use stats::{Stats, ReplayChecker};
use transport::ConnectionPool;
use proxy::ClientHandler;
use crate::config::ProxyConfig;
use crate::proxy::ClientHandler;
use crate::stats::Stats;
use crate::transport::{create_listener, ListenOptions, UpstreamManager};
use crate::util::ip::detect_ip;
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// Initialize logging with env filter
// Use RUST_LOG=debug or RUST_LOG=trace for more details
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
let subscriber = FmtSubscriber::builder()
.with_env_filter(filter)
.with_target(true)
.with_thread_ids(false)
.with_file(false)
.with_line_number(false)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
// Load configuration
let config_path = std::env::args()
.nth(1)
.unwrap_or_else(|| "config.toml".to_string());
info!("Loading configuration from {}", config_path);
let config = ProxyConfig::load(&config_path).unwrap_or_else(|e| {
error!("Failed to load config: {}", e);
info!("Using default configuration");
ProxyConfig::default()
});
if let Err(e) = config.validate() {
error!("Invalid configuration: {}", e);
std::process::exit(1);
}
let config = Arc::new(config);
info!("Starting MTProto Proxy on port {}", config.port);
info!("Fast mode: {}", config.fast_mode);
info!("Modes: classic={}, secure={}, tls={}",
config.modes.classic, config.modes.secure, config.modes.tls);
// Initialize components
let stats = Arc::new(Stats::new());
let replay_checker = Arc::new(ReplayChecker::new(config.replay_check_len));
let pool = Arc::new(ConnectionPool::new());
// Create handler
let handler = Arc::new(ClientHandler::new(
Arc::clone(&config),
Arc::clone(&stats),
Arc::clone(&replay_checker),
Arc::clone(&pool),
));
// Start listener
let addr: SocketAddr = format!("{}:{}", config.listen_addr_ipv4, config.port)
.parse()?;
let listener = TcpListener::bind(addr).await?;
info!("Listening on {}", addr);
// Print proxy links
print_proxy_links(&config);
info!("Use RUST_LOG=debug or RUST_LOG=trace for more detailed logging");
// Main accept loop
let accept_loop = async {
loop {
match listener.accept().await {
Ok((stream, peer)) => {
let handler = Arc::clone(&handler);
tokio::spawn(async move {
handler.handle(stream, peer).await;
});
}
Err(e) => {
error!("Accept error: {}", e);
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("info".parse().unwrap()))
.init();
// Load config
let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.toml".to_string());
let config = match ProxyConfig::load(&config_path) {
Ok(c) => c,
Err(e) => {
// If config doesn't exist, try to create default
if std::path::Path::new(&config_path).exists() {
error!("Failed to load config: {}", e);
std::process::exit(1);
} else {
let default = ProxyConfig::default();
let toml = toml::to_string_pretty(&default).unwrap();
std::fs::write(&config_path, toml).unwrap();
info!("Created default config at {}", config_path);
default
}
}
};
// Graceful shutdown
tokio::select! {
_ = accept_loop => {}
_ = signal::ctrl_c() => {
info!("Shutting down...");
}
}
config.validate()?;
// Cleanup
pool.close_all().await;
let config = Arc::new(config);
let stats = Arc::new(Stats::new());
info!("Goodbye!");
Ok(())
}
// Initialize Upstream Manager
let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone()));
// Start Health Checks
let um_clone = upstream_manager.clone();
tokio::spawn(async move {
um_clone.run_health_checks().await;
});
fn print_proxy_links(config: &ProxyConfig) {
println!("\n=== Proxy Links ===\n");
// Detect public IP if needed (once at startup)
let detected_ip = detect_ip().await;
// Start Listeners
let mut listeners = Vec::new();
for (user, secret) in &config.users {
if config.modes.tls {
let tls_secret = format!(
"ee{}{}",
secret,
hex::encode(config.tls_domain.as_bytes())
);
println!(
"{} (TLS): tg://proxy?server=IP&port={}&secret={}",
user, config.port, tls_secret
);
}
for listener_conf in &config.listeners {
let addr = SocketAddr::new(listener_conf.ip, config.port);
let options = ListenOptions {
ipv6_only: listener_conf.ip.is_ipv6(),
..Default::default()
};
if config.modes.secure {
println!(
"{} (Secure): tg://proxy?server=IP&port={}&secret=dd{}",
user, config.port, secret
);
match create_listener(addr, &options) {
Ok(socket) => {
let listener = TcpListener::from_std(socket.into())?;
info!("Listening on {}", addr);
// Determine public IP for tg:// links
// 1. Use explicit announce_ip if set
// 2. If listening on 0.0.0.0 or ::, use detected public IP
// 3. Otherwise use the bind IP
let public_ip = if let Some(ip) = listener_conf.announce_ip {
ip
} else if listener_conf.ip.is_unspecified() {
// Try to use detected IP of the same family
if listener_conf.ip.is_ipv4() {
detected_ip.ipv4.unwrap_or(listener_conf.ip)
} else {
detected_ip.ipv6.unwrap_or(listener_conf.ip)
}
} else {
listener_conf.ip
};
for (user, secret) in &config.users {
info!("Link for {}: tg://proxy?server={}&port={}&secret={}",
user, public_ip, config.port, secret);
}
listeners.push(listener);
},
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
}
}
if config.modes.classic {
println!(
"{} (Classic): tg://proxy?server=IP&port={}&secret={}",
user, config.port, secret
);
}
println!();
}
println!("===================\n");
if listeners.is_empty() {
error!("No listeners could be started. Exiting.");
std::process::exit(1);
}
// Accept loop
// For simplicity in this slice, we just spawn a task for each listener
// In a real high-perf scenario, we might want a more complex accept loop
for listener in listeners {
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
tokio::spawn(async move {
if let Err(e) = ClientHandler::new(
stream,
peer_addr,
config,
stats,
upstream_manager
).run().await {
// Log only relevant errors
// debug!("Connection error: {}", e);
}
});
}
Err(e) => {
error!("Accept error: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
});
}
// Wait for signal
match signal::ctrl_c().await {
Ok(()) => info!("Shutting down..."),
Err(e) => error!("Signal error: {}", e),
}
Ok(())
}