diff options
| author | Namilskyy <alive6863@gmail.com> | 2025-12-28 15:19:04 +0300 |
|---|---|---|
| committer | Namilskyy <alive6863@gmail.com> | 2025-12-28 15:19:04 +0300 |
| commit | bbcbdcbbf95cda5115bbb484b5e2d01669a7a1a0 (patch) | |
| tree | d57a5ec1044713f2ebbdf694f8b6ecca7f800edf /src/router | |
| parent | 007ee0bc3534218b2d084f368444d96c16d9d7f9 (diff) | |
Implementing integrated router funtions and other fuctions
Diffstat (limited to 'src/router')
| -rw-r--r-- | src/router/manager.rs | 144 | ||||
| -rw-r--r-- | src/router/mod.rs | 2 | ||||
| -rw-r--r-- | src/router/router.rs | 197 |
3 files changed, 272 insertions, 71 deletions
diff --git a/src/router/manager.rs b/src/router/manager.rs new file mode 100644 index 0000000..23b7c69 --- /dev/null +++ b/src/router/manager.rs @@ -0,0 +1,144 @@ +use crate::router::router::RouterUtils; +use crate::router::router::{Emissary, EmissaryConfig}; +use log::{debug, error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +#[derive(Debug)] +pub enum RouterMessage { + Start, + Stop, + Status, +} + +pub struct RouterManager { + handle: Option<JoinHandle<()>>, + tx: mpsc::Sender<RouterMessage>, + is_running: Arc<AtomicBool>, +} + +impl RouterManager { + pub async fn new( + config: &crate::cfg::config::Config, + ) -> Result<Self, Box<dyn std::error::Error>> { + if !config.router.integrated_router { + return Ok(Self { + handle: None, + tx: { + let (tx, _) = mpsc::channel(1); + tx + }, + is_running: Arc::new(AtomicBool::new(false)), + }); + } + + let (tx, mut rx) = mpsc::channel::<RouterMessage>(1); + let is_running = Arc::new(AtomicBool::new(false)); + let is_running_clone = is_running.clone(); + + let router_config = config.router.clone(); + let handle = tokio::spawn(async move { + info!("Router manager task started"); + + let mut router = Emissary::new(EmissaryConfig { + storage: Some(std::path::PathBuf::from(&router_config.storage_path)), + auto_update: Some(router_config.auto_update), + http_proxy_port: Some(router_config.http_proxy_port), + socks_proxy_port: Some(router_config.socks_proxy_port), + }); + + if let Err(e) = router.config().await { + error!("Failed to configure router: {}", e); + return; + } + + let storage_path = std::path::Path::new(&router_config.storage_path); + if !storage_path.exists() + || storage_path + .read_dir() + .ok() + .is_none_or(|mut d| d.next().is_none()) + { + info!("Router storage is empty, performing initial reseed..."); + if let Err(e) = router.reseed().await { + error!("Failed to reseed router: {}", e); + return; + } + } + + let router_task = tokio::spawn(async move { + info!("Starting router service..."); + match router.start().await { + Ok(_) => info!("Router service stopped"), + Err(e) => error!("Router service error: {}", e), + } + }); + + is_running_clone.store(true, Ordering::SeqCst); + info!("Router manager is now running"); + + while let Some(msg) = rx.recv().await { + match msg { + RouterMessage::Stop => { + info!("Stopping router..."); + // TODO: Implement proper router shutdown + is_running_clone.store(false, Ordering::SeqCst); + break; + } + RouterMessage::Status => { + debug!( + "Router status: {}", + if is_running_clone.load(Ordering::SeqCst) { + "running" + } else { + "stopped" + } + ); + } + _ => {} + } + } + + if let Err(e) = router_task.await { + error!("Router task panicked: {}", e); + } + + info!("Router manager task finished"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + Ok(Self { + handle: Some(handle), + tx, + is_running, + }) + } + + pub async fn stop(&self) { + if self.is_running.load(Ordering::SeqCst) { + if let Err(e) = self.tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message to router: {}", e); + } + } + } + + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::SeqCst) + } +} + +impl Drop for RouterManager { + fn drop(&mut self) { + if self.is_running() { + let tx = self.tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message during drop: {}", e); + } + }); + } + } +} diff --git a/src/router/mod.rs b/src/router/mod.rs index 772c177..91da2e5 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,2 +1,2 @@ +pub mod manager; pub mod router; - diff --git a/src/router/router.rs b/src/router/router.rs index c438d75..7c36382 100644 --- a/src/router/router.rs +++ b/src/router/router.rs @@ -1,92 +1,149 @@ -use emissary_util::storage::{Storage, StorageBundle}; +use emissary_core::router::Router; +use emissary_core::{Config, Ntcp2Config, SamConfig, TransitConfig}; +use emissary_util::port_mapper::{PortMapper, PortMapperConfig}; use emissary_util::reseeder::Reseeder; +use emissary_util::runtime::tokio::Runtime as TokioRuntime; +use emissary_util::storage::{Storage, StorageBundle}; +use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use std::sync::Arc; +use std::future::Future; - -pub trait RouterConfigUtils { - async fn config(&mut self) -> Result<(StorageBundle, Storage), Box<dyn std::error::Error>>; - async fn prepare_router(&mut self) -> Result<(), Box<dyn std::error::Error>>; - async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box<dyn std::error::Error>>; +pub trait RouterUtils { + fn config(&mut self) -> impl Future<Output = Result<(), Box<dyn std::error::Error>>> + Send; + fn reseed(&mut self) -> impl Future<Output = Result<(), Box<dyn std::error::Error>>> + Send; + fn start(self) -> impl Future<Output = Result<Router<TokioRuntime>, Box<dyn std::error::Error>>> + Send; } +#[derive(Debug, Deserialize, Serialize)] pub struct EmissaryConfig { - Storage: Option<PathBuf>, - AutoUpdate: Option<bool>, - HttpProxtPort: Option<u16>, - SocksProxyPort: Option<u16>, + pub storage: Option<PathBuf>, + pub auto_update: Option<bool>, + pub http_proxy_port: Option<u16>, + pub socks_proxy_port: Option<u16>, } -impl RouterConfigUtils for EmissaryConfig { - /// Configures the router with the given configuration. - /// - /// If the `Storage` field is `None`, it will be set to `/var/lib/mesk/router/`. - /// - /// # Errors - /// - /// Returns an error if there's an issue while configuring the router. - async fn config(&mut self) -> Result<(StorageBundle, Storage), Box<dyn std::error::Error>> { - self.Storage.get_or_insert_with(|| PathBuf::from("/var/lib/mesk/router/")); - self.AutoUpdate.get_or_insert(true); - self.HttpProxtPort.get_or_insert(4445); - self.SocksProxyPort.get_or_insert(4446); - - let storage = Storage::new(self.Storage.clone().into()).await?; - - let StorageBundle { - ntcp2_iv, - ntcp2_key, - profiles, - router_info, - routers, - signing_key, - static_key, - ssu2_intro_key, - ssu2_static_key, - } = storage.load().await; - - Ok((StorageBundle { - ntcp2_iv, - ntcp2_key, - profiles, - router_info, - routers, - signing_key, - static_key, - ssu2_intro_key, - ssu2_static_key, - }, storage)) +pub struct Emissary { + config: EmissaryConfig, + storage: Option<Storage>, + storage_bundle: Option<StorageBundle>, +} + +impl Emissary { + pub fn new(config: EmissaryConfig) -> Self { + Self { + config, + storage: None, + storage_bundle: None, + } + } + + fn ensure_configured(&self) -> Result<(), Box<dyn std::error::Error>> { + if self.storage.is_none() || self.storage_bundle.is_none() { + Err("Router not configured. Call `config()` first.".into()) + } else { + Ok(()) + } + } +} + +impl RouterUtils for Emissary { + async fn config(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if self.config.storage.is_none() { + self.config.storage = Some(PathBuf::from("/var/lib/mesk/router/")); + } + if self.config.auto_update.is_none() { + self.config.auto_update = Some(true); + } + if self.config.http_proxy_port.is_none() { + self.config.http_proxy_port = Some(4445); + } + if self.config.socks_proxy_port.is_none() { + self.config.socks_proxy_port = Some(4446); + } + + let storage = Storage::new(Some(self.config.storage.clone().unwrap())).await?; + let bundle = storage.load().await; + + self.storage = Some(storage); + self.storage_bundle = Some(bundle); + + Ok(()) } - async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box<dyn std::error::Error>> { - let mut routers = config.routers.clone(); - - - if routers.is_empty() { + async fn reseed(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.ensure_configured()?; + + let storage = self.storage.as_ref().unwrap(); + let mut bundle = self.storage_bundle.take().unwrap(); + + if bundle.routers.is_empty() { match Reseeder::reseed(None, false).await { Ok(reseed_routers) => { for info in reseed_routers { - let _ = storage + if let Err(e) = storage .store_router_info(info.name.to_string(), info.router_info.clone()) - .await; - routers.push(info.router_info); + .await + { + log::warn!("Failed to store reseeded router info: {}", e); + } + bundle.routers.push(info.router_info); } } - Err(_) if routers.is_empty() => { - return Err("Could not reseed routers.".into()); - } - Err(error) => { - log::warn!( - "Failed to reseed routers: {} - using existing routers", error.to_string(), - ); + Err(e) => { + if bundle.routers.is_empty() { + self.storage_bundle = Some(bundle); + return Err(format!("Reseed failed and no routers available: {}", e).into()); + } else { + log::warn!("Reseed failed, but using existing routers: {}", e); + } } } - } + } + + self.storage_bundle = Some(bundle); Ok(()) } - async fn prepare_router(&mut self) -> Result<(), Box<dyn std::error::Error>> { - - - Ok(()) + async fn start(self) -> Result<Router<TokioRuntime>, Box<dyn std::error::Error>> { + let storage = self.storage.unwrap(); + let bundle = self.storage_bundle.unwrap(); + + let config = Config { + ntcp2: Some(Ntcp2Config { + port: 25515, + key: bundle.ntcp2_key, + iv: bundle.ntcp2_iv, + publish: true, + host: None, + }), + samv3_config: Some(SamConfig { + tcp_port: self.config.http_proxy_port.unwrap_or(4445), + udp_port: self.config.socks_proxy_port.unwrap_or(4446), + host: "127.0.0.1".to_string(), + }), + routers: bundle.routers, + profiles: bundle.profiles, + router_info: bundle.router_info, + static_key: Some(bundle.static_key), + signing_key: Some(bundle.signing_key), + transit: Some(TransitConfig { + max_tunnels: Some(1000), + }), + ..Default::default() + }; + + let (router, _events, router_info) = Router::<TokioRuntime>::new( + config, + None, // AddressBook + Some(Arc::new(storage.clone())), + ) + .await + .map_err(|e| format!("Router creation failed: {}", e))?; + + // Сохраняем router_info, если он новый + storage.store_local_router_info(router_info).await?; + + Ok(router) } -}
\ No newline at end of file +} |
