diff options
Diffstat (limited to 'src/router/manager.rs')
| -rw-r--r-- | src/router/manager.rs | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/src/router/manager.rs b/src/router/manager.rs new file mode 100644 index 0000000..23b7c69 --- /dev/null +++ b/src/router/manager.rs @@ -0,0 +1,144 @@ +use crate::router::router::RouterUtils; +use crate::router::router::{Emissary, EmissaryConfig}; +use log::{debug, error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +#[derive(Debug)] +pub enum RouterMessage { + Start, + Stop, + Status, +} + +pub struct RouterManager { + handle: Option<JoinHandle<()>>, + tx: mpsc::Sender<RouterMessage>, + is_running: Arc<AtomicBool>, +} + +impl RouterManager { + pub async fn new( + config: &crate::cfg::config::Config, + ) -> Result<Self, Box<dyn std::error::Error>> { + if !config.router.integrated_router { + return Ok(Self { + handle: None, + tx: { + let (tx, _) = mpsc::channel(1); + tx + }, + is_running: Arc::new(AtomicBool::new(false)), + }); + } + + let (tx, mut rx) = mpsc::channel::<RouterMessage>(1); + let is_running = Arc::new(AtomicBool::new(false)); + let is_running_clone = is_running.clone(); + + let router_config = config.router.clone(); + let handle = tokio::spawn(async move { + info!("Router manager task started"); + + let mut router = Emissary::new(EmissaryConfig { + storage: Some(std::path::PathBuf::from(&router_config.storage_path)), + auto_update: Some(router_config.auto_update), + http_proxy_port: Some(router_config.http_proxy_port), + socks_proxy_port: Some(router_config.socks_proxy_port), + }); + + if let Err(e) = router.config().await { + error!("Failed to configure router: {}", e); + return; + } + + let storage_path = std::path::Path::new(&router_config.storage_path); + if !storage_path.exists() + || storage_path + .read_dir() + .ok() + .is_none_or(|mut d| d.next().is_none()) + { + info!("Router storage is empty, performing initial reseed..."); + if let Err(e) = router.reseed().await { + error!("Failed to reseed router: {}", e); + return; + } + } + + let router_task = tokio::spawn(async move { + info!("Starting router service..."); + match router.start().await { + Ok(_) => info!("Router service stopped"), + Err(e) => error!("Router service error: {}", e), + } + }); + + is_running_clone.store(true, Ordering::SeqCst); + info!("Router manager is now running"); + + while let Some(msg) = rx.recv().await { + match msg { + RouterMessage::Stop => { + info!("Stopping router..."); + // TODO: Implement proper router shutdown + is_running_clone.store(false, Ordering::SeqCst); + break; + } + RouterMessage::Status => { + debug!( + "Router status: {}", + if is_running_clone.load(Ordering::SeqCst) { + "running" + } else { + "stopped" + } + ); + } + _ => {} + } + } + + if let Err(e) = router_task.await { + error!("Router task panicked: {}", e); + } + + info!("Router manager task finished"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + Ok(Self { + handle: Some(handle), + tx, + is_running, + }) + } + + pub async fn stop(&self) { + if self.is_running.load(Ordering::SeqCst) { + if let Err(e) = self.tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message to router: {}", e); + } + } + } + + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::SeqCst) + } +} + +impl Drop for RouterManager { + fn drop(&mut self) { + if self.is_running() { + let tx = self.tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(RouterMessage::Stop).await { + error!("Failed to send stop message during drop: {}", e); + } + }); + } + } +} |
