summaryrefslogtreecommitdiff
path: root/src/router
diff options
context:
space:
mode:
authorNamilskyy <alive6863@gmail.com>2025-12-28 15:19:04 +0300
committerNamilskyy <alive6863@gmail.com>2025-12-28 15:19:04 +0300
commitbbcbdcbbf95cda5115bbb484b5e2d01669a7a1a0 (patch)
treed57a5ec1044713f2ebbdf694f8b6ecca7f800edf /src/router
parent007ee0bc3534218b2d084f368444d96c16d9d7f9 (diff)
Implementing integrated router funtions and other fuctions
Diffstat (limited to 'src/router')
-rw-r--r--src/router/manager.rs144
-rw-r--r--src/router/mod.rs2
-rw-r--r--src/router/router.rs197
3 files changed, 272 insertions, 71 deletions
diff --git a/src/router/manager.rs b/src/router/manager.rs
new file mode 100644
index 0000000..23b7c69
--- /dev/null
+++ b/src/router/manager.rs
@@ -0,0 +1,144 @@
+use crate::router::router::RouterUtils;
+use crate::router::router::{Emissary, EmissaryConfig};
+use log::{debug, error, info};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+#[derive(Debug)]
+pub enum RouterMessage {
+ Start,
+ Stop,
+ Status,
+}
+
+pub struct RouterManager {
+ handle: Option<JoinHandle<()>>,
+ tx: mpsc::Sender<RouterMessage>,
+ is_running: Arc<AtomicBool>,
+}
+
+impl RouterManager {
+ pub async fn new(
+ config: &crate::cfg::config::Config,
+ ) -> Result<Self, Box<dyn std::error::Error>> {
+ if !config.router.integrated_router {
+ return Ok(Self {
+ handle: None,
+ tx: {
+ let (tx, _) = mpsc::channel(1);
+ tx
+ },
+ is_running: Arc::new(AtomicBool::new(false)),
+ });
+ }
+
+ let (tx, mut rx) = mpsc::channel::<RouterMessage>(1);
+ let is_running = Arc::new(AtomicBool::new(false));
+ let is_running_clone = is_running.clone();
+
+ let router_config = config.router.clone();
+ let handle = tokio::spawn(async move {
+ info!("Router manager task started");
+
+ let mut router = Emissary::new(EmissaryConfig {
+ storage: Some(std::path::PathBuf::from(&router_config.storage_path)),
+ auto_update: Some(router_config.auto_update),
+ http_proxy_port: Some(router_config.http_proxy_port),
+ socks_proxy_port: Some(router_config.socks_proxy_port),
+ });
+
+ if let Err(e) = router.config().await {
+ error!("Failed to configure router: {}", e);
+ return;
+ }
+
+ let storage_path = std::path::Path::new(&router_config.storage_path);
+ if !storage_path.exists()
+ || storage_path
+ .read_dir()
+ .ok()
+ .is_none_or(|mut d| d.next().is_none())
+ {
+ info!("Router storage is empty, performing initial reseed...");
+ if let Err(e) = router.reseed().await {
+ error!("Failed to reseed router: {}", e);
+ return;
+ }
+ }
+
+ let router_task = tokio::spawn(async move {
+ info!("Starting router service...");
+ match router.start().await {
+ Ok(_) => info!("Router service stopped"),
+ Err(e) => error!("Router service error: {}", e),
+ }
+ });
+
+ is_running_clone.store(true, Ordering::SeqCst);
+ info!("Router manager is now running");
+
+ while let Some(msg) = rx.recv().await {
+ match msg {
+ RouterMessage::Stop => {
+ info!("Stopping router...");
+ // TODO: Implement proper router shutdown
+ is_running_clone.store(false, Ordering::SeqCst);
+ break;
+ }
+ RouterMessage::Status => {
+ debug!(
+ "Router status: {}",
+ if is_running_clone.load(Ordering::SeqCst) {
+ "running"
+ } else {
+ "stopped"
+ }
+ );
+ }
+ _ => {}
+ }
+ }
+
+ if let Err(e) = router_task.await {
+ error!("Router task panicked: {}", e);
+ }
+
+ info!("Router manager task finished");
+ });
+
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+ Ok(Self {
+ handle: Some(handle),
+ tx,
+ is_running,
+ })
+ }
+
+ pub async fn stop(&self) {
+ if self.is_running.load(Ordering::SeqCst) {
+ if let Err(e) = self.tx.send(RouterMessage::Stop).await {
+ error!("Failed to send stop message to router: {}", e);
+ }
+ }
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.is_running.load(Ordering::SeqCst)
+ }
+}
+
+impl Drop for RouterManager {
+ fn drop(&mut self) {
+ if self.is_running() {
+ let tx = self.tx.clone();
+ tokio::spawn(async move {
+ if let Err(e) = tx.send(RouterMessage::Stop).await {
+ error!("Failed to send stop message during drop: {}", e);
+ }
+ });
+ }
+ }
+}
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 772c177..91da2e5 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -1,2 +1,2 @@
+pub mod manager;
pub mod router;
-
diff --git a/src/router/router.rs b/src/router/router.rs
index c438d75..7c36382 100644
--- a/src/router/router.rs
+++ b/src/router/router.rs
@@ -1,92 +1,149 @@
-use emissary_util::storage::{Storage, StorageBundle};
+use emissary_core::router::Router;
+use emissary_core::{Config, Ntcp2Config, SamConfig, TransitConfig};
+use emissary_util::port_mapper::{PortMapper, PortMapperConfig};
use emissary_util::reseeder::Reseeder;
+use emissary_util::runtime::tokio::Runtime as TokioRuntime;
+use emissary_util::storage::{Storage, StorageBundle};
+use serde::{Deserialize, Serialize};
use std::path::PathBuf;
+use std::sync::Arc;
+use std::future::Future;
-
-pub trait RouterConfigUtils {
- async fn config(&mut self) -> Result<(StorageBundle, Storage), Box<dyn std::error::Error>>;
- async fn prepare_router(&mut self) -> Result<(), Box<dyn std::error::Error>>;
- async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box<dyn std::error::Error>>;
+pub trait RouterUtils {
+ fn config(&mut self) -> impl Future<Output = Result<(), Box<dyn std::error::Error>>> + Send;
+ fn reseed(&mut self) -> impl Future<Output = Result<(), Box<dyn std::error::Error>>> + Send;
+ fn start(self) -> impl Future<Output = Result<Router<TokioRuntime>, Box<dyn std::error::Error>>> + Send;
}
+#[derive(Debug, Deserialize, Serialize)]
pub struct EmissaryConfig {
- Storage: Option<PathBuf>,
- AutoUpdate: Option<bool>,
- HttpProxtPort: Option<u16>,
- SocksProxyPort: Option<u16>,
+ pub storage: Option<PathBuf>,
+ pub auto_update: Option<bool>,
+ pub http_proxy_port: Option<u16>,
+ pub socks_proxy_port: Option<u16>,
}
-impl RouterConfigUtils for EmissaryConfig {
- /// Configures the router with the given configuration.
- ///
- /// If the `Storage` field is `None`, it will be set to `/var/lib/mesk/router/`.
- ///
- /// # Errors
- ///
- /// Returns an error if there's an issue while configuring the router.
- async fn config(&mut self) -> Result<(StorageBundle, Storage), Box<dyn std::error::Error>> {
- self.Storage.get_or_insert_with(|| PathBuf::from("/var/lib/mesk/router/"));
- self.AutoUpdate.get_or_insert(true);
- self.HttpProxtPort.get_or_insert(4445);
- self.SocksProxyPort.get_or_insert(4446);
-
- let storage = Storage::new(self.Storage.clone().into()).await?;
-
- let StorageBundle {
- ntcp2_iv,
- ntcp2_key,
- profiles,
- router_info,
- routers,
- signing_key,
- static_key,
- ssu2_intro_key,
- ssu2_static_key,
- } = storage.load().await;
-
- Ok((StorageBundle {
- ntcp2_iv,
- ntcp2_key,
- profiles,
- router_info,
- routers,
- signing_key,
- static_key,
- ssu2_intro_key,
- ssu2_static_key,
- }, storage))
+pub struct Emissary {
+ config: EmissaryConfig,
+ storage: Option<Storage>,
+ storage_bundle: Option<StorageBundle>,
+}
+
+impl Emissary {
+ pub fn new(config: EmissaryConfig) -> Self {
+ Self {
+ config,
+ storage: None,
+ storage_bundle: None,
+ }
+ }
+
+ fn ensure_configured(&self) -> Result<(), Box<dyn std::error::Error>> {
+ if self.storage.is_none() || self.storage_bundle.is_none() {
+ Err("Router not configured. Call `config()` first.".into())
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl RouterUtils for Emissary {
+ async fn config(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+ if self.config.storage.is_none() {
+ self.config.storage = Some(PathBuf::from("/var/lib/mesk/router/"));
+ }
+ if self.config.auto_update.is_none() {
+ self.config.auto_update = Some(true);
+ }
+ if self.config.http_proxy_port.is_none() {
+ self.config.http_proxy_port = Some(4445);
+ }
+ if self.config.socks_proxy_port.is_none() {
+ self.config.socks_proxy_port = Some(4446);
+ }
+
+ let storage = Storage::new(Some(self.config.storage.clone().unwrap())).await?;
+ let bundle = storage.load().await;
+
+ self.storage = Some(storage);
+ self.storage_bundle = Some(bundle);
+
+ Ok(())
}
- async fn reseed(&mut self, config: &StorageBundle, storage: &Storage) -> Result<(), Box<dyn std::error::Error>> {
- let mut routers = config.routers.clone();
-
-
- if routers.is_empty() {
+ async fn reseed(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+ self.ensure_configured()?;
+
+ let storage = self.storage.as_ref().unwrap();
+ let mut bundle = self.storage_bundle.take().unwrap();
+
+ if bundle.routers.is_empty() {
match Reseeder::reseed(None, false).await {
Ok(reseed_routers) => {
for info in reseed_routers {
- let _ = storage
+ if let Err(e) = storage
.store_router_info(info.name.to_string(), info.router_info.clone())
- .await;
- routers.push(info.router_info);
+ .await
+ {
+ log::warn!("Failed to store reseeded router info: {}", e);
+ }
+ bundle.routers.push(info.router_info);
}
}
- Err(_) if routers.is_empty() => {
- return Err("Could not reseed routers.".into());
- }
- Err(error) => {
- log::warn!(
- "Failed to reseed routers: {} - using existing routers", error.to_string(),
- );
+ Err(e) => {
+ if bundle.routers.is_empty() {
+ self.storage_bundle = Some(bundle);
+ return Err(format!("Reseed failed and no routers available: {}", e).into());
+ } else {
+ log::warn!("Reseed failed, but using existing routers: {}", e);
+ }
}
}
- }
+ }
+
+ self.storage_bundle = Some(bundle);
Ok(())
}
- async fn prepare_router(&mut self) -> Result<(), Box<dyn std::error::Error>> {
-
-
- Ok(())
+ async fn start(self) -> Result<Router<TokioRuntime>, Box<dyn std::error::Error>> {
+ let storage = self.storage.unwrap();
+ let bundle = self.storage_bundle.unwrap();
+
+ let config = Config {
+ ntcp2: Some(Ntcp2Config {
+ port: 25515,
+ key: bundle.ntcp2_key,
+ iv: bundle.ntcp2_iv,
+ publish: true,
+ host: None,
+ }),
+ samv3_config: Some(SamConfig {
+ tcp_port: self.config.http_proxy_port.unwrap_or(4445),
+ udp_port: self.config.socks_proxy_port.unwrap_or(4446),
+ host: "127.0.0.1".to_string(),
+ }),
+ routers: bundle.routers,
+ profiles: bundle.profiles,
+ router_info: bundle.router_info,
+ static_key: Some(bundle.static_key),
+ signing_key: Some(bundle.signing_key),
+ transit: Some(TransitConfig {
+ max_tunnels: Some(1000),
+ }),
+ ..Default::default()
+ };
+
+ let (router, _events, router_info) = Router::<TokioRuntime>::new(
+ config,
+ None, // AddressBook
+ Some(Arc::new(storage.clone())),
+ )
+ .await
+ .map_err(|e| format!("Router creation failed: {}", e))?;
+
+ // Сохраняем router_info, если он новый
+ storage.store_local_router_info(router_info).await?;
+
+ Ok(router)
}
-} \ No newline at end of file
+}