From bbcbdcbbf95cda5115bbb484b5e2d01669a7a1a0 Mon Sep 17 00:00:00 2001 From: Namilskyy Date: Sun, 28 Dec 2025 15:19:04 +0300 Subject: Implementing integrated router funtions and other fuctions --- src/router/manager.rs | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/router/manager.rs (limited to 'src/router/manager.rs') diff --git a/src/router/manager.rs b/src/router/manager.rs new file mode 100644 index 0000000..23b7c69 --- /dev/null +++ b/src/router/manager.rs @@ -0,0 +1,144 @@ +use crate::router::router::RouterUtils; +use crate::router::router::{Emissary, EmissaryConfig}; +use log::{debug, error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +#[derive(Debug)] +pub enum RouterMessage { + Start, + Stop, + Status, +} + +pub struct RouterManager { + handle: Option>, + tx: mpsc::Sender, + is_running: Arc, +} + +impl RouterManager { + pub async fn new( + config: &crate::cfg::config::Config, + ) -> Result> { + if !config.router.integrated_router { + return Ok(Self { + handle: None, + tx: { + let (tx, _) = mpsc::channel(1); + tx + }, + is_running: Arc::new(AtomicBool::new(false)), + }); + } + + let (tx, mut rx) = mpsc::channel::(1); + let is_running = Arc::new(AtomicBool::new(false)); + let is_running_clone = is_running.clone(); + + let router_config = config.router.clone(); + let handle = tokio::spawn(async move { + info!("Router manager task started"); + + let mut router = Emissary::new(EmissaryConfig { + storage: Some(std::path::PathBuf::from(&router_config.storage_path)), + auto_update: Some(router_config.auto_update), + http_proxy_port: Some(router_config.http_proxy_port), + socks_proxy_port: Some(router_config.socks_proxy_port), + }); + + if let Err(e) = router.config().await { + error!("Failed to configure router: {}", e); + return; + } + + let storage_path = std::path::Path::new(&router_config.storage_path); + if !storage_path.exists() + || storage_path + .read_dir() + .ok() + .is_none_or(|mut d| d.next().is_none()) + { + info!("Router storage is empty, performing initial reseed..."); + if let Err(e) = router.reseed().await { + error!("Failed to reseed router: {}", e); + return; + } + } + + let router_task = tokio::spawn(async move { + info!("Starting router service..."); + match router.start().await { + Ok(_) => info!("Router service stopped"), + Err(e) => error!("Router service error: {}", e), + } + }); + + is_running_clone.store(true, Ordering::SeqCst); + info!("Router manager is now running"); + + while let Some(msg) = rx.recv().await { + match msg { + RouterMessage::Stop => { + info!("Stopping router..."); + // TODO: Implement proper router shutdown + is_running_clone.store(false, Ordering::SeqCst); + break; + } + RouterMessage::Status => { + debug!( + "Router status: {}", + if is_running_clone.load(Ordering::SeqCst) { + "running" + } else { + "stopped" + } + ); + } + _ => {} + } + } + + if let Err(e) = router_task.await { + error!("Router task panicked: {}", e); + } + + info!("Router manager task finished"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + Ok(Self { + handle: Some(handle), + tx, + is_running, + }) + } + + pub async fn stop(&self) { + if self.is_running.load(Ordering::SeqCst) { + if let Err(e) = self.tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message to router: {}", e); + } + } + } + + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::SeqCst) + } +} + +impl Drop for RouterManager { + fn drop(&mut self) { + if self.is_running() { + let tx = self.tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message during drop: {}", e); + } + }); + } + } +} -- cgit v1.2.3