diff --git a/Cargo.lock b/Cargo.lock index 05d751cb1..ae6994617 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1508,6 +1508,7 @@ dependencies = [ "pacquet-store-dir", "pipe-trait", "pretty_assertions", + "rayon", "reqwest", "serde", "serde_json", diff --git a/crates/package-manager/src/create_virtual_dir_by_snapshot.rs b/crates/package-manager/src/create_virtual_dir_by_snapshot.rs index d61077645..5313d9b22 100644 --- a/crates/package-manager/src/create_virtual_dir_by_snapshot.rs +++ b/crates/package-manager/src/create_virtual_dir_by_snapshot.rs @@ -57,17 +57,29 @@ impl<'a> CreateVirtualDirBySnapshot<'a> { } })?; - // 1. Install the files from `cas_paths` - let save_path = - virtual_node_modules_dir.join(dependency_path.package_specifier.name.to_string()); - create_cas_files(import_method, &save_path, cas_paths) - .map_err(CreateVirtualDirError::CreateCasFiles)?; + let mut result1: Result<(), CreateVirtualDirError> = Ok(()); + let mut result2: Result<(), CreateVirtualDirError> = Ok(()); + rayon::scope(|scope| { + // 1. Install the files from `cas_paths` + scope.spawn(|_| { + let save_path = virtual_node_modules_dir + .join(dependency_path.package_specifier.name.to_string()); + result1 = create_cas_files(import_method, &save_path, cas_paths) + .map_err(CreateVirtualDirError::CreateCasFiles); + }); - // 2. Create the symlink layout - if let Some(dependencies) = &package_snapshot.dependencies { - create_symlink_layout(dependencies, virtual_store_dir, &virtual_node_modules_dir) - } - - Ok(()) + // 2. Create the symlink layout + scope.spawn(|_| { + if let Some(dependencies) = &package_snapshot.dependencies { + create_symlink_layout( + dependencies, + virtual_store_dir, + &virtual_node_modules_dir, + ); + result2 = Ok(()); + } + }) + }); + result1.and(result2) } } diff --git a/crates/package-manager/src/create_virtual_store.rs b/crates/package-manager/src/create_virtual_store.rs index f962c026d..fed9a60c6 100644 --- a/crates/package-manager/src/create_virtual_store.rs +++ b/crates/package-manager/src/create_virtual_store.rs @@ -2,7 +2,6 @@ use crate::InstallPackageBySnapshot; use futures_util::future; use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot}; use pacquet_npmrc::Npmrc; -use pacquet_tarball::Cache; use pipe_trait::Pipe; use reqwest::Client; use std::collections::HashMap; @@ -10,7 +9,6 @@ use std::collections::HashMap; /// This subroutine generates filesystem layout for the virtual store at `node_modules/.pacquet`. #[must_use] pub struct CreateVirtualStore<'a> { - pub tarball_cache: &'a Cache, pub http_client: &'a Client, pub config: &'static Npmrc, pub packages: Option<&'a HashMap>, @@ -20,8 +18,7 @@ pub struct CreateVirtualStore<'a> { impl<'a> CreateVirtualStore<'a> { /// Execute the subroutine. pub async fn run(self) { - let CreateVirtualStore { tarball_cache, http_client, config, packages, project_snapshot } = - self; + let CreateVirtualStore { http_client, config, packages, project_snapshot } = self; let packages = packages.unwrap_or_else(|| { dbg!(project_snapshot); @@ -31,16 +28,10 @@ impl<'a> CreateVirtualStore<'a> { packages .iter() .map(|(dependency_path, package_snapshot)| async move { - InstallPackageBySnapshot { - tarball_cache, - http_client, - config, - dependency_path, - package_snapshot, - } - .run() - .await - .unwrap(); // TODO: properly propagate this error + InstallPackageBySnapshot { http_client, config, dependency_path, package_snapshot } + .run() + .await + .unwrap(); // TODO: properly propagate this error }) .pipe(future::join_all) .await; diff --git a/crates/package-manager/src/install.rs b/crates/package-manager/src/install.rs index f5a334f4c..6f8c11e70 100644 --- a/crates/package-manager/src/install.rs +++ b/crates/package-manager/src/install.rs @@ -58,7 +58,6 @@ where assert_eq!(lockfile_version.major, 6); // compatibility check already happens at serde, but this still helps preventing programmer mistakes. InstallFrozenLockfile { - tarball_cache, http_client, config, project_snapshot, diff --git a/crates/package-manager/src/install_frozen_lockfile.rs b/crates/package-manager/src/install_frozen_lockfile.rs index 7a2a6774d..124615a26 100644 --- a/crates/package-manager/src/install_frozen_lockfile.rs +++ b/crates/package-manager/src/install_frozen_lockfile.rs @@ -2,7 +2,6 @@ use crate::{CreateVirtualStore, SymlinkDirectDependencies}; use pacquet_lockfile::{DependencyPath, PackageSnapshot, RootProjectSnapshot}; use pacquet_npmrc::Npmrc; use pacquet_package_manifest::DependencyGroup; -use pacquet_tarball::Cache; use reqwest::Client; use std::collections::HashMap; @@ -20,7 +19,6 @@ pub struct InstallFrozenLockfile<'a, DependencyGroupList> where DependencyGroupList: IntoIterator, { - pub tarball_cache: &'a Cache, pub http_client: &'a Client, pub config: &'static Npmrc, pub project_snapshot: &'a RootProjectSnapshot, @@ -35,7 +33,6 @@ where /// Execute the subroutine. pub async fn run(self) { let InstallFrozenLockfile { - tarball_cache, http_client, config, project_snapshot, @@ -47,9 +44,7 @@ where assert!(config.prefer_frozen_lockfile, "Non frozen lockfile is not yet supported"); - CreateVirtualStore { tarball_cache, http_client, config, packages, project_snapshot } - .run() - .await; + CreateVirtualStore { http_client, config, packages, project_snapshot }.run().await; SymlinkDirectDependencies { config, project_snapshot, dependency_groups }.run(); } diff --git a/crates/package-manager/src/install_package_by_snapshot.rs b/crates/package-manager/src/install_package_by_snapshot.rs index a8c27406b..d1b44de61 100644 --- a/crates/package-manager/src/install_package_by_snapshot.rs +++ b/crates/package-manager/src/install_package_by_snapshot.rs @@ -3,7 +3,7 @@ use derive_more::{Display, Error}; use miette::Diagnostic; use pacquet_lockfile::{DependencyPath, LockfileResolution, PackageSnapshot, PkgNameVerPeer}; use pacquet_npmrc::Npmrc; -use pacquet_tarball::{Cache, DownloadTarballToStore, TarballError}; +use pacquet_tarball::{DownloadTarballToStore, TarballError}; use pipe_trait::Pipe; use reqwest::Client; use std::borrow::Cow; @@ -12,7 +12,6 @@ use std::borrow::Cow; /// then creates the symlink layout for the package. #[must_use] pub struct InstallPackageBySnapshot<'a> { - pub tarball_cache: &'a Cache, pub http_client: &'a Client, pub config: &'static Npmrc, pub dependency_path: &'a DependencyPath, @@ -29,13 +28,8 @@ pub enum InstallPackageBySnapshotError { impl<'a> InstallPackageBySnapshot<'a> { /// Execute the subroutine. pub async fn run(self) -> Result<(), InstallPackageBySnapshotError> { - let InstallPackageBySnapshot { - tarball_cache, - http_client, - config, - dependency_path, - package_snapshot, - } = self; + let InstallPackageBySnapshot { http_client, config, dependency_path, package_snapshot } = + self; let PackageSnapshot { resolution, .. } = package_snapshot; let DependencyPath { custom_registry, package_specifier } = dependency_path; @@ -64,14 +58,13 @@ impl<'a> InstallPackageBySnapshot<'a> { // TODO: skip when already exists in store? let cas_paths = DownloadTarballToStore { - tarball_cache, http_client, store_dir: &config.store_dir, package_integrity: integrity, package_unpacked_size: None, package_url: &tarball_url, } - .run() + .without_cache() .await .map_err(InstallPackageBySnapshotError::DownloadTarball)?; diff --git a/crates/package-manager/src/install_package_from_registry.rs b/crates/package-manager/src/install_package_from_registry.rs index 6a0f31df8..39d983179 100644 --- a/crates/package-manager/src/install_package_from_registry.rs +++ b/crates/package-manager/src/install_package_from_registry.rs @@ -75,7 +75,6 @@ impl<'a> InstallPackageFromRegistry<'a> { // TODO: skip when it already exists in store? let cas_paths = DownloadTarballToStore { - tarball_cache, http_client, store_dir: &config.store_dir, package_integrity: package_version @@ -86,7 +85,7 @@ impl<'a> InstallPackageFromRegistry<'a> { package_unpacked_size: package_version.dist.unpacked_size, package_url: package_version.as_tarball_url(), } - .run() + .with_cache(tarball_cache) .await .map_err(InstallPackageFromRegistryError::DownloadTarballToStore)?; diff --git a/crates/tarball/Cargo.toml b/crates/tarball/Cargo.toml index 41e675cee..d5619e1ae 100644 --- a/crates/tarball/Cargo.toml +++ b/crates/tarball/Cargo.toml @@ -20,6 +20,7 @@ dashmap = { workspace = true } derive_more = { workspace = true } miette = { workspace = true } pipe-trait = { workspace = true } +rayon = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/tarball/src/lib.rs b/crates/tarball/src/lib.rs index cb285b929..73b19635a 100644 --- a/crates/tarball/src/lib.rs +++ b/crates/tarball/src/lib.rs @@ -16,8 +16,9 @@ use pacquet_store_dir::{ PackageFileInfo, PackageFilesIndex, StoreDir, WriteCasFileError, WriteTarballIndexFileError, }; use pipe_trait::Pipe; +use rayon::prelude::*; use reqwest::Client; -use ssri::{Integrity, IntegrityChecker}; +use ssri::Integrity; use tar::Archive; use tokio::sync::{Notify, RwLock}; use tracing::instrument; @@ -104,8 +105,8 @@ fn decompress_gzip(gz_data: &[u8], unpacked_size: Option) -> Result Result { - integrity.pipe(IntegrityChecker::new).chain(data).result() +fn verify_checksum(data: &[u8], integrity: &Integrity) -> Result { + integrity.check(data) } /// This subroutine downloads and extracts a tarball to the store directory. @@ -113,7 +114,6 @@ fn verify_checksum(data: &[u8], integrity: Integrity) -> Result { - pub tarball_cache: &'a Cache, pub http_client: &'a Client, pub store_dir: &'static StoreDir, pub package_integrity: &'a Integrity, @@ -122,9 +122,12 @@ pub struct DownloadTarballToStore<'a> { } impl<'a> DownloadTarballToStore<'a> { - /// Execute the subroutine. - pub async fn run(self) -> Result>, TarballError> { - let &DownloadTarballToStore { tarball_cache, package_url, .. } = &self; + /// Execute the subroutine with cache. + pub async fn with_cache( + self, + tarball_cache: &'a Cache, + ) -> Result>, TarballError> { + let &DownloadTarballToStore { package_url, .. } = &self; // QUESTION: I see no copying from existing store_dir, is there such mechanism? // TODO: If it's not implemented yet, implement it @@ -153,7 +156,7 @@ impl<'a> DownloadTarballToStore<'a> { if tarball_cache.insert(package_url.to_string(), Arc::clone(&cache_lock)).is_some() { tracing::warn!(target: "pacquet::download", ?package_url, "Race condition detected when writing to cache"); } - let cas_paths = self.without_cache().await?; + let cas_paths = self.without_cache().await?.pipe(Arc::new); let mut cache_write = cache_lock.write().await; *cache_write = CacheValue::Available(Arc::clone(&cas_paths)); notify.notify_waiters(); @@ -161,7 +164,8 @@ impl<'a> DownloadTarballToStore<'a> { } } - async fn without_cache(&self) -> Result>, TarballError> { + /// Execute the subroutine without a cache. + pub async fn without_cache(&self) -> Result, TarballError> { let &DownloadTarballToStore { http_client, store_dir, @@ -183,101 +187,103 @@ impl<'a> DownloadTarballToStore<'a> { .map_err(network_error)? .bytes() .await - .map_err(network_error)?; + .map_err(network_error)? + .pipe(Arc::new); tracing::info!(target: "pacquet::download", ?package_url, "Download completed"); // TODO: Cloning here is less than desirable, there are 2 possible solutions for this problem: // 1. Use an Arc and convert this line to Arc::clone. // 2. Replace ssri with base64 and serde magic (which supports Copy). - let package_integrity = package_integrity.clone(); + let package_integrity = package_integrity.clone().pipe(Arc::new); - #[derive(Debug, From)] - enum TaskError { - Checksum(ssri::Error), - Other(TarballError), - } - let cas_paths = tokio::task::spawn(async move { - verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?; + let verify_checksum_task = { + let response = Arc::clone(&response); + let package_integrity = Arc::clone(&package_integrity); + tokio::task::spawn(async move { verify_checksum(&response, &package_integrity) }) + }; + let extract_tarball_task = tokio::task::spawn(async move { // TODO: move tarball extraction to its own function // TODO: test it // TODO: test the duplication of entries - let mut archive = decompress_gzip(&response, package_unpacked_size) - .map_err(TaskError::Other)? + let mut archive = decompress_gzip(&response, package_unpacked_size)? .pipe(Cursor::new) .pipe(Archive::new); - let entries = archive - .entries() - .map_err(TarballError::ReadTarballEntries) - .map_err(TaskError::Other)? - .filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir()); - - let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint(); - let mut cas_paths = HashMap::::with_capacity(capacity); - let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) }; - - for entry in entries { - let mut entry = entry.unwrap(); - - let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error - let file_is_executable = file_mode::is_all_exec(file_mode); - - // Read the contents of the entry - let mut buffer = Vec::with_capacity(entry.size() as usize); - entry.read_to_end(&mut buffer).unwrap(); - - let entry_path = entry.path().unwrap(); - let cleaned_entry_path = - entry_path.components().skip(1).collect::().into_os_string(); - let (file_path, file_hash) = store_dir - .write_cas_file(&buffer, file_is_executable) - .map_err(TarballError::WriteCasFile)?; - - let tarball_index_key = cleaned_entry_path - .to_str() - .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information - .to_string(); // TODO: convert cleaned_entry_path to String too. - - if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) { - tracing::warn!(?previous, "Duplication detected. Old entry has been ejected"); - } - - let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis()); - let file_size = entry.header().size().ok(); - let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash)); - let file_attrs = PackageFileInfo { - checked_at, - integrity: file_integrity, - mode: file_mode, - size: file_size, - }; - - if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) { - tracing::warn!(?previous, "Duplication detected. Old entry has been ejected"); - } + struct FileInfo { + cleaned_entry_path: OsString, + file_mode: u32, + file_size: Option, + buffer: Vec, } + let (cas_paths, index_entries) = archive + .entries() + .map_err(TarballError::ReadTarballEntries)? + .filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir()) + .map(|entry| entry.expect("get entry")) + .map(|mut entry| { + let cleaned_entry_path = entry + .path() + .expect("get path") // TODO: properly propagate this error + .components() + .skip(1) + .collect::() + .into_os_string(); + let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error + let file_size = entry.header().size().ok(); + let mut buffer = Vec::with_capacity(entry.size() as usize); + entry.read_to_end(&mut buffer).expect("read content"); // TODO: properly propagate this error + FileInfo { cleaned_entry_path, file_mode, file_size, buffer } + }) + .collect::>() + .into_par_iter() + .map(|file_info| -> Result<_, TarballError> { + let FileInfo { cleaned_entry_path, file_mode, file_size, buffer } = file_info; + let file_is_executable = file_mode::is_all_exec(file_mode); + + let (file_path, file_hash) = store_dir + .write_cas_file(&buffer, file_is_executable) + .map_err(TarballError::WriteCasFile)?; + + let index_key = cleaned_entry_path + .to_str() + .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information + .to_string(); // TODO: convert cleaned_entry_path to String too. + + let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis()); + let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash)); + let index_value = PackageFileInfo { + checked_at, + integrity: file_integrity, + mode: file_mode, + size: file_size, + }; + + Ok(((cleaned_entry_path, file_path), (index_key, index_value))) + }) + .collect::, HashMap<_, _>), TarballError>>()?; + + let pkg_files_idx = PackageFilesIndex { files: index_entries }; store_dir .write_index_file(&package_integrity, &pkg_files_idx) .map_err(TarballError::WriteTarballIndexFile)?; - Ok(cas_paths) - }) - .await - .expect("no join error") - .map_err(|error| match error { - TaskError::Checksum(error) => { - TarballError::Checksum(VerifyChecksumError { url: package_url.to_string(), error }) - } - TaskError::Other(error) => error, - })? - .pipe(Arc::new); + Ok::<_, TarballError>(cas_paths) + }); + + verify_checksum_task.await.expect("no join error").map_err(|error| { + TarballError::Checksum(VerifyChecksumError { url: package_url.to_string(), error }) + })?; tracing::info!(target: "pacquet::download", ?package_url, "Checksum verified"); + let cas_paths = extract_tarball_task.await.expect("no join error")?; + + tracing::info!(target: "pacquet::download", ?package_url, "Tarball extracted"); + Ok(cas_paths) } } @@ -316,14 +322,13 @@ mod tests { async fn packages_under_orgs_should_work() { let (store_dir, store_path) = tempdir_with_leaked_path(); let cas_files = DownloadTarballToStore { - tarball_cache: &Default::default(), http_client: &Default::default(), store_dir: store_path, package_integrity: &integrity("sha512-dj7vjIn1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="), package_unpacked_size: Some(16697), package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz" } - .run() + .without_cache() .await .unwrap(); @@ -356,14 +361,13 @@ mod tests { async fn should_throw_error_on_checksum_mismatch() { let (store_dir, store_path) = tempdir_with_leaked_path(); DownloadTarballToStore { - tarball_cache: &Default::default(), http_client: &Default::default(), store_dir: store_path, package_integrity: &integrity("sha512-aaaan1Ar8sVXj2yAXiMNCJDmS9MQ9XMlIecX2dIzzhjSHCyKo4DdXjXMs7wKW2kj6yvVRSpuQjOZ3YLrh56w=="), package_unpacked_size: Some(16697), package_url: "https://registry.npmjs.org/@fastify/error/-/error-3.3.0.tgz", } - .run() + .without_cache() .await .expect_err("checksum mismatch"); diff --git a/tasks/micro-benchmark/src/main.rs b/tasks/micro-benchmark/src/main.rs index 9246a1210..af3eef965 100644 --- a/tasks/micro-benchmark/src/main.rs +++ b/tasks/micro-benchmark/src/main.rs @@ -37,14 +37,13 @@ fn bench_tarball(c: &mut Criterion, server: &mut ServerGuard, fixtures_folder: & let http_client = Client::new(); let cas_map = DownloadTarballToStore { - tarball_cache: &Default::default(), http_client: &http_client, store_dir, package_integrity: &package_integrity, package_unpacked_size: Some(16697), package_url: url, } - .run() + .without_cache() .await .unwrap(); cas_map.len()