diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 56626c37f..a55042029 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -1,16 +1,22 @@ -use crate::types::FileRange; +use crate::{Config, blob::StreamingBlob, types::FileRange}; use anyhow::{Context as _, Result, anyhow, bail}; -use docs_rs_types::CompressionAlgorithm; +use dashmap::DashMap; +use docs_rs_types::{BuildId, CompressionAlgorithm}; use docs_rs_utils::spawn_blocking; -use sqlx::{Acquire as _, ConnectOptions as _, QueryBuilder, Row as _, Sqlite}; -use std::path::Path; +use sqlx::{ConnectOptions as _, Connection as _, QueryBuilder, Row as _, Sqlite}; +use std::{ + future::Future, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, +}; use tokio::{ fs, - io::{AsyncRead, AsyncSeek}, - sync::mpsc, + io::{self, AsyncRead, AsyncSeek, AsyncWriteExt as _}, + sync::{Mutex, mpsc}, }; use tokio_util::io::SyncIoBridge; -use tracing::instrument; +use tracing::{debug, instrument, warn}; pub(crate) const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index"; @@ -20,6 +26,175 @@ pub(crate) struct FileInfo { compression: CompressionAlgorithm, } +pub(crate) struct Cache { + local_archive_cache_path: PathBuf, + /// Locks to synchronize write-access to the locally cached archive index files. + locks: DashMap>>, +} + +pub(crate) trait Downloader { + fn fetch_archive_index<'a>( + &'a self, + remote_index_path: &'a str, + ) -> Pin> + Send + 'a>>; +} + +impl Cache { + pub(crate) fn new(config: &Config) -> Self { + Self { + local_archive_cache_path: config.local_archive_cache_path.clone(), + locks: DashMap::with_capacity(config.local_archive_cache_expected_count), + } + } + + fn local_index_path(&self, archive_path: &str, latest_build_id: Option) -> PathBuf { + self.local_archive_cache_path.join(format!( + "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}", + latest_build_id.map(|id| id.0).unwrap_or(0) + )) + } + + fn local_index_cache_lock(&self, local_index_path: impl AsRef) -> Arc> { + let local_index_path = local_index_path.as_ref().to_path_buf(); + + self.locks + .entry(local_index_path) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .downgrade() + .clone() + } + + /// purge a single archive index file + pub(crate) async fn purge( + &self, + archive_path: &str, + latest_build_id: Option, + ) -> Result<()> { + let local_index_path = self.local_index_path(archive_path, latest_build_id); + let rwlock = self.local_index_cache_lock(&local_index_path); + let _write_guard = rwlock.lock().await; + + for ext in &["wal", "shm"] { + let to_delete = + local_index_path.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-{ext}")); + let _ = fs::remove_file(&to_delete).await; + } + + if fs::try_exists(&local_index_path).await? { + fs::remove_file(&local_index_path).await?; + } + + Ok(()) + } + + async fn find_inner( + &self, + archive_path: &str, + latest_build_id: Option, + path_in_archive: &str, + downloader: &impl Downloader, + ) -> Result> { + let local_index_path = self.local_index_path(archive_path, latest_build_id); + + // fast path: try to use whatever is there, no locking + match find_in_file(&local_index_path, path_in_archive).await { + Ok(res) => return Ok(res), + Err(err) => { + debug!(?err, "archive index lookup failed, will try repair."); + } + } + + let lock = self.local_index_cache_lock(&local_index_path); + let write_guard = lock.lock().await; + + // Double-check: maybe someone fixed it between our first failure and now. + if let Ok(res) = find_in_file(&local_index_path, path_in_archive).await { + return Ok(res); + } + + let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + + // We are the repairer: download fresh index into place. + self.download_archive_index(downloader, &local_index_path, &remote_index_path) + .await?; + + // Write lock is dropped here (end of scope), so others can proceed. + drop(write_guard); + + // Final attempt: if this still fails, bubble the error. + find_in_file(local_index_path, path_in_archive).await + } + + /// Find the file metadata needed to fetch a certain path inside a remote archive. + /// Will try to use a local cache of the index file, and otherwise download it + /// from storage. + #[instrument(skip(self, downloader))] + pub(crate) async fn find( + &self, + archive_path: &str, + latest_build_id: Option, + path_in_archive: &str, + downloader: &impl Downloader, + ) -> Result> { + for attempt in 0..2 { + match self + .find_inner(archive_path, latest_build_id, path_in_archive, downloader) + .await + { + Ok(file_info) => return Ok(file_info), + Err(err) if attempt == 0 => { + warn!( + ?err, + "error resolving archive index, purging local cache and retrying once" + ); + self.purge(archive_path, latest_build_id).await?; + } + Err(err) => return Err(err), + } + } + + unreachable!("find retry loop exited unexpectedly"); + } + + #[instrument(skip(self, downloader))] + pub(crate) async fn download_archive_index( + &self, + downloader: &impl Downloader, + local_index_path: &Path, + remote_index_path: &str, + ) -> Result<()> { + let parent = local_index_path + .parent() + .ok_or_else(|| anyhow!("index path without parent"))? + .to_path_buf(); + fs::create_dir_all(&parent).await?; + + // Create a unique temp file in the cache folder. + let (temp_file, mut temp_path) = spawn_blocking({ + let folder = self.local_archive_cache_path.clone(); + move || -> Result<_> { tempfile::NamedTempFile::new_in(&folder).map_err(Into::into) } + }) + .await? + .into_parts(); + + // Download into temp file. + let mut temp_file = fs::File::from_std(temp_file); + let mut stream = downloader + .fetch_archive_index(remote_index_path) + .await? + .content; + io::copy(&mut stream, &mut temp_file).await?; + temp_file.flush().await?; + temp_path.disable_cleanup(true); + + // Publish atomically. + // Will replace any existing file. + fs::rename(&temp_path, local_index_path).await?; + + Ok(()) + } +} + impl FileInfo { pub(crate) fn range(&self) -> FileRange { self.range.clone() @@ -34,7 +209,7 @@ impl FileInfo { /// Any existing DB at the given path will be deleted first. async fn sqlite_create>(path: P) -> Result { let path = path.as_ref(); - if path.exists() { + if fs::try_exists(&path).await? { fs::remove_file(path).await?; } @@ -55,7 +230,12 @@ async fn sqlite_open>(path: P) -> Result sqlx::sqlite::SqliteConnectOptions::new() .filename(path) .read_only(true) + .immutable(true) .pragma("synchronous", "off") // not needed for readonly db + .pragma("temp_store", "MEMORY") + .pragma("query_only", "ON") + .pragma("mmap_size", "536870912") // 512 MiB + .pragma("cache_size", "-4096") // 4 MiB .serialized(false) // same as OPEN_NOMUTEX .create_if_missing(false) .connect() @@ -67,13 +247,11 @@ async fn sqlite_open>(path: P) -> Result /// /// Will delete the destination file if it already exists. #[instrument(skip(zipfile))] -pub(crate) async fn create< +pub(crate) async fn create(zipfile: R, destination: P) -> Result +where R: AsyncRead + AsyncSeek + Unpin + Send + 'static, P: AsRef + std::fmt::Debug, ->( - zipfile: R, - destination: P, -) -> Result { +{ let mut conn = sqlite_create(destination).await?; let mut tx = conn.begin().await?; @@ -192,10 +370,13 @@ where } #[instrument] -pub(crate) async fn find_in_file + std::fmt::Debug>( +pub(crate) async fn find_in_file

( archive_index_path: P, search_for: &str, -) -> Result> { +) -> Result> +where + P: AsRef + std::fmt::Debug, +{ let mut conn = sqlite_open(archive_index_path).await?; find_in_sqlite_index(&mut conn, search_for).await @@ -204,11 +385,16 @@ pub(crate) async fn find_in_file + std::fmt::Debug>( #[cfg(test)] mod tests { use super::*; - use std::io::Write; + use crate::{Config, blob::StreamingBlob, types::StorageKind}; + use chrono::Utc; + use sqlx::error::DatabaseError as _; + use std::{collections::HashMap, io::Cursor, ops::Deref, pin::Pin, sync::Arc}; use zip::write::SimpleFileOptions; async fn create_test_archive(file_count: u32) -> Result { spawn_blocking(move || { + use std::io::Write as _; + let tf = tempfile::tempfile()?; let objectcontent: Vec = (0..255).collect(); @@ -227,6 +413,101 @@ mod tests { .map(fs::File::from_std) } + struct FakeDownloader { + indices: HashMap>, + download_count: std::sync::Mutex>, + delay: Option, + } + + impl FakeDownloader { + fn new() -> Self { + Self { + indices: HashMap::new(), + download_count: std::sync::Mutex::new(HashMap::new()), + delay: None, + } + } + + fn with_delay(delay: std::time::Duration) -> Self { + let mut downloader = Self::new(); + downloader.delay = Some(delay); + downloader + } + + fn download_count(&self, remote_index_path: &str) -> usize { + let download_count = self.download_count.lock().unwrap(); + *download_count.get(remote_index_path).unwrap_or(&0) + } + } + + impl Downloader for FakeDownloader { + fn fetch_archive_index<'a>( + &'a self, + remote_index_path: &'a str, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + if let Some(delay) = self.delay { + tokio::time::sleep(delay).await; + } + + let mut fetch_count = self.download_count.lock().unwrap(); + fetch_count + .entry(remote_index_path.to_string()) + .and_modify(|count| *count += 1) + .or_insert(1); + + let content = self + .indices + .get(remote_index_path) + .cloned() + .ok_or_else(|| anyhow!("missing index fixture for {remote_index_path}"))?; + + Ok(StreamingBlob { + path: remote_index_path.to_string(), + mime: mime::APPLICATION_OCTET_STREAM, + date_updated: Utc::now(), + etag: None, + compression: None, + content_length: content.len(), + content: Box::new(Cursor::new(content)), + }) + }) + } + } + + async fn create_index_bytes(file_count: u32) -> Result> { + let tf = create_test_archive(file_count).await?; + let tempfile = tempfile::NamedTempFile::new()?.into_temp_path(); + create(tf, &tempfile).await?; + fs::read(&tempfile).await.map_err(Into::into) + } + + struct TestEnv { + _cache_dir: tempfile::TempDir, + _config: Config, + cache: Cache, + } + + impl Deref for TestEnv { + type Target = Cache; + + fn deref(&self) -> &Self::Target { + &self.cache + } + } + + fn test_cache() -> Result { + let cache_dir = tempfile::tempdir()?; + let mut config = Config::test_config_with_kind(StorageKind::Memory)?; + config.local_archive_cache_path = cache_dir.path().to_path_buf(); + let cache = Cache::new(&config); + Ok(TestEnv { + _cache_dir: cache_dir, + _config: config, + cache, + }) + } + #[tokio::test] async fn index_create_save_load_sqlite() -> Result<()> { let tf = create_test_archive(1).await?; @@ -278,4 +559,244 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn outdated_local_archive_index_gets_redownloaded() -> Result<()> { + let cache = test_cache()?; + + const LATEST_BUILD_ID: Option = Some(BuildId(42)); + const ARCHIVE_NAME: &str = "test.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let cache_file = cache.local_index_path(ARCHIVE_NAME, LATEST_BUILD_ID); + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(2).await?); + + assert!(!fs::try_exists(&cache_file).await?); + assert!( + cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert!(fs::try_exists(&cache_file).await?); + assert_eq!(downloader.download_count(&remote_index_path), 1); + + // Simulate local cache corruption and ensure Cache::find repairs it. + fs::write(&cache_file, b"not-an-sqlite-index").await?; + assert!( + cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await? + .is_some() + ); + assert_eq!(downloader.download_count(&remote_index_path), 2); + + Ok(()) + } + + #[tokio::test] + async fn find_uses_local_cache_without_downloading() -> Result<()> { + let cache = test_cache()?; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let cache_file = cache.local_index_path(ARCHIVE_NAME, LATEST_BUILD_ID); + fs::create_dir_all(cache_file.parent().unwrap()).await?; + fs::write(&cache_file, create_index_bytes(1).await?).await?; + + let downloader = FakeDownloader::new(); + let result = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await?; + assert!(result.is_some()); + assert_eq!( + downloader.download_count(&format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}")), + 0 + ); + + Ok(()) + } + + #[tokio::test] + async fn find_downloads_when_local_cache_missing() -> Result<()> { + let cache = test_cache()?; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(1).await?); + + let result = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) + .await?; + assert!(result.is_some()); + assert_eq!(downloader.download_count(&remote_index_path), 1); + assert!(fs::try_exists(cache.local_index_path(ARCHIVE_NAME, LATEST_BUILD_ID)).await?); + + Ok(()) + } + + #[tokio::test] + async fn find_returns_none_for_missing_entry() -> Result<()> { + let cache = test_cache()?; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(1).await?); + + let result = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, "does-not-exist", &downloader) + .await?; + assert!(result.is_none()); + assert_eq!(downloader.download_count(&remote_index_path), 1); + + Ok(()) + } + + #[tokio::test] + async fn find_retries_once_then_errors() -> Result<()> { + let cache = test_cache()?; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.clone(), b"not-a-sqlite-index".to_vec()); + + let err = cache + .find(ARCHIVE_NAME, LATEST_BUILD_ID, "testfile0", &downloader) + .await + .unwrap_err(); + + assert_eq!( + err.downcast::() + .unwrap() + .into_database_error() + .unwrap() + .as_error() + .downcast_ref::() + .unwrap() + .message(), + "file is not a database" + ); + assert_eq!(downloader.download_count(&remote_index_path), 2); + + Ok(()) + } + + #[tokio::test] + async fn purge_removes_index_wal_and_shm() -> Result<()> { + let cache = test_cache()?; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + + let local_index = cache.local_index_path(ARCHIVE_NAME, LATEST_BUILD_ID); + let wal = local_index.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-wal")); + let shm = local_index.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-shm")); + + fs::create_dir_all(local_index.parent().unwrap()).await?; + fs::write(&local_index, b"index").await?; + fs::write(&wal, b"wal").await?; + fs::write(&shm, b"shm").await?; + + cache.purge(ARCHIVE_NAME, LATEST_BUILD_ID).await?; + + assert!(!fs::try_exists(&local_index).await?); + assert!(!fs::try_exists(&wal).await?); + assert!(!fs::try_exists(&shm).await?); + + Ok(()) + } + + #[tokio::test] + async fn purge_is_idempotent_when_files_missing() -> Result<()> { + let cache = test_cache()?; + cache.purge("missing.zip", Some(BuildId(7))).await?; + cache.purge("missing.zip", Some(BuildId(7))).await?; + + Ok(()) + } + + #[tokio::test] + async fn download_archive_index_overwrites_existing_file() -> Result<()> { + let cache = test_cache()?; + let local_index = cache.local_index_path("test.zip", Some(BuildId(7))); + fs::create_dir_all(local_index.parent().unwrap()).await?; + fs::write(&local_index, b"old").await?; + + let remote_index_path = "test.zip.index"; + let mut downloader = FakeDownloader::new(); + downloader + .indices + .insert(remote_index_path.to_string(), create_index_bytes(1).await?); + + cache + .download_archive_index(&downloader, &local_index, remote_index_path) + .await?; + + let written = fs::read(&local_index).await?; + assert!(!written.is_empty()); + assert_ne!(written, b"old"); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn concurrent_find_triggers_single_download_per_index() -> Result<()> { + let cache = test_cache()?; + let cache = Arc::new(cache.cache); + const N: usize = 16; + const LATEST_BUILD_ID: Option = Some(BuildId(7)); + const ARCHIVE_NAME: &str = "test.zip"; + const FILE_IN_ARCHIVE: &str = "testfile0"; + + let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}"); + let mut downloader = FakeDownloader::with_delay(std::time::Duration::from_millis(50)); + downloader + .indices + .insert(remote_index_path.clone(), create_index_bytes(1).await?); + let downloader = Arc::new(downloader); + let barrier = Arc::new(tokio::sync::Barrier::new(N)); + + let mut tasks = Vec::with_capacity(N); + for _ in 0..N { + let cache = cache.clone(); + let downloader = downloader.clone(); + let barrier = barrier.clone(); + tasks.push(tokio::spawn(async move { + barrier.wait().await; + cache + .find( + ARCHIVE_NAME, + LATEST_BUILD_ID, + FILE_IN_ARCHIVE, + downloader.as_ref(), + ) + .await + })); + } + + for task in tasks { + let result = task.await??; + assert!(result.is_some()); + } + assert_eq!(downloader.download_count(&remote_index_path), 1); + + Ok(()) + } } diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index 62f7bdce7..7e949d595 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -16,25 +16,19 @@ use crate::{ }, }; use anyhow::Result; -use dashmap::DashMap; use docs_rs_mimes::{self as mimes, detect_mime}; use docs_rs_opentelemetry::AnyMeterProvider; use docs_rs_types::{BuildId, CompressionAlgorithm, KrateName, Version}; use docs_rs_utils::spawn_blocking; use futures_util::{TryStreamExt as _, future, stream::BoxStream}; -use std::{ - fmt, - path::{Path, PathBuf}, - sync::Arc, -}; -use tokio::{fs, io, sync::Mutex}; -use tracing::{debug, info_span, instrument, trace, warn}; +use std::{fmt, path::Path, pin::Pin, sync::Arc}; +use tokio::{fs, io}; +use tracing::{info_span, instrument, trace, warn}; pub struct AsyncStorage { backend: StorageBackend, config: Arc, - /// Locks to synchronize write-access to the locally cached archive index files. - locks: DashMap>>, + archive_index_cache: archive_index::Cache, } impl AsyncStorage { @@ -47,7 +41,7 @@ impl AsyncStorage { StorageKind::Memory => StorageBackend::Memory(MemoryBackend::new(otel_metrics)), StorageKind::S3 => StorageBackend::S3(S3Backend::new(&config, otel_metrics).await?), }, - locks: DashMap::with_capacity(config.local_archive_cache_expected_count), + archive_index_cache: archive_index::Cache::new(&config), config, }) } @@ -56,7 +50,7 @@ impl AsyncStorage { &self.config } - #[instrument] + #[instrument(skip(self))] pub async fn exists(&self, path: &str) -> Result { self.backend.exists(path).await } @@ -70,7 +64,7 @@ impl AsyncStorage { /// * `path` - the wanted path inside the documentation. /// * `archive_storage` - if `true`, we will assume we have a remove ZIP archive and an index /// where we can fetch the requested path from inside the ZIP file. - #[instrument] + #[instrument(skip(self))] pub async fn stream_rustdoc_file( &self, name: &KrateName, @@ -90,6 +84,7 @@ impl AsyncStorage { }) } + #[instrument(skip(self))] pub async fn fetch_source_file( &self, name: &KrateName, @@ -104,7 +99,7 @@ impl AsyncStorage { .await } - #[instrument] + #[instrument(skip(self))] pub async fn stream_source_file( &self, name: &KrateName, @@ -123,7 +118,7 @@ impl AsyncStorage { }) } - #[instrument] + #[instrument(skip(self))] pub async fn rustdoc_file_exists( &self, name: &KrateName, @@ -142,40 +137,26 @@ impl AsyncStorage { }) } - #[instrument] + #[instrument(skip(self))] pub async fn exists_in_archive( &self, archive_path: &str, latest_build_id: Option, path: &str, ) -> Result { - for attempt in 0..2 { - match self - .find_in_archive_index(archive_path, latest_build_id, path) - .await - { - Ok(file_info) => return Ok(file_info.is_some()), - Err(err) if err.downcast_ref::().is_some() => { - return Ok(false); - } - Err(err) if attempt == 0 => { - warn!( - ?err, - "error fetching range from archive, purging local index cache and retrying once" - ); - self.purge_archive_index_cache(archive_path, latest_build_id) - .await?; - - continue; - } - Err(err) => return Err(err), - } + match self + .archive_index_cache + .find(archive_path, latest_build_id, path, self) + .await + { + Ok(file_info) => Ok(file_info.is_some()), + Err(err) if err.downcast_ref::().is_some() => Ok(false), + Err(err) => Err(err), } - unreachable!("exists_in_archive retry loop exited unexpectedly"); } /// get, decompress and materialize an object from store - #[instrument] + #[instrument(skip(self))] pub async fn get(&self, path: &str, max_size: usize) -> Result { self.get_stream(path).await?.materialize(max_size).await } @@ -184,19 +165,19 @@ impl AsyncStorage { /// /// We don't decompress ourselves, S3 only decompresses with a correct /// `Content-Encoding` header set, which we don't. - #[instrument] + #[instrument(skip(self))] pub async fn get_raw_stream(&self, path: &str) -> Result { self.backend.get_stream(path, None).await } /// get a decompressing stream to an object in storage. - #[instrument] + #[instrument(skip(self))] pub async fn get_stream(&self, path: &str) -> Result { Ok(self.get_raw_stream(path).await?.decompress().await?) } /// get, decompress and materialize part of an object from store - #[instrument] + #[instrument(skip(self))] pub(crate) async fn get_range( &self, path: &str, @@ -211,7 +192,7 @@ impl AsyncStorage { } /// get a decompressing stream to a range inside an object in storage - #[instrument] + #[instrument(skip(self))] pub(crate) async fn get_range_stream( &self, path: &str, @@ -226,128 +207,7 @@ impl AsyncStorage { Ok(raw_stream.decompress().await?) } - fn local_index_cache_lock(&self, local_index_path: impl AsRef) -> Arc> { - let local_index_path = local_index_path.as_ref().to_path_buf(); - - self.locks - .entry(local_index_path) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .downgrade() - .clone() - } - - async fn purge_archive_index_cache( - &self, - archive_path: &str, - latest_build_id: Option, - ) -> Result<()> { - // we know that config.local_archive_cache_path is an absolute path, not relative. - // So it will be usable as key in the DashMap. - let local_index_path = self.config.local_archive_cache_path.join(format!( - "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}", - latest_build_id.map(|id| id.0).unwrap_or(0) - )); - - let rwlock = self.local_index_cache_lock(&local_index_path); - - let _write_guard = rwlock.lock().await; - - if fs::try_exists(&local_index_path).await? { - fs::remove_file(&local_index_path).await?; - } - - Ok(()) - } - - #[instrument(skip(self))] - async fn download_archive_index( - &self, - local_index_path: &Path, - remote_index_path: &str, - ) -> Result<()> { - let parent = local_index_path - .parent() - .ok_or_else(|| anyhow::anyhow!("index path without parent"))? - .to_path_buf(); - fs::create_dir_all(&parent).await?; - - // Create a unique temp file in the cache folder. - let (temp_file, mut temp_path) = spawn_blocking({ - let folder = self.config.local_archive_cache_path.clone(); - move || -> Result<_> { tempfile::NamedTempFile::new_in(&folder).map_err(Into::into) } - }) - .await? - .into_parts(); - - // Download into temp file. - let mut temp_file = fs::File::from_std(temp_file); - let mut stream = self.get_stream(remote_index_path).await?.content; - io::copy(&mut stream, &mut temp_file).await?; - temp_file.sync_all().await?; - - temp_path.disable_cleanup(true); - - // Publish atomically. - // Will replace any existing file. - fs::rename(&temp_path, local_index_path).await?; - - // fsync parent dir to make rename durable - spawn_blocking(move || { - let dir = std::fs::File::open(parent)?; - dir.sync_all().map_err(Into::into) - }) - .await?; - - Ok(()) - } - - /// Find find the file into needed to fetch a certain path inside a remote archive. - /// Will try to use a local cache of the index file, and otherwise download it - /// from storage. #[instrument(skip(self))] - async fn find_in_archive_index( - &self, - archive_path: &str, - latest_build_id: Option, - path_in_archive: &str, - ) -> Result> { - // we know that config.local_archive_cache_path is an absolute path, not relative. - // So it will be usable as key in the DashMap. - let local_index_path = self.config.local_archive_cache_path.join(format!( - "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}", - latest_build_id.map(|id| id.0).unwrap_or(0) - )); - - // fast path: try to use whatever is there, no locking - match archive_index::find_in_file(&local_index_path, path_in_archive).await { - Ok(res) => return Ok(res), - Err(err) => { - debug!(?err, "archive index lookup failed, will try repair."); - } - } - - let lock = self.local_index_cache_lock(&local_index_path); - let write_guard = lock.lock().await; - - // Double-check: maybe someone fixed it between our first failure and now. - if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await { - return Ok(res); - } - - let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}"); - - // We are the repairer: download fresh index into place. - self.download_archive_index(&local_index_path, &remote_index_path) - .await?; - - // Write lock is dropped here (end of scope), so others can proceed. - drop(write_guard); - - // Final attempt: if this still fails, bubble the error. - archive_index::find_in_file(local_index_path, path_in_archive).await - } - - #[instrument] pub async fn get_from_archive( &self, archive_path: &str, @@ -370,7 +230,8 @@ impl AsyncStorage { ) -> Result { for attempt in 0..2 { let info = self - .find_in_archive_index(archive_path, latest_build_id, path) + .archive_index_cache + .find(archive_path, latest_build_id, path, self) .await? .ok_or(PathNotFoundError)?; @@ -410,7 +271,8 @@ impl AsyncStorage { ?err, "error fetching range from archive, purging local index cache and retrying once" ); - self.purge_archive_index_cache(archive_path, latest_build_id) + self.archive_index_cache + .purge(archive_path, latest_build_id) .await?; continue; @@ -593,7 +455,7 @@ impl AsyncStorage { #[instrument(skip(self, content))] pub async fn store_one_uncompressed( &self, - path: impl Into + std::fmt::Debug, + path: impl Into + fmt::Debug, content: impl Into>, ) -> Result<()> { let path = path.into(); @@ -617,7 +479,7 @@ impl AsyncStorage { #[instrument(skip(self, content))] pub async fn store_one( &self, - path: impl Into + std::fmt::Debug, + path: impl Into + fmt::Debug, content: impl Into>, ) -> Result { let path = path.into(); @@ -641,8 +503,8 @@ impl AsyncStorage { #[instrument(skip(self))] pub async fn store_path( &self, - target_path: impl Into + std::fmt::Debug, - source_path: impl AsRef + std::fmt::Debug, + target_path: impl Into + fmt::Debug, + source_path: impl AsRef + fmt::Debug, ) -> Result { let target_path = target_path.into(); let source_path = source_path.as_ref(); @@ -674,6 +536,7 @@ impl AsyncStorage { Ok(alg) } + #[instrument(skip(self))] pub async fn list_prefix<'a>(&'a self, prefix: &'a str) -> BoxStream<'a, Result> { self.backend.list_prefix(prefix).await } @@ -695,7 +558,16 @@ impl AsyncStorage { } } -impl std::fmt::Debug for AsyncStorage { +impl archive_index::Downloader for AsyncStorage { + fn fetch_archive_index<'a>( + &'a self, + remote_index_path: &'a str, + ) -> Pin> + Send + 'a>> { + Box::pin(self.get_stream(remote_index_path)) + } +} + +impl fmt::Debug for AsyncStorage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &self.backend { #[cfg(any(test, feature = "testing"))] @@ -705,138 +577,6 @@ impl std::fmt::Debug for AsyncStorage { } } -#[cfg(test)] -mod test { - use super::*; - use crate::testing::TestStorage; - - #[tokio::test(flavor = "multi_thread")] - async fn test_outdated_local_archive_index_gets_redownloaded() -> Result<()> { - let metrics = docs_rs_opentelemetry::testing::TestMetrics::new(); - let storage = TestStorage::from_kind(StorageKind::S3, metrics.provider()).await?; - - // virtual latest build id, used for local caching of the index files - const LATEST_BUILD_ID: Option = Some(BuildId(42)); - let cache_root = storage.config.local_archive_cache_path.clone(); - - let cache_filename = |archive_name: &str| { - cache_root.join(format!( - "{}.{}.{}", - archive_name, - LATEST_BUILD_ID.unwrap(), - ARCHIVE_INDEX_FILE_EXTENSION - )) - }; - - /// dummy archives, files will contain their name as content - async fn create_archive( - storage: &AsyncStorage, - archive_name: &str, - filenames: &[&str], - ) -> Result<()> { - let dir = tempfile::Builder::new() - .prefix("docs.rs-upload-archive-test") - .tempdir()?; - for &file in filenames.iter() { - let path = dir.path().join(file); - fs::write(path, file).await?; - } - storage - .store_all_in_archive(archive_name, dir.path()) - .await?; - - Ok(()) - } - - // create two archives with indexes that contain the same filename - create_archive( - &storage, - "test1.zip", - &["file1.txt", "file2.txt", "important.txt"], - ) - .await?; - - create_archive( - &storage, - "test2.zip", - &["important.txt", "another_file_1.txt", "another_file_2.txt"], - ) - .await?; - - for archive_name in &["test1.zip", "test2.zip"] { - assert!(storage.exists(archive_name).await?); - - assert!( - storage - .exists(&format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", archive_name)) - .await? - ); - // local index cache doesn't exist yet - let local_index_file = cache_filename(archive_name); - assert!(!fs::try_exists(&local_index_file).await?); - - // this will then create the cache - assert!( - storage - .exists_in_archive(archive_name, LATEST_BUILD_ID, "important.txt") - .await? - ); - assert!(fs::try_exists(&local_index_file).await?); - - // fetching the content out of the archive also works - assert_eq!( - storage - .get_from_archive(archive_name, LATEST_BUILD_ID, "important.txt", usize::MAX) - .await? - .content, - b"important.txt" - ); - } - - // validate if the positions are really different in the archvies, - // for the same filename. - let pos_in_test1_zip = storage - .find_in_archive_index("test1.zip", LATEST_BUILD_ID, "important.txt") - .await? - .unwrap(); - let pos_in_test2_zip = storage - .find_in_archive_index("test2.zip", LATEST_BUILD_ID, "important.txt") - .await? - .unwrap(); - - assert_ne!(pos_in_test1_zip.range(), pos_in_test2_zip.range()); - - // now I'm swapping the local index files. - // This should simulate hat I have an outdated byte-range for a file - - let local_index_file_1 = cache_filename("test1.zip"); - let local_index_file_2 = cache_filename("test2.zip"); - - { - let temp_path = cache_root.join("temp_index_swap.tmp"); - fs::rename(&local_index_file_1, &temp_path).await?; - fs::rename(&local_index_file_2, &local_index_file_1).await?; - fs::rename(&temp_path, &local_index_file_2).await?; - } - - // now try to fetch the files inside the archives again, the local files - // should be removed, refetched, and all should be fine. - // Without our fallback / delete mechanism, this would fail. - - for archive_name in &["test1.zip", "test2.zip"] { - assert_eq!( - storage - .get_from_archive(archive_name, LATEST_BUILD_ID, "important.txt", usize::MAX) - .await? - .content, - b"important.txt" - ); - } - - Ok(()) - } -} - /// Backend tests are a set of tests executed on all the supportedootorage backends. They ensure /// docs.rs behaves the same no matter the storage backend currently used. ///