summaryrefslogtreecommitdiff
path: root/src/router/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/router/manager.rs')
-rw-r--r--src/router/manager.rs144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/router/manager.rs b/src/router/manager.rs
new file mode 100644
index 0000000..23b7c69
--- /dev/null
+++ b/src/router/manager.rs
@@ -0,0 +1,144 @@
+use crate::router::router::RouterUtils;
+use crate::router::router::{Emissary, EmissaryConfig};
+use log::{debug, error, info};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+#[derive(Debug)]
+pub enum RouterMessage {
+ Start,
+ Stop,
+ Status,
+}
+
+pub struct RouterManager {
+ handle: Option<JoinHandle<()>>,
+ tx: mpsc::Sender<RouterMessage>,
+ is_running: Arc<AtomicBool>,
+}
+
+impl RouterManager {
+ pub async fn new(
+ config: &crate::cfg::config::Config,
+ ) -> Result<Self, Box<dyn std::error::Error>> {
+ if !config.router.integrated_router {
+ return Ok(Self {
+ handle: None,
+ tx: {
+ let (tx, _) = mpsc::channel(1);
+ tx
+ },
+ is_running: Arc::new(AtomicBool::new(false)),
+ });
+ }
+
+ let (tx, mut rx) = mpsc::channel::<RouterMessage>(1);
+ let is_running = Arc::new(AtomicBool::new(false));
+ let is_running_clone = is_running.clone();
+
+ let router_config = config.router.clone();
+ let handle = tokio::spawn(async move {
+ info!("Router manager task started");
+
+ let mut router = Emissary::new(EmissaryConfig {
+ storage: Some(std::path::PathBuf::from(&router_config.storage_path)),
+ auto_update: Some(router_config.auto_update),
+ http_proxy_port: Some(router_config.http_proxy_port),
+ socks_proxy_port: Some(router_config.socks_proxy_port),
+ });
+
+ if let Err(e) = router.config().await {
+ error!("Failed to configure router: {}", e);
+ return;
+ }
+
+ let storage_path = std::path::Path::new(&router_config.storage_path);
+ if !storage_path.exists()
+ || storage_path
+ .read_dir()
+ .ok()
+ .is_none_or(|mut d| d.next().is_none())
+ {
+ info!("Router storage is empty, performing initial reseed...");
+ if let Err(e) = router.reseed().await {
+ error!("Failed to reseed router: {}", e);
+ return;
+ }
+ }
+
+ let router_task = tokio::spawn(async move {
+ info!("Starting router service...");
+ match router.start().await {
+ Ok(_) => info!("Router service stopped"),
+ Err(e) => error!("Router service error: {}", e),
+ }
+ });
+
+ is_running_clone.store(true, Ordering::SeqCst);
+ info!("Router manager is now running");
+
+ while let Some(msg) = rx.recv().await {
+ match msg {
+ RouterMessage::Stop => {
+ info!("Stopping router...");
+ // TODO: Implement proper router shutdown
+ is_running_clone.store(false, Ordering::SeqCst);
+ break;
+ }
+ RouterMessage::Status => {
+ debug!(
+ "Router status: {}",
+ if is_running_clone.load(Ordering::SeqCst) {
+ "running"
+ } else {
+ "stopped"
+ }
+ );
+ }
+ _ => {}
+ }
+ }
+
+ if let Err(e) = router_task.await {
+ error!("Router task panicked: {}", e);
+ }
+
+ info!("Router manager task finished");
+ });
+
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+ Ok(Self {
+ handle: Some(handle),
+ tx,
+ is_running,
+ })
+ }
+
+ pub async fn stop(&self) {
+ if self.is_running.load(Ordering::SeqCst) {
+ if let Err(e) = self.tx.send(RouterMessage::Stop).await {
+ error!("Failed to send stop message to router: {}", e);
+ }
+ }
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.is_running.load(Ordering::SeqCst)
+ }
+}
+
+impl Drop for RouterManager {
+ fn drop(&mut self) {
+ if self.is_running() {
+ let tx = self.tx.clone();
+ tokio::spawn(async move {
+ if let Err(e) = tx.send(RouterMessage::Stop).await {
+ error!("Failed to send stop message during drop: {}", e);
+ }
+ });
+ }
+ }
+}