From bbcbdcbbf95cda5115bbb484b5e2d01669a7a1a0 Mon Sep 17 00:00:00 2001 From: Namilskyy Date: Sun, 28 Dec 2025 15:19:04 +0300 Subject: Implementing integrated router funtions and other fuctions --- src/router/manager.rs | 144 ++++++++++++++++++++++++++++++++++++ src/router/mod.rs | 2 +- src/router/router.rs | 197 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 272 insertions(+), 71 deletions(-) create mode 100644 src/router/manager.rs (limited to 'src/router') diff --git a/src/router/manager.rs b/src/router/manager.rs new file mode 100644 index 0000000..23b7c69 --- /dev/null +++ b/src/router/manager.rs @@ -0,0 +1,144 @@ +use crate::router::router::RouterUtils; +use crate::router::router::{Emissary, EmissaryConfig}; +use log::{debug, error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +#[derive(Debug)] +pub enum RouterMessage { + Start, + Stop, + Status, +} + +pub struct RouterManager { + handle: Option>, + tx: mpsc::Sender, + is_running: Arc, +} + +impl RouterManager { + pub async fn new( + config: &crate::cfg::config::Config, + ) -> Result> { + if !config.router.integrated_router { + return Ok(Self { + handle: None, + tx: { + let (tx, _) = mpsc::channel(1); + tx + }, + is_running: Arc::new(AtomicBool::new(false)), + }); + } + + let (tx, mut rx) = mpsc::channel::(1); + let is_running = Arc::new(AtomicBool::new(false)); + let is_running_clone = is_running.clone(); + + let router_config = config.router.clone(); + let handle = tokio::spawn(async move { + info!("Router manager task started"); + + let mut router = Emissary::new(EmissaryConfig { + storage: Some(std::path::PathBuf::from(&router_config.storage_path)), + auto_update: Some(router_config.auto_update), + http_proxy_port: Some(router_config.http_proxy_port), + socks_proxy_port: Some(router_config.socks_proxy_port), + }); + + if let Err(e) = router.config().await { + error!("Failed to configure router: {}", e); + return; + } + + let storage_path = std::path::Path::new(&router_config.storage_path); + if !storage_path.exists() + || storage_path + .read_dir() + .ok() + .is_none_or(|mut d| d.next().is_none()) + { + info!("Router storage is empty, performing initial reseed..."); + if let Err(e) = router.reseed().await { + error!("Failed to reseed router: {}", e); + return; + } + } + + let router_task = tokio::spawn(async move { + info!("Starting router service..."); + match router.start().await { + Ok(_) => info!("Router service stopped"), + Err(e) => error!("Router service error: {}", e), + } + }); + + is_running_clone.store(true, Ordering::SeqCst); + info!("Router manager is now running"); + + while let Some(msg) = rx.recv().await { + match msg { + RouterMessage::Stop => { + info!("Stopping router..."); + // TODO: Implement proper router shutdown + is_running_clone.store(false, Ordering::SeqCst); + break; + } + RouterMessage::Status => { + debug!( + "Router status: {}", + if is_running_clone.load(Ordering::SeqCst) { + "running" + } else { + "stopped" + } + ); + } + _ => {} + } + } + + if let Err(e) = router_task.await { + error!("Router task panicked: {}", e); + } + + info!("Router manager task finished"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + Ok(Self { + handle: Some(handle), + tx, + is_running, + }) + } + + pub async fn stop(&self) { + if self.is_running.load(Ordering::SeqCst) { + if let Err(e) = self.tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message to router: {}", e); + } + } + } + + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::SeqCst) + } +} + +impl Drop for RouterManager { + fn drop(&mut self) { + if self.is_running() { + let tx = self.tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message during drop: {}", e); + } + }); + } + } +} diff --git a/src/router/mod.rs b/src/router/mod.rs index 772c177..91da2e5 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,2 +1,2 @@ +pub mod manager; pub mod router; - diff --git a/src/router/router.rs b/src/router/router.rs index c438d75..7c36382 100644 --- a/src/router/router.rs +++ b/src/router/router.rs @@ -1,92 +1,149 @@ -use emissary_util::storage::{Storage, StorageBundle}; +use emissary_core::router::Router; +use emissary_core::{Config, Ntcp2Config, SamConfig, TransitConfig}; +use emissary_util::port_mapper::{PortMapper, PortMapperConfig}; use emissary_util::reseeder::Reseeder; +use emissary_util::runtime::tokio::Runtime as TokioRuntime; +use emissary_util::storage::{Storage, StorageBundle}; +use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use std::sync::Arc; +use std::future::Future; - -pub trait RouterConfigUtils { - async fn config(&mut self) -> Result<(StorageBundle, Storage), Box>; - async fn prepare_router(&mut self) -> Result<(), Box>; - async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box>; +pub trait RouterUtils { + fn config(&mut self) -> impl Future>> + Send; + fn reseed(&mut self) -> impl Future>> + Send; + fn start(self) -> impl Future, Box>> + Send; } +#[derive(Debug, Deserialize, Serialize)] pub struct EmissaryConfig { - Storage: Option, - AutoUpdate: Option, - HttpProxtPort: Option, - SocksProxyPort: Option, + pub storage: Option, + pub auto_update: Option, + pub http_proxy_port: Option, + pub socks_proxy_port: Option, } -impl RouterConfigUtils for EmissaryConfig { - /// Configures the router with the given configuration. - /// - /// If the `Storage` field is `None`, it will be set to `/var/lib/mesk/router/`. - /// - /// # Errors - /// - /// Returns an error if there's an issue while configuring the router. - async fn config(&mut self) -> Result<(StorageBundle, Storage), Box> { - self.Storage.get_or_insert_with(|| PathBuf::from("/var/lib/mesk/router/")); - self.AutoUpdate.get_or_insert(true); - self.HttpProxtPort.get_or_insert(4445); - self.SocksProxyPort.get_or_insert(4446); - - let storage = Storage::new(self.Storage.clone().into()).await?; - - let StorageBundle { - ntcp2_iv, - ntcp2_key, - profiles, - router_info, - routers, - signing_key, - static_key, - ssu2_intro_key, - ssu2_static_key, - } = storage.load().await; - - Ok((StorageBundle { - ntcp2_iv, - ntcp2_key, - profiles, - router_info, - routers, - signing_key, - static_key, - ssu2_intro_key, - ssu2_static_key, - }, storage)) +pub struct Emissary { + config: EmissaryConfig, + storage: Option, + storage_bundle: Option, +} + +impl Emissary { + pub fn new(config: EmissaryConfig) -> Self { + Self { + config, + storage: None, + storage_bundle: None, + } + } + + fn ensure_configured(&self) -> Result<(), Box> { + if self.storage.is_none() || self.storage_bundle.is_none() { + Err("Router not configured. Call `config()` first.".into()) + } else { + Ok(()) + } + } +} + +impl RouterUtils for Emissary { + async fn config(&mut self) -> Result<(), Box> { + if self.config.storage.is_none() { + self.config.storage = Some(PathBuf::from("/var/lib/mesk/router/")); + } + if self.config.auto_update.is_none() { + self.config.auto_update = Some(true); + } + if self.config.http_proxy_port.is_none() { + self.config.http_proxy_port = Some(4445); + } + if self.config.socks_proxy_port.is_none() { + self.config.socks_proxy_port = Some(4446); + } + + let storage = Storage::new(Some(self.config.storage.clone().unwrap())).await?; + let bundle = storage.load().await; + + self.storage = Some(storage); + self.storage_bundle = Some(bundle); + + Ok(()) } - async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box> { - let mut routers = config.routers.clone(); - - - if routers.is_empty() { + async fn reseed(&mut self) -> Result<(), Box> { + self.ensure_configured()?; + + let storage = self.storage.as_ref().unwrap(); + let mut bundle = self.storage_bundle.take().unwrap(); + + if bundle.routers.is_empty() { match Reseeder::reseed(None, false).await { Ok(reseed_routers) => { for info in reseed_routers { - let _ = storage + if let Err(e) = storage .store_router_info(info.name.to_string(), info.router_info.clone()) - .await; - routers.push(info.router_info); + .await + { + log::warn!("Failed to store reseeded router info: {}", e); + } + bundle.routers.push(info.router_info); } } - Err(_) if routers.is_empty() => { - return Err("Could not reseed routers.".into()); - } - Err(error) => { - log::warn!( - "Failed to reseed routers: {} - using existing routers", error.to_string(), - ); + Err(e) => { + if bundle.routers.is_empty() { + self.storage_bundle = Some(bundle); + return Err(format!("Reseed failed and no routers available: {}", e).into()); + } else { + log::warn!("Reseed failed, but using existing routers: {}", e); + } } } - } + } + + self.storage_bundle = Some(bundle); Ok(()) } - async fn prepare_router(&mut self) -> Result<(), Box> { - - - Ok(()) + async fn start(self) -> Result, Box> { + let storage = self.storage.unwrap(); + let bundle = self.storage_bundle.unwrap(); + + let config = Config { + ntcp2: Some(Ntcp2Config { + port: 25515, + key: bundle.ntcp2_key, + iv: bundle.ntcp2_iv, + publish: true, + host: None, + }), + samv3_config: Some(SamConfig { + tcp_port: self.config.http_proxy_port.unwrap_or(4445), + udp_port: self.config.socks_proxy_port.unwrap_or(4446), + host: "127.0.0.1".to_string(), + }), + routers: bundle.routers, + profiles: bundle.profiles, + router_info: bundle.router_info, + static_key: Some(bundle.static_key), + signing_key: Some(bundle.signing_key), + transit: Some(TransitConfig { + max_tunnels: Some(1000), + }), + ..Default::default() + }; + + let (router, _events, router_info) = Router::::new( + config, + None, // AddressBook + Some(Arc::new(storage.clone())), + ) + .await + .map_err(|e| format!("Router creation failed: {}", e))?; + + // Сохраняем router_info, если он новый + storage.store_local_router_info(router_info).await?; + + Ok(router) } -} \ No newline at end of file +} -- cgit v1.2.3