diff --git a/crates/bin/docs_rs_admin/src/repackage.rs b/crates/bin/docs_rs_admin/src/repackage.rs index 37b56abe8..693513ff5 100644 --- a/crates/bin/docs_rs_admin/src/repackage.rs +++ b/crates/bin/docs_rs_admin/src/repackage.rs @@ -2,11 +2,13 @@ use anyhow::Result; use docs_rs_storage::{AsyncStorage, FileEntry, rustdoc_archive_path, source_archive_path}; use docs_rs_types::{CompressionAlgorithm, KrateName, ReleaseId, Version}; use docs_rs_utils::spawn_blocking; -use futures_util::StreamExt as _; +use futures_util::TryStreamExt as _; use sqlx::Acquire as _; use std::collections::HashSet; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::{fs, io}; -use tracing::{info, instrument}; +use tracing::{debug, info, instrument}; /// repackage old rustdoc / source content. /// @@ -94,10 +96,9 @@ pub async fn repackage( transaction.commit().await?; - // TODO: validate the zip file? - // only delete the old files when we were able to update database with `archive_storage=true`, // and were able to validate the zip file. + info!("removing legacy files from storage..."); storage.delete_prefix(&rustdoc_prefix).await?; storage.delete_prefix(&sources_prefix).await?; @@ -113,31 +114,47 @@ async fn repackage_path( prefix: &str, target_archive: &str, ) -> Result, CompressionAlgorithm)>> { - let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?; - - let mut files = 0; - let mut list = storage.list_prefix(prefix).await; - while let Some(entry) = list.next().await { - let entry = entry?; - let mut stream = storage.get_stream(&entry).await?; - - let target_path = tempdir.path().join(stream.path.trim_start_matches(prefix)); - - fs::create_dir_all(&target_path.parent().unwrap()).await?; - { - let mut output_file = fs::File::create(&target_path).await?; - io::copy(&mut stream.content, &mut output_file).await?; - output_file.sync_all().await?; - } + const DOWNLOAD_CONCURRENCY: usize = 8; - files += 1; - } + info!("repackage path"); + let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?; + let tempdir_path = tempdir.path().to_path_buf(); + + let files = Arc::new(AtomicUsize::new(0)); + storage + .list_prefix(prefix) + .await + .try_for_each_concurrent(DOWNLOAD_CONCURRENCY, { + |entry| { + let tempdir_path = tempdir_path.clone(); + let files = files.clone(); + async move { + debug!(path=%entry, "downloading file"); + let mut stream = storage.get_stream(&entry).await?; + let target_path = tempdir_path.join(stream.path.trim_start_matches(prefix)); + + if let Some(parent) = target_path.parent() { + fs::create_dir_all(parent).await?; + } + let mut output_file = fs::File::create(&target_path).await?; + io::copy(&mut stream.content, &mut output_file).await?; + output_file.sync_all().await?; + + files.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + } + }) + .await?; + let files = files.load(Ordering::Relaxed); if files > 0 { + info!("creating zip file..."); let (file_list, alg) = storage .store_all_in_archive(target_archive, &tempdir.path()) .await?; + info!("removing temp-dir..."); fs::remove_dir_all(&tempdir).await?; Ok(Some((file_list, alg))) @@ -152,6 +169,7 @@ mod tests { use crate::testing::TestEnvironment; use docs_rs_storage::{PathNotFoundError, StorageKind, source_archive_path}; use docs_rs_types::testing::{KRATE, V1}; + use futures_util::StreamExt as _; use pretty_assertions::assert_eq; use test_case::test_case;