diff --git a/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json b/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json new file mode 100644 index 000000000..cd3631230 --- /dev/null +++ b/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334" +} diff --git a/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json b/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json new file mode 100644 index 000000000..61a7c2de7 --- /dev/null +++ b/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525" +} diff --git a/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json b/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json new file mode 100644 index 000000000..36491fedc --- /dev/null +++ b/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 AND version = $2;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c" +} diff --git a/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json b/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json new file mode 100644 index 000000000..50396d59a --- /dev/null +++ b/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT set_config('lock_timeout', $1, true)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "set_config", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e" +} diff --git a/crates/bin/cratesfyi/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json b/crates/bin/cratesfyi/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json new file mode 100644 index 000000000..cd3631230 --- /dev/null +++ b/crates/bin/cratesfyi/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334" +} diff --git a/crates/bin/cratesfyi/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json b/crates/bin/cratesfyi/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json new file mode 100644 index 000000000..61a7c2de7 --- /dev/null +++ b/crates/bin/cratesfyi/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525" +} diff --git a/crates/bin/cratesfyi/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json b/crates/bin/cratesfyi/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json new file mode 100644 index 000000000..50396d59a --- /dev/null +++ b/crates/bin/cratesfyi/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT set_config('lock_timeout', $1, true)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "set_config", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json b/crates/bin/docs_rs_watcher/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json new file mode 100644 index 000000000..cd3631230 --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "1105d2dff6e153dbc8fce074f6cc37f3cd187b93228f403dc2d1d2bebc37c334" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json b/crates/bin/docs_rs_watcher/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json new file mode 100644 index 000000000..61a7c2de7 --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "18effde1f350768e5f6c2cbb715d14d389c19fc176a900e0b148c793cdde0525" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json b/crates/bin/docs_rs_watcher/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json new file mode 100644 index 000000000..36491fedc --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM queue WHERE name = $1 AND version = $2;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "4fba2b9b3ef48ae8cb05538daff587e898f0f4fc035f0f26820766839c72ec7c" +} diff --git a/crates/bin/docs_rs_watcher/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json b/crates/bin/docs_rs_watcher/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json new file mode 100644 index 000000000..50396d59a --- /dev/null +++ b/crates/bin/docs_rs_watcher/.sqlx/query-fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT set_config('lock_timeout', $1, true)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "set_config", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "fcee15572f69a3e3be73825baaade8e35340f56f7b698f6a9eacd18a61dc092e" +} diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index da8317e29..7b5f17976 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -17,6 +17,9 @@ pub struct Config { // automatic rebuild configuration pub max_queued_rebuilds: Option, + /// Maximum time to wait for queue row locks when deleting crates/releases. + pub delete_lock_timeout: Duration, + pub repository: docs_rs_repository_stats::Config, } @@ -32,6 +35,10 @@ impl AppConfig for Config { )?), registry_gc_interval: env("DOCSRS_REGISTRY_GC_INTERVAL", 60 * 60)?, max_queued_rebuilds: maybe_env("DOCSRS_MAX_QUEUED_REBUILDS")?, + delete_lock_timeout: Duration::from_secs(env::( + "DOCSRS_DELETE_LOCK_TIMEOUT_SECONDS", + 20 * 60, + )?), repository: docs_rs_repository_stats::Config::from_environment()?, }) } diff --git a/crates/bin/docs_rs_watcher/src/consistency/mod.rs b/crates/bin/docs_rs_watcher/src/consistency/mod.rs index 694d4ba63..d554e91c9 100644 --- a/crates/bin/docs_rs_watcher/src/consistency/mod.rs +++ b/crates/bin/docs_rs_watcher/src/consistency/mod.rs @@ -37,7 +37,7 @@ pub async fn run_check(config: &Config, ctx: &Context, dry_run: bool) -> Result< .context("Loading crate data from index for consistency check")?; let diff = diff::calculate_diff(db_data.iter(), index_data.iter()); - let result = handle_diff(ctx, diff.iter(), dry_run).await?; + let result = handle_diff(config, ctx, diff.iter(), dry_run).await?; println!("============"); println!("SUMMARY"); @@ -75,7 +75,12 @@ struct HandleResult { yanks_corrected: u32, } -async fn handle_diff<'a, I>(ctx: &Context, iter: I, dry_run: bool) -> Result +async fn handle_diff<'a, I>( + config: &Config, + ctx: &Context, + iter: I, + dry_run: bool, +) -> Result where I: Iterator, { @@ -89,7 +94,8 @@ where match difference { diff::Difference::CrateNotInIndex(name) => { if !dry_run - && let Err(err) = delete::delete_crate(&mut conn, ctx.storage()?, name).await + && let Err(err) = + delete::delete_crate(&mut conn, ctx.storage()?, config, name).await { warn!(?difference, ?err, "error handling CrateNotInIndex"); } @@ -111,7 +117,8 @@ where diff::Difference::ReleaseNotInIndex(name, version) => { if !dry_run && let Err(err) = - delete::delete_version(&mut conn, ctx.storage()?, name, version).await + delete::delete_version(&mut conn, ctx.storage()?, config, name, version) + .await { warn!(?difference, ?err, "error handling ReleaseNotInIndex"); } @@ -185,7 +192,7 @@ mod tests { let diff = [Difference::CrateNotInIndex(KRATE)]; // calling with dry-run leads to no change - handle_diff(&env, diff.iter(), true).await?; + handle_diff(env.config(), &env, diff.iter(), true).await?; assert_eq!( count(&env, "SELECT count(*) FROM crates WHERE name = 'krate'").await?, @@ -193,7 +200,7 @@ mod tests { ); // without dry-run the crate will be deleted - handle_diff(&env, diff.iter(), false).await?; + handle_diff(env.config(), &env, diff.iter(), false).await?; assert_eq!( count(&env, "SELECT count(*) FROM crates WHERE name = 'krate'").await?, @@ -223,11 +230,11 @@ mod tests { assert_eq!(count(&env, "SELECT count(*) FROM releases").await?, 2); - handle_diff(&env, diff.iter(), true).await?; + handle_diff(env.config(), &env, diff.iter(), true).await?; assert_eq!(count(&env, "SELECT count(*) FROM releases").await?, 2); - handle_diff(&env, diff.iter(), false).await?; + handle_diff(env.config(), &env, diff.iter(), false).await?; assert_eq!( single_row::( @@ -254,14 +261,14 @@ mod tests { let diff = [Difference::ReleaseYank(KRATE, V1, false)]; - handle_diff(&env, diff.iter(), true).await?; + handle_diff(env.config(), &env, diff.iter(), true).await?; assert_eq!( single_row::(&env, "SELECT yanked FROM releases").await?, vec![true] ); - handle_diff(&env, diff.iter(), false).await?; + handle_diff(env.config(), &env, diff.iter(), false).await?; assert_eq!( single_row::(&env, "SELECT yanked FROM releases").await?, @@ -276,13 +283,13 @@ mod tests { let env = TestEnvironment::new().await?; let diff = [Difference::ReleaseNotInDb(KRATE, V1)]; - handle_diff(&env, diff.iter(), true).await?; + handle_diff(env.config(), &env, diff.iter(), true).await?; let build_queue = env.build_queue()?; assert!(build_queue.queued_crates().await?.is_empty()); - handle_diff(&env, diff.iter(), false).await?; + handle_diff(env.config(), &env, diff.iter(), false).await?; assert_eq!( build_queue @@ -301,13 +308,13 @@ mod tests { let env = TestEnvironment::new().await?; let diff = [Difference::CrateNotInDb(KRATE, vec![V1, V2])]; - handle_diff(&env, diff.iter(), true).await?; + handle_diff(env.config(), &env, diff.iter(), true).await?; let build_queue = env.build_queue()?; assert!(build_queue.queued_crates().await?.is_empty()); - handle_diff(&env, diff.iter(), false).await?; + handle_diff(env.config(), &env, diff.iter(), false).await?; assert_eq!( build_queue diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index 141369be1..5ca84b462 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -1,3 +1,4 @@ +use crate::Config; use anyhow::{Context as _, Result}; use docs_rs_database::crate_details::update_latest_version_id; use docs_rs_storage::{AsyncStorage, rustdoc_archive_path, source_archive_path}; @@ -13,13 +14,14 @@ static OTHER_STORAGE_PATHS_TO_DELETE: &[&str] = &["sources"]; pub async fn delete_crate( conn: &mut sqlx::PgConnection, storage: &AsyncStorage, + config: &Config, name: &KrateName, ) -> Result<()> { let Some(crate_id) = get_id(conn, name).await? else { return Ok(()); }; - let is_library = delete_crate_from_database(conn, name, crate_id).await?; + let is_library = delete_crate_from_database(conn, config, name, crate_id).await?; // #899 let paths = if is_library { LIBRARY_STORAGE_PATHS_TO_DELETE @@ -56,6 +58,7 @@ pub async fn delete_crate( pub async fn delete_version( conn: &mut sqlx::PgConnection, storage: &AsyncStorage, + config: &Config, name: &KrateName, version: &Version, ) -> Result<()> { @@ -63,7 +66,7 @@ pub async fn delete_version( return Ok(()); }; - let is_library = delete_version_from_database(conn, name, crate_id, version).await?; + let is_library = delete_version_from_database(conn, config, name, crate_id, version).await?; let paths = if is_library { LIBRARY_STORAGE_PATHS_TO_DELETE } else { @@ -125,11 +128,31 @@ const METADATA: &[(&str, &str)] = &[ /// Returns whether this release was a library async fn delete_version_from_database( conn: &mut sqlx::PgConnection, + config: &Config, name: &KrateName, crate_id: CrateId, version: &Version, ) -> Result { let mut transaction = conn.begin().await?; + + let delete_lock_timeout = format!("{}ms", config.delete_lock_timeout.as_millis()); + + sqlx::query!( + "SELECT set_config('lock_timeout', $1, true)", + &delete_lock_timeout + ) + .fetch_one(&mut *transaction) + .await?; + + // Lock queue rows first so we align lock order with builders and wait for in-progress builds. + sqlx::query!( + "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + name as _, + version as _ + ) + .fetch_all(&mut *transaction) + .await?; + for &(table, column) in METADATA { sqlx::query( format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)").as_str()) @@ -161,11 +184,26 @@ async fn delete_version_from_database( /// Returns whether any release in this crate was a library async fn delete_crate_from_database( conn: &mut sqlx::PgConnection, + config: &Config, name: &KrateName, crate_id: CrateId, ) -> Result { let mut transaction = conn.begin().await?; + let delete_lock_timeout = format!("{}ms", config.delete_lock_timeout.as_millis()); + + sqlx::query!( + "SELECT set_config('lock_timeout', $1, true)", + &delete_lock_timeout + ) + .fetch_one(&mut *transaction) + .await?; + + // Lock queue rows first so we align lock order with builders and wait for in-progress builds. + sqlx::query!("SELECT id FROM queue WHERE name = $1 FOR UPDATE", name as _) + .fetch_all(&mut *transaction) + .await?; + sqlx::query!( "DELETE FROM sandbox_overrides WHERE crate_name = $1", name as _ @@ -225,6 +263,7 @@ mod tests { testing::{BAR, FOO, KRATE, V1, V2}, }; use test_case::test_case; + use tokio::time::{Duration, timeout}; async fn crate_exists(conn: &mut sqlx::PgConnection, name: &KrateName) -> Result { Ok( @@ -242,6 +281,21 @@ mod tests { .is_some()) } + async fn queue_entry_exists( + conn: &mut sqlx::PgConnection, + name: &KrateName, + version: &Version, + ) -> Result { + Ok(sqlx::query!( + "SELECT id FROM queue WHERE name = $1 AND version = $2;", + name as _, + version as _ + ) + .fetch_optional(conn) + .await? + .is_some()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_get_id_uses_normalization() -> Result<()> { let env = TestEnvironment::new().await?; @@ -318,7 +372,7 @@ mod tests { ); } - delete_crate(&mut conn, storage, &FOO).await?; + delete_crate(&mut conn, storage, env.config(), &FOO).await?; assert!(!queue.has_build_queued(&FOO, &V1).await?); assert!(!queue.has_build_queued(&FOO, &V2).await?); @@ -482,7 +536,7 @@ mod tests { vec!["Peter Rabbit".to_string()] ); - delete_version(&mut conn, storage, &KRATE, &V1).await?; + delete_version(&mut conn, storage, env.config(), &KRATE, &V1).await?; assert!(!queue.has_build_queued(&KRATE, &V1).await?); assert!(queue.has_build_queued(&KRATE, &V2).await?); assert!(!release_exists(&mut conn, v1).await?); @@ -557,7 +611,7 @@ mod tests { ) .await?; - delete_version(&mut conn, env.storage()?, &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; assert!(!release_exists(&mut conn, release_id).await?); @@ -577,7 +631,7 @@ mod tests { ) .await?; - delete_crate(&mut conn, env.storage()?, &KRATE).await?; + delete_crate(&mut conn, env.storage()?, env.config(), &KRATE).await?; assert!(!crate_exists(&mut conn, &KRATE).await?); assert!(!release_exists(&mut conn, release_id).await?); @@ -592,7 +646,7 @@ mod tests { let mut conn = env.async_conn().await?; assert!(!crate_exists(&mut conn, &KRATE).await?); - delete_crate(&mut conn, env.storage()?, &KRATE).await?; + delete_crate(&mut conn, env.storage()?, env.config(), &KRATE).await?; assert!(!crate_exists(&mut conn, &KRATE).await?); @@ -606,10 +660,111 @@ mod tests { assert!(!crate_exists(&mut conn, &KRATE).await?); - delete_version(&mut conn, env.storage()?, &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; assert!(!crate_exists(&mut conn, &KRATE).await?); Ok(()) } + + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { + let env = TestEnvironment::new().await?; + let queue = env.build_queue()?; + let storage = env.storage()?; + let krate = KRATE; + let version = V1; + + let mut conn = env.async_conn().await?; + + queue.add_crate(&krate, &version, 0, None).await?; + let release_id = env + .fake_release() + .await + .name(&krate) + .version(V1) + .create() + .await?; + + let mut lock_conn = env.async_conn().await?; + let mut queue_lock = lock_conn.begin().await?; + sqlx::query!( + "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + krate as _, + version as _ + ) + .fetch_one(&mut *queue_lock) + .await?; + + timeout(Duration::from_secs(10), async { + let (delete_result, unlock_result) = tokio::join!( + delete_version(&mut conn, storage, env.config(), &krate, &version), + async { + tokio::time::sleep(Duration::from_millis(200)).await; + queue_lock.rollback().await + } + ); + delete_result?; + unlock_result?; + Ok::<_, anyhow::Error>(()) + }) + .await + .context("delete_version timed out while waiting for queue row lock")??; + + assert!(!release_exists(&mut conn, release_id).await?); + assert!(!queue_entry_exists(&mut conn, &krate, &version).await?); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_crate_waits_for_locked_queue_rows() -> Result<()> { + let env = TestEnvironment::new().await?; + let queue = env.build_queue()?; + let storage = env.storage()?; + let krate = KRATE; + let version = V1; + + let mut conn = env.async_conn().await?; + + queue.add_crate(&krate, &version, 0, None).await?; + let release_id = env + .fake_release() + .await + .name(&krate) + .version(V1) + .create() + .await?; + + let mut lock_conn = env.async_conn().await?; + let mut queue_lock = lock_conn.begin().await?; + sqlx::query!( + "SELECT id FROM queue WHERE name = $1 AND version = $2 FOR UPDATE", + krate as _, + version as _ + ) + .fetch_one(&mut *queue_lock) + .await?; + + timeout(Duration::from_secs(10), async { + let (delete_result, unlock_result) = tokio::join!( + delete_crate(&mut conn, storage, env.config(), &krate), + async { + tokio::time::sleep(Duration::from_millis(200)).await; + queue_lock.rollback().await + } + ); + delete_result?; + unlock_result?; + Ok::<_, anyhow::Error>(()) + }) + .await + .context("delete_crate timed out while waiting for queue row lock")??; + + assert!(!crate_exists(&mut conn, &krate).await?); + assert!(!release_exists(&mut conn, release_id).await?); + assert!(!queue_entry_exists(&mut conn, &krate, &version).await?); + + Ok(()) + } } diff --git a/crates/bin/docs_rs_watcher/src/index_watcher.rs b/crates/bin/docs_rs_watcher/src/index_watcher.rs index fed0efe18..bdc588bd2 100644 --- a/crates/bin/docs_rs_watcher/src/index_watcher.rs +++ b/crates/bin/docs_rs_watcher/src/index_watcher.rs @@ -1,4 +1,5 @@ use crate::{ + Config, db::{delete_crate, delete_version}, index::Index, }; @@ -89,7 +90,11 @@ async fn queue_crate_invalidation(krate: &KrateName, cdn: Option<&Cdn>) { /// Updates registry index repository and adds new crates into build queue. /// /// Returns the number of crates added -pub(crate) async fn get_new_crates(context: &Context, index: &Index) -> Result { +pub(crate) async fn get_new_crates( + context: &Context, + index: &Index, + config: &Config, +) -> Result { let mut conn = context.pool()?.get_async().await?; let last_seen_reference = last_seen_reference(&mut conn).await?; @@ -110,7 +115,7 @@ pub(crate) async fn get_new_crates(context: &Context, index: &Index) -> Result, registry: Option<&str>, + config: &Config, ) -> usize { let mut crates_added = 0; for change in changes { - match process_change(context, change, registry).await { + match process_change(context, change, registry, config).await { Ok(added) => { if added { crates_added += 1; @@ -151,6 +157,7 @@ async fn process_change( context: &Context, change: &Change, registry: Option<&str>, + config: &Config, ) -> Result { let crate_version: CrateVersion = change .versions() @@ -170,10 +177,10 @@ async fn process_change( } Change::CrateDeleted { name, .. } => { let name: KrateName = name.parse()?; - process_crate_deleted(context, &name).await? + process_crate_deleted(context, config, &name).await? } Change::VersionDeleted(_release) => { - process_version_deleted(context, &crate_version).await? + process_version_deleted(context, config, &crate_version).await? } }; Ok(change.added().is_some()) @@ -223,12 +230,17 @@ async fn process_version_added( Ok(()) } -async fn process_version_deleted(context: &Context, release: &CrateVersion) -> Result<()> { +async fn process_version_deleted( + context: &Context, + config: &Config, + release: &CrateVersion, +) -> Result<()> { let mut conn = context.pool()?.get_async().await?; delete_version( &mut conn, context.storage()?, + config, &release.name, &release.version, ) @@ -252,10 +264,14 @@ async fn process_version_deleted(context: &Context, release: &CrateVersion) -> R Ok(()) } -async fn process_crate_deleted(context: &Context, krate: &KrateName) -> Result<()> { +async fn process_crate_deleted( + context: &Context, + config: &Config, + krate: &KrateName, +) -> Result<()> { let mut conn = context.pool()?.get_async().await?; - delete_crate(&mut conn, context.storage()?, krate) + delete_crate(&mut conn, context.storage()?, config, krate) .await .with_context(|| format!("failed to delete crate {krate}"))?; info!( @@ -329,6 +345,7 @@ mod tests { use crate::testing::TestEnvironment; use docs_rs_build_queue::PRIORITY_DEFAULT; use docs_rs_types::testing::{KRATE, V1, V2}; + use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread")] @@ -431,7 +448,7 @@ mod tests { .create() .await?; - process_crate_deleted(&env, &KRATE).await?; + process_crate_deleted(&env, env.config(), &KRATE).await?; let row = sqlx::query!( "SELECT id @@ -469,7 +486,7 @@ mod tests { version: V2, ..Default::default() }; - process_version_deleted(&env, &krate).await?; + process_version_deleted(&env, env.config(), &krate).await?; let row = sqlx::query!( "SELECT id @@ -527,6 +544,7 @@ mod tests { Change::VersionDeleted(non_existing_version.into()), ], None, + env.config(), ) .await; diff --git a/crates/bin/docs_rs_watcher/src/lib.rs b/crates/bin/docs_rs_watcher/src/lib.rs index 76383ee6d..833a6c688 100644 --- a/crates/bin/docs_rs_watcher/src/lib.rs +++ b/crates/bin/docs_rs_watcher/src/lib.rs @@ -36,7 +36,7 @@ pub async fn watch_registry(config: &Config, context: &Context) -> Result<()> { debug!("Checking new crates"); let index = Index::from_config(config).await?; - match get_new_crates(context, &index).await { + match get_new_crates(context, &index, config).await { Ok(n) => debug!("{} crates added to queue", n), Err(e) => { error!(?e, "Failed to get new crates"); diff --git a/crates/bin/docs_rs_watcher/src/main.rs b/crates/bin/docs_rs_watcher/src/main.rs index f89dc40b9..ba133b8e1 100644 --- a/crates/bin/docs_rs_watcher/src/main.rs +++ b/crates/bin/docs_rs_watcher/src/main.rs @@ -192,7 +192,7 @@ impl DatabaseSubcommand { .await?; } - Self::Delete { command } => command.handle_args(ctx).await?, + Self::Delete { command } => command.handle_args(config, ctx).await?, Self::Synchronize { dry_run } => { docs_rs_watcher::consistency::run_check(&config, &ctx, dry_run).await?; @@ -223,17 +223,18 @@ enum DeleteSubcommand { } impl DeleteSubcommand { - async fn handle_args(self, ctx: Context) -> Result<()> { + async fn handle_args(self, config: Arc, ctx: Context) -> Result<()> { let mut conn = ctx.pool()?.get_async().await?; let storage = ctx.storage()?; match self { Self::Version { name, version } => { - docs_rs_watcher::delete_version(&mut conn, storage, &name, &version).await?; + docs_rs_watcher::delete_version(&mut conn, storage, &config, &name, &version) + .await?; } Self::Crate { name } => { - docs_rs_watcher::delete_crate(&mut conn, storage, &name).await?; + docs_rs_watcher::delete_crate(&mut conn, storage, &config, &name).await?; } }