diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 5324c9298..21fa0ba46 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -41,7 +41,7 @@ pub(crate) const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index"; /// dummy size we assume in case of errors const DUMMY_FILE_SIZE: u64 = 1024 * 1024; // 1 MiB /// self-repair attempts -const FIND_ATTEMPTS: usize = 5; +const REPAIR_ATTEMPTS: usize = 5; #[derive(Debug)] struct Metrics { @@ -54,7 +54,7 @@ struct Metrics { evicted_entry_size: Histogram, // local cache misses / downloads & bytes - // includes & doesn't differentiate retries / repairs for now + // includes retries / repairs downloads: Counter, downloaded_bytes: Counter, downloaded_entry_size: Histogram, @@ -207,7 +207,7 @@ impl Cache { /// create a new archive index cache. /// /// Also starts a background task that will backfill the in-memory cache management based - /// on the local files that are already. + /// on the local files that are already present. pub(crate) async fn new( config: Arc, meter_provider: &AnyMeterProvider, @@ -259,11 +259,9 @@ impl Cache { // the specified duration past from get or insert. // We don't set TTL (time to live), which would be just time-after-insert. .time_to_idle(config.ttl) - // we weigh each cache entry by the file size of the sqlite database. - // The max size of the cache for all of docs.rs is 500 GiB at the time of writing. - // In KiB, this would be around 500k, which makes KiB the right unit. - // Anything bigger (like MiB) would mean that we count smaller dbs than 1 MiB as if - // they were 1 MiB big. + // We weigh each cache entry by the file size of the SQLite database. + // The configured capacity is in MiB, but using KiB as moka's weight unit + // avoids counting every index smaller than 1 MiB as if it were 1 MiB. .weigher(|_key: &PathBuf, entry: &Arc| -> u32 { entry.file_size_kib }) // max capacity // not entries, but _weighted entries_. @@ -275,7 +273,7 @@ impl Cache { let path = path.to_path_buf(); let metrics = metrics_for_eviction.clone(); // The spawned task means file deletion is deferred. See the - // "benign race with the eviction listener" comment in `find_inner` + // "benign race with the eviction listener" comment in `find_index_inner` // for why this is acceptable. tokio::spawn(async move { let reason = format!("{reason:?}"); @@ -356,8 +354,8 @@ impl Cache { /// /// Should be needed only once after server startup. /// - /// While this is running, our `find_inner` & `download_archive_index` logic will just - /// fill it itself. + /// While this is running, `find_index_inner` and `download_archive_index` backfill + /// entries on demand for requested indexes. /// /// Concurrency is set to a lower value intentionally so we don't put /// too much i/o pressure onto the disk. @@ -438,11 +436,53 @@ impl Cache { Ok(()) } - pub(crate) async fn find_index( + async fn retry_with_purge( &self, archive_path: &str, latest_build_id: Option, - downloader: &impl Downloader, + mut action: F, + ) -> Result<(T, usize)> + where + F: FnMut() -> Fut, + Fut: Future>, + { + for attempt in 1..=REPAIR_ATTEMPTS { + match action().await { + Ok(value) => return Ok((value, attempt)), + Err(err) if attempt < REPAIR_ATTEMPTS => { + warn!( + ?err, + %attempt, + "archive index operation failed, purging local cache and retrying" + ); + self.purge(archive_path, latest_build_id).await?; + } + Err(err) => return Err(err), + } + } + + unreachable!("archive index retry loop exited unexpectedly"); + } + + pub(crate) async fn find_index( + &self, + archive_path: &str, + latest_build_id: Option, + downloader: &D, + ) -> Result { + let (index, _) = self + .retry_with_purge(archive_path, latest_build_id, || { + self.find_index_inner(archive_path, latest_build_id, downloader) + }) + .await?; + Ok(index) + } + + async fn find_index_inner( + &self, + archive_path: &str, + latest_build_id: Option, + downloader: &D, ) -> Result { let local_index_path = self.local_index_path(archive_path, latest_build_id); @@ -539,7 +579,8 @@ impl Cache { } })?; - // Final attempt: if this still fails, bubble the error. + // Final open for this retry attempt. If it fails, the caller's retry loop + // purges the cache entry and tries again. Index::open(local_index_path).await } @@ -547,55 +588,44 @@ impl Cache { /// Will try to use a local cache of the index file, and otherwise download it /// from storage. #[instrument(skip(self, downloader))] - pub(crate) async fn find( + pub(crate) async fn find( &self, archive_path: &str, latest_build_id: Option, path_in_archive: &str, - downloader: &impl Downloader, + downloader: &D, ) -> Result> { - for attempt in 1..=FIND_ATTEMPTS { - let result = async { + let result = self + .retry_with_purge(archive_path, latest_build_id, || async { let mut index = self - .find_index(archive_path, latest_build_id, downloader) + .find_index_inner(archive_path, latest_build_id, downloader) .await?; index.find(path_in_archive).await - } + }) .await; - match result { - Ok(file_info) => { - self.metrics.find_calls.add( - 1, - &[ - KeyValue::new("attempt", attempt.to_string()), - KeyValue::new("outcome", "success"), - ], - ); - return Ok(file_info); - } - Err(err) if attempt < FIND_ATTEMPTS => { - warn!( - ?err, - %attempt, - "error in archive index lookup, purging local cache and retrying" - ); - self.purge(archive_path, latest_build_id).await?; - } - Err(err) => { - self.metrics.find_calls.add( - 1, - &[ - KeyValue::new("attempt", attempt.to_string()), - KeyValue::new("outcome", "error"), - ], - ); - return Err(err); - } + match result { + Ok((file_info, attempt)) => { + self.metrics.find_calls.add( + 1, + &[ + KeyValue::new("attempt", attempt.to_string()), + KeyValue::new("outcome", "success"), + ], + ); + return Ok(file_info); + } + Err(err) => { + self.metrics.find_calls.add( + 1, + &[ + KeyValue::new("attempt", REPAIR_ATTEMPTS.to_string()), + KeyValue::new("outcome", "error"), + ], + ); + return Err(err); } } - - unreachable!("find retry loop exited unexpectedly"); } #[instrument(skip(self, downloader))] @@ -678,7 +708,7 @@ async fn sqlite_create>(path: P) -> Result>(path: P) -> Result { @@ -1341,7 +1371,7 @@ mod tests { } #[tokio::test] - async fn find_retries_once_then_errors() -> Result<()> { + async fn find_retries_then_errors() -> Result<()> { let cache = test_cache().await?; const LATEST_BUILD_ID: Option = Some(BuildId(7)); const ARCHIVE_NAME: &str = "test.zip"; @@ -1368,7 +1398,10 @@ mod tests { .message(), "file is not a database" ); - assert_eq!(downloader.download_count(&remote_index_path), FIND_ATTEMPTS); + assert_eq!( + downloader.download_count(&remote_index_path), + REPAIR_ATTEMPTS + ); Ok(()) } @@ -1388,14 +1421,14 @@ mod tests { let downloader = FlakyDownloader::new( remote_index_path, create_index_bytes(1).await?, - FIND_ATTEMPTS - 1, + REPAIR_ATTEMPTS - 1, ); let result = cache .find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader) .await?; assert!(result.is_some()); - assert_eq!(downloader.fetch_count(), FIND_ATTEMPTS); + assert_eq!(downloader.fetch_count(), REPAIR_ATTEMPTS); Ok(()) }