diff options
Diffstat (limited to 'src/net/i2p_package.rs')
| -rw-r--r-- | src/net/i2p_package.rs | 342 |
1 files changed, 163 insertions, 179 deletions
diff --git a/src/net/i2p_package.rs b/src/net/i2p_package.rs index a9d39d2..cebfa34 100644 --- a/src/net/i2p_package.rs +++ b/src/net/i2p_package.rs @@ -1,23 +1,17 @@ use crate::cfg::config::Config; use crate::pkgtoolkit::Package; -use crate::pkgtoolkit::archive::ArchiveOperations; -use serde::Deserialize; -use tokio; - -/* -use emissary_core::runtime::{ - AsyncRead, - AsyncWrite, -}; -*/ - +use flate2::read::GzDecoder; +use futures_util::stream::TryStreamExt; use indicatif::{ProgressBar, ProgressStyle}; +use reqwest; +use serde::Deserialize; +use std::fs::File as StdFile; use std::{collections::HashMap, path::Path}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use tar::Archive; +use tokio::{fs::File, io::AsyncWriteExt}; +use crate::pkgtoolkit::archive::ArchiveOperations; +use crate::pkgtoolkit::install::InstallOperations; -use url; -use yosemite::Session; -use yosemite::SessionOptions; pub struct I2PPackage { pub config: Config, @@ -30,81 +24,69 @@ struct IndexData { } impl I2PPackage { - /// Creates a new I2P object with the given configuration. + /// Creates a new I2PPackage object with the given configuration. /// /// # Returns /// - /// A new I2P object with the given configuration. The session is initially set to None and the connected status is set to false. - pub fn new(cfg: Config) -> Self { + /// A new I2PPackage object with the given configuration. + pub fn new(config: Config) -> Self { I2PPackage { - config: cfg, + config, index_packages: None, } } - /// Downloads the INDEX.tar.gz file from the configured repository - /// and stores it in the configured cache directory. + /// Creates a reqwest client configured to use the I2P HTTP proxy. + /// + /// # Returns + /// + /// A reqwest Client with proxy configuration for I2P. + fn create_proxy_client(&self) -> Result<reqwest::Client, Box<dyn std::error::Error>> { + let proxy_url = format!("http://127.0.0.1:{}", self.config.repo.i2p_http_proxy_port); + let proxy = reqwest::Proxy::http(&proxy_url)?; + let client = reqwest::Client::builder() + .proxy(proxy) + .build()?; + Ok(client) + } + + /// Downloads the INDEX.tar.gz file from the configured I2P repository + /// and stores it in the configured cache directory with a progress bar. /// /// # Errors /// /// Returns an error if the request fails, if the response status is not successful, or if there's an issue while reading or writing the file. pub async fn fetch_index(&mut self) -> Result<bool, Box<dyn std::error::Error>> { - let repo_url_str = &self.config.repo.repo_url; + let repo_url = &self.config.repo.repo_url; let cache_dir = &self.config.paths.cache_dir; - let url = url::Url::parse(repo_url_str)?; - let host = url.host_str().ok_or("No host in URL")?; + log::debug!("Cache directory: {:?}", cache_dir); - let request_path = url.path(); - let request_path = if request_path.ends_with(".tar.gz") { - request_path.to_string() + let index_url = if repo_url.ends_with(".tar.gz") { + repo_url.to_string() } else { - format!("{}/INDEX.tar.gz", request_path.trim_end_matches('/')) + format!("{}/INDEX.tar.gz", repo_url.trim_end_matches('/')) }; - let session_options = SessionOptions::default(); - let mut session = Session::new(session_options).await?; - let mut stream = session.connect(host).await?; + let client = self.create_proxy_client()?; - let request = format!( - "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n", - request_path, host - ); - stream.write_all(request.as_bytes()).await?; - - let mut reader = BufReader::new(stream); - let mut response_buffer = Vec::new(); - reader.read_to_end(&mut response_buffer).await?; - - let headers_end = response_buffer - .windows(4) - .position(|window| window == b"\r\n\r\n") - .ok_or("Invalid response: no headers end")?; - - let headers_str = std::str::from_utf8(&response_buffer[..headers_end])?; - if !headers_str.starts_with("HTTP/1.1 200") && !headers_str.starts_with("HTTP/1.0 200") { - return Err(format!( - "HTTP Error: {}", - headers_str.lines().next().unwrap_or("Unknown") - ) - .into()); - } - - let content_length = headers_str - .lines() - .find(|line| line.to_lowercase().starts_with("content-length:")) - .and_then(|line| line.split_at(15).1.trim().parse::<u64>().ok()) + // Make a HEAD request to get the content length for the progress bar + let head_response = client.head(&index_url).send().await?; + let content_length: u64 = head_response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) .unwrap_or(0); - let body_start = headers_end + 4; - let mut body_reader = std::io::Cursor::new(&response_buffer[body_start..]); - let file_path = Path::new(cache_dir).join("INDEX.tar.gz"); - + // Create progress bar let pb = if content_length > 0 { let pb = ProgressBar::new(content_length); - pb.set_style(ProgressStyle::default_bar() - .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")? - .progress_chars("#>-")); + pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")? + .progress_chars("#>-"), + ); pb } else { let pb = ProgressBar::new_spinner(); @@ -115,34 +97,41 @@ impl I2PPackage { pb }; - let mut file = tokio::fs::File::create(&file_path).await?; - - let chunk_size = 8192u64; - let mut buffer = vec![0; chunk_size as usize]; + // Send GET request and stream the response body + let response = client.get(&index_url).send().await?; + if !response.status().is_success() { + return Err(format!("HTTP Error: {}", response.status()).into()); + } - loop { - let bytes_read_result = std::io::Read::read(&mut body_reader, &mut buffer); - let bytes_read = match bytes_read_result { - Ok(n) => n, - Err(e) => return Err(e.into()), - }; + let mut stream = response.bytes_stream(); + let file_path = Path::new(cache_dir).join("INDEX.tar.gz"); - if bytes_read == 0 { - break; - } - file.write_all(&buffer[..bytes_read]).await?; + // Ensure cache_dir exists + tokio::fs::create_dir_all(cache_dir) + .await + .map_err(|e| format!("Failed to create cache dir: {}", e))?; - pb.inc(bytes_read as u64); + let mut file = File::create(&file_path).await?; + let mut downloaded: u64 = 0; - if bytes_read < chunk_size as usize { - break; - } + while let Some(chunk) = stream.try_next().await? { + file.write_all(&chunk).await?; + let chunk_len = chunk.len() as u64; + downloaded += chunk_len; + pb.set_position(downloaded); } pb.finish_with_message("INDEX.tar.gz download finished"); log::info!("Extracting INDEX.tar.gz to cache directory..."); - Package::extract_archive(&file_path.to_string_lossy())?; + + let archive_file = + StdFile::open(&file_path).map_err(|e| format!("Failed to open archive: {}", e))?; + let gz_decoder = GzDecoder::new(archive_file); + let mut archive = Archive::new(gz_decoder); + archive + .unpack(cache_dir) + .map_err(|e| format!("Failed to unpack archive: {}", e))?; let index_toml_path = Path::new(cache_dir).join("INDEX.toml"); if !index_toml_path.exists() { @@ -151,13 +140,14 @@ impl I2PPackage { return Ok(true); } - let index_content = std::fs::read_to_string(&index_toml_path)?; - let index_data: IndexData = toml::from_str(&index_content)?; + let index_content = tokio::fs::read_to_string(&index_toml_path).await?; + log::debug!("Content of INDEX.toml:\n{}", index_content); + + let index_data: IndexData = toml::from_str(&index_content) + .map_err(|e| format!("Failed to parse INDEX.toml: {}", e))?; let mut package_map = HashMap::new(); for pkg in index_data.packages { - // PKG_URL = /repo/package.mesk - // FULL URL = "http://mesk.anthrill.i2p/i2p/repo/pkg.mesk" let base_url = url::Url::parse(&self.config.repo.repo_url)?; let full_url = base_url.join(&pkg.url)?; let mut pkg_clone = pkg.clone(); @@ -178,25 +168,38 @@ impl I2PPackage { /// An internal auxiliary function for downloading data and writing it to a file with a progress display. /// /// # Arguments - /// * `data' - Byte slice (&[u8]) with data to write. - /// * `file_path' is the path to the file to write the data to. - /// * `content_length' is the expected data size (for the progress bar). Maybe 0. - /// * `description' - Description of the operation for the progress bar. + /// * `client` - The reqwest client to use for requests. + /// * `url` - The URL to download from. + /// * `file_path` - The path to the file to write the data to. + /// * `description` - Description of the operation for the progress bar. /// /// # Errors /// - /// Returns an error if there is a problem when creating or writing to a file. - async fn download_and_write_file_with_progress( - data: &[u8], + /// Returns an error if the request fails, if the response status is not successful, or if there's an issue while writing the file. + async fn download_file_with_progress( + client: &reqwest::Client, + url: &str, file_path: &Path, - content_length: u64, description: &str, ) -> Result<(), Box<dyn std::error::Error>> { + + let head_response = client.head(url).send().await?; + let content_length: u64 = head_response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) + .unwrap_or(0); let pb = if content_length > 0 { let pb = ProgressBar::new(content_length); - pb.set_style(ProgressStyle::default_bar() - .template(&format!("{{spinner:.green}} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{bytes}}/{{total_bytes}} ({} {{eta}})", description))? - .progress_chars("#>-")); + pb.set_style( + ProgressStyle::default_bar() + .template(&format!( + "{{spinner:.green}} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{bytes}}/{{total_bytes}} ({} {{eta}})", + description + ))? + .progress_chars("#>-"), + ); pb } else { let pb = ProgressBar::new_spinner(); @@ -206,117 +209,98 @@ impl I2PPackage { ))?); pb }; + let response = client.get(url).send().await?; + if !response.status().is_success() { + return Err(format!("HTTP Error: {}", response.status()).into()); + } - let mut file = tokio::fs::File::create(&file_path).await?; - - let chunk_size = 8192usize; - let mut pos = 0; + let mut stream = response.bytes_stream(); + let mut file = File::create(&file_path).await?; + let mut downloaded: u64 = 0; - while pos < data.len() { - let end = std::cmp::min(pos + chunk_size, data.len()); - let chunk = &data[pos..end]; - file.write_all(chunk).await?; - pb.inc(chunk.len() as u64); - pos = end; + while let Some(chunk) = stream.try_next().await? { + file.write_all(&chunk).await?; + let chunk_len = chunk.len() as u64; + downloaded += chunk_len; + pb.set_position(downloaded); } pb.finish_with_message(format!("{} download finished", description)); Ok(()) } - /// Fetches a specific package identified by `index` (likely the package name). - /// Assumes `fetch_index` has been called and `self.index_packages` is populated. - pub fn fetch_package_info( - &self, - package_name: &str, - ) -> Result<&Package, Box<dyn std::error::Error>> { - let packages = self - .index_packages - .as_ref() - .ok_or("Index not loaded. Call fetch_index first.")?; - let pkg_info = packages - .get(package_name) - .ok_or(format!("Package '{}' not found in index.", package_name))?; - Ok(pkg_info) - } - /// Fetches a specific package identified by `package_name`. /// Assumes `fetch_index` has been called and `self.index_packages` is populated to get the URL. - /// Downloads the package file (.mesk) to the cache directory. + /// Downloads the package file (.mesk) to the cache directory with a progress bar, + /// extracts it, and installs the package. /// /// # Errors /// /// Returns an error if the index is not loaded, the package is not found in the index, - /// the package URL is invalid, the request fails, or if there's an issue writing the file. - /// Why didn't I just abstract the download functionality into a separate function initially? - /// Yes, I'm scared to work with fetch_index, even I don't often write such shit code. + /// the package URL is invalid, the request fails, extraction fails, or installation fails. pub async fn fetch_package( &self, package_name: &str, ) -> Result<bool, Box<dyn std::error::Error>> { let package_info = self.fetch_package_info(package_name)?; - let url = url::Url::parse(&package_info.url)?; - let host = url.host_str().ok_or("No host in package URL")?; - let request_path = url.path(); + let url = &package_info.url; - let session_options = SessionOptions::default(); - let mut session = Session::new(session_options).await?; - let mut stream = session.connect(host).await?; + let client = self.create_proxy_client()?; - let request = format!( - "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n", - request_path, host - ); - stream.write_all(request.as_bytes()).await?; - - let mut reader = BufReader::new(stream); - let mut response_buffer = Vec::new(); - reader.read_to_end(&mut response_buffer).await?; - - let headers_end = response_buffer - .windows(4) - .position(|window| window == b"\r\n\r\n") - .ok_or("Invalid response: no headers end")?; - - let headers_str = std::str::from_utf8(&response_buffer[..headers_end])?; - if !headers_str.starts_with("HTTP/1.1 200") && !headers_str.starts_with("HTTP/1.0 200") { - return Err(format!( - "HTTP Error: {}", - headers_str.lines().next().unwrap_or("Unknown") - ) - .into()); - } - - let content_length = headers_str - .lines() - .find(|line| line.to_lowercase().starts_with("content-length:")) - .and_then(|line| line.split_at(15).1.trim().parse::<u64>().ok()) - .unwrap_or(0); - - let body_start = headers_end + 4; - let body_bytes = &response_buffer[body_start..]; - - let file_name = Path::new(request_path) + let file_name = Path::new(url) .file_name() - .ok_or("Could not determine filename from URL path")? + .ok_or("Could not determine filename from URL")? .to_str() .ok_or("Filename is not valid UTF-8")?; let cache_dir = &self.config.paths.cache_dir; let file_path = Path::new(cache_dir).join(file_name); - Self::download_and_write_file_with_progress( - body_bytes, - &file_path, - content_length, - file_name, - ) - .await?; + tokio::fs::create_dir_all(&cache_dir).await?; + + Self::download_file_with_progress(&client, url, &file_path, file_name).await?; log::info!( "Package '{}' downloaded successfully to {:?}", package_name, file_path ); - Ok(true) + + // Extract the package archive + log::info!("Extracting package archive for '{}'...", package_name); + Package::extract_archive(&file_path.to_string_lossy()) + .map_err(|e| format!("Failed to extract package archive: {}", e))?; + + log::info!("Package archive extracted successfully."); + + // Install the package + log::info!("Installing package '{}'...", package_name); + let mut package = package_info.clone(); + + match package.install() { + Ok(_) => { + log::info!("Package '{}' installed successfully.", package_name); + Ok(true) + } + Err(e) => { + log::error!("Failed to install package '{}': {}", package_name, e); + Err(format!("Installation failed: {}", e).into()) + } + } + } + + /// Fetches a specific package identified by `index` (likely the package name). + /// Assumes `fetch_index` has been called and `self.index_packages` is populated. + pub fn fetch_package_info( + &self, + package_name: &str, + ) -> Result<&Package, Box<dyn std::error::Error>> { + let packages = self + .index_packages + .as_ref() + .ok_or("Index not loaded. Call fetch_index first.")?; + let pkg_info = packages + .get(package_name) + .ok_or(format!("Package '{}' not found in index.", package_name))?; + Ok(pkg_info) } } |
