Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions crates/bin/docs_rs_admin/src/repackage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use anyhow::Result;
use docs_rs_storage::{AsyncStorage, FileEntry, rustdoc_archive_path, source_archive_path};
use docs_rs_types::{CompressionAlgorithm, KrateName, ReleaseId, Version};
use docs_rs_utils::spawn_blocking;
use futures_util::StreamExt as _;
use futures_util::TryStreamExt as _;
use sqlx::Acquire as _;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::{fs, io};
use tracing::{info, instrument};
use tracing::{debug, info, instrument};

/// repackage old rustdoc / source content.
///
Expand Down Expand Up @@ -94,10 +96,9 @@ pub async fn repackage(

transaction.commit().await?;

// TODO: validate the zip file?

// only delete the old files when we were able to update database with `archive_storage=true`,
// and were able to validate the zip file.
info!("removing legacy files from storage...");
storage.delete_prefix(&rustdoc_prefix).await?;
storage.delete_prefix(&sources_prefix).await?;

Expand All @@ -113,31 +114,47 @@ async fn repackage_path(
prefix: &str,
target_archive: &str,
) -> Result<Option<(Vec<FileEntry>, CompressionAlgorithm)>> {
let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?;

let mut files = 0;
let mut list = storage.list_prefix(prefix).await;
while let Some(entry) = list.next().await {
let entry = entry?;
let mut stream = storage.get_stream(&entry).await?;

let target_path = tempdir.path().join(stream.path.trim_start_matches(prefix));

fs::create_dir_all(&target_path.parent().unwrap()).await?;
{
let mut output_file = fs::File::create(&target_path).await?;
io::copy(&mut stream.content, &mut output_file).await?;
output_file.sync_all().await?;
}
const DOWNLOAD_CONCURRENCY: usize = 8;

files += 1;
}
info!("repackage path");
let tempdir = spawn_blocking(|| tempfile::tempdir().map_err(Into::into)).await?;
let tempdir_path = tempdir.path().to_path_buf();

let files = Arc::new(AtomicUsize::new(0));
storage
.list_prefix(prefix)
.await
.try_for_each_concurrent(DOWNLOAD_CONCURRENCY, {
|entry| {
let tempdir_path = tempdir_path.clone();
let files = files.clone();
async move {
debug!(path=%entry, "downloading file");
let mut stream = storage.get_stream(&entry).await?;
let target_path = tempdir_path.join(stream.path.trim_start_matches(prefix));

if let Some(parent) = target_path.parent() {
fs::create_dir_all(parent).await?;
}
let mut output_file = fs::File::create(&target_path).await?;
io::copy(&mut stream.content, &mut output_file).await?;
output_file.sync_all().await?;

files.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
})
.await?;
let files = files.load(Ordering::Relaxed);

if files > 0 {
info!("creating zip file...");
let (file_list, alg) = storage
.store_all_in_archive(target_archive, &tempdir.path())
.await?;

info!("removing temp-dir...");
fs::remove_dir_all(&tempdir).await?;

Ok(Some((file_list, alg)))
Expand All @@ -152,6 +169,7 @@ mod tests {
use crate::testing::TestEnvironment;
use docs_rs_storage::{PathNotFoundError, StorageKind, source_archive_path};
use docs_rs_types::testing::{KRATE, V1};
use futures_util::StreamExt as _;
use pretty_assertions::assert_eq;
use test_case::test_case;

Expand Down
Loading