diff --git a/.sqlx/query-13d3402d8d5bd2f1fee96566ed633ac319d1d8169e9115215775968a047e4069.json b/.sqlx/query-13d3402d8d5bd2f1fee96566ed633ac319d1d8169e9115215775968a047e4069.json deleted file mode 100644 index 791fc49a7..000000000 --- a/.sqlx/query-13d3402d8d5bd2f1fee96566ed633ac319d1d8169e9115215775968a047e4069.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\nSELECT c.name,\n r.version AS \"version: Version\"\nFROM crates AS c\n JOIN releases AS r\n ON c.id = r.crate_id\n JOIN release_build_status AS rbs\n ON rbs.rid = r.id\n JOIN builds AS b\n ON b.rid = r.id\n AND b.build_finished = rbs.last_build_time\n AND b.rustc_nightly_date >= $1\n AND b.rustc_nightly_date < $2\n\n\n", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "name", - "type_info": "Text" - }, - { - "ordinal": 1, - "name": "version: Version", - "type_info": "Text" - } - ], - "parameters": { - "Left": [ - "Date", - "Date" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "13d3402d8d5bd2f1fee96566ed633ac319d1d8169e9115215775968a047e4069" -} diff --git a/.sqlx/query-1af366cf365f5b3899425ac6c4a7aa404969aa8dc62389d0b405556d3c5bb25b.json b/.sqlx/query-1af366cf365f5b3899425ac6c4a7aa404969aa8dc62389d0b405556d3c5bb25b.json deleted file mode 100644 index c6428a207..000000000 --- a/.sqlx/query-1af366cf365f5b3899425ac6c4a7aa404969aa8dc62389d0b405556d3c5bb25b.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE releases\n SET yanked = $3\n FROM crates\n WHERE crates.id = releases.crate_id\n AND name = $1\n AND version = $2\n RETURNING crates.id as \"id: CrateId\"\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id: CrateId", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Text", - "Text", - "Bool" - ] - }, - "nullable": [ - false - ] - }, - "hash": "1af366cf365f5b3899425ac6c4a7aa404969aa8dc62389d0b405556d3c5bb25b" -} diff --git a/.sqlx/query-2bdcde85b92839f75ee67864fca58c2a4e9c177ee442eb4828e3f3f14f7c8853.json b/.sqlx/query-2bdcde85b92839f75ee67864fca58c2a4e9c177ee442eb4828e3f3f14f7c8853.json new file mode 100644 index 000000000..f091d6c52 --- /dev/null +++ b/.sqlx/query-2bdcde85b92839f75ee67864fca58c2a4e9c177ee442eb4828e3f3f14f7c8853.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT i.* FROM (\n SELECT\n c.name as \"name: KrateName\",\n r.version as \"version: Version\",\n (\n SELECT MAX(COALESCE(b.build_finished, b.build_started))\n FROM builds AS b\n WHERE b.rid = r.id\n ) AS last_build_attempt\n FROM crates AS c\n INNER JOIN releases AS r ON c.latest_version_id = r.id\n\n WHERE\n r.rustdoc_status = TRUE\n ) as i\n ORDER BY i.last_build_attempt ASC\n LIMIT $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name: KrateName", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "version: Version", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "last_build_attempt", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + null + ] + }, + "hash": "2bdcde85b92839f75ee67864fca58c2a4e9c177ee442eb4828e3f3f14f7c8853" +} diff --git a/.sqlx/query-bb6a174647b07fd68bb2e1010bab9fd12acd6a57e29bf2ecdef5a99ab2186ce0.json b/.sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json similarity index 58% rename from .sqlx/query-bb6a174647b07fd68bb2e1010bab9fd12acd6a57e29bf2ecdef5a99ab2186ce0.json rename to .sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json index 189f1c888..ae5a0b475 100644 --- a/.sqlx/query-bb6a174647b07fd68bb2e1010bab9fd12acd6a57e29bf2ecdef5a99ab2186ce0.json +++ b/.sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n id,\n name,\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE\n attempt < $1 AND\n (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2))\n ORDER BY priority ASC, attempt ASC, id ASC\n LIMIT 1\n FOR UPDATE SKIP LOCKED", + "query": "SELECT\n id,\n name as \"name: KrateName\",\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE\n attempt < $1 AND\n (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2))\n ORDER BY priority ASC, attempt ASC, id ASC\n LIMIT 1\n FOR UPDATE SKIP LOCKED", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "name", + "name": "name: KrateName", "type_info": "Text" }, { @@ -49,5 +49,5 @@ false ] }, - "hash": "bb6a174647b07fd68bb2e1010bab9fd12acd6a57e29bf2ecdef5a99ab2186ce0" + "hash": "48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349" } diff --git a/.sqlx/query-e9e7696f5b32df8981648f3c0ca71c9d032b39b373f2db2713365c2efb03d408.json b/.sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json similarity index 63% rename from .sqlx/query-e9e7696f5b32df8981648f3c0ca71c9d032b39b373f2db2713365c2efb03d408.json rename to .sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json index baa946d77..59d4a4eaf 100644 --- a/.sqlx/query-e9e7696f5b32df8981648f3c0ca71c9d032b39b373f2db2713365c2efb03d408.json +++ b/.sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n id,\n name,\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE attempt < $1\n ORDER BY priority ASC, attempt ASC, id ASC", + "query": "SELECT\n id,\n name as \"name: KrateName\",\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE attempt < $1\n ORDER BY priority ASC, attempt ASC, id ASC", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "name", + "name": "name: KrateName", "type_info": "Text" }, { @@ -48,5 +48,5 @@ false ] }, - "hash": "e9e7696f5b32df8981648f3c0ca71c9d032b39b373f2db2713365c2efb03d408" + "hash": "7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168" } diff --git a/.sqlx/query-8f9ba7439b7282048989dcf36fd6cd88e2ea238a0b8e1b42a7515f23d97eddfb.json b/.sqlx/query-8f9ba7439b7282048989dcf36fd6cd88e2ea238a0b8e1b42a7515f23d97eddfb.json new file mode 100644 index 000000000..76a6e7969 --- /dev/null +++ b/.sqlx/query-8f9ba7439b7282048989dcf36fd6cd88e2ea238a0b8e1b42a7515f23d97eddfb.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE releases\n SET yanked = $3\n FROM crates\n WHERE crates.id = releases.crate_id\n AND name = $1\n AND version = $2\n RETURNING crates.id as \"id: CrateId\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: CrateId", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Bool" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8f9ba7439b7282048989dcf36fd6cd88e2ea238a0b8e1b42a7515f23d97eddfb" +} diff --git a/.sqlx/query-aebd890b300726ef82aad7137c2a6b69fd0f7baf3d82b0bc4807852315ec63cf.json b/.sqlx/query-aebd890b300726ef82aad7137c2a6b69fd0f7baf3d82b0bc4807852315ec63cf.json deleted file mode 100644 index 945a85e93..000000000 --- a/.sqlx/query-aebd890b300726ef82aad7137c2a6b69fd0f7baf3d82b0bc4807852315ec63cf.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT i.* FROM (\n SELECT\n c.name,\n r.version as \"version: Version\",\n (\n SELECT MAX(COALESCE(b.build_finished, b.build_started))\n FROM builds AS b\n WHERE b.rid = r.id\n ) AS last_build_attempt\n FROM crates AS c\n INNER JOIN releases AS r ON c.latest_version_id = r.id\n\n WHERE\n r.rustdoc_status = TRUE\n ) as i\n ORDER BY i.last_build_attempt ASC\n LIMIT $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "name", - "type_info": "Text" - }, - { - "ordinal": 1, - "name": "version: Version", - "type_info": "Text" - }, - { - "ordinal": 2, - "name": "last_build_attempt", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - null - ] - }, - "hash": "aebd890b300726ef82aad7137c2a6b69fd0f7baf3d82b0bc4807852315ec63cf" -} diff --git a/.sqlx/query-dc6fda3230424886a21b26c0adf41e814712a9d3e6c771468109858c7f8ad295.json b/.sqlx/query-dc6fda3230424886a21b26c0adf41e814712a9d3e6c771468109858c7f8ad295.json new file mode 100644 index 000000000..8019026f4 --- /dev/null +++ b/.sqlx/query-dc6fda3230424886a21b26c0adf41e814712a9d3e6c771468109858c7f8ad295.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n c.name AS \"name: KrateName\",\n r.version AS \"version: Version\"\n FROM crates AS c\n JOIN releases AS r\n ON c.id = r.crate_id\n JOIN release_build_status AS rbs\n ON rbs.rid = r.id\n JOIN builds AS b\n ON b.rid = r.id\n AND b.build_finished = rbs.last_build_time\n AND b.rustc_nightly_date >= $1\n AND b.rustc_nightly_date < $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name: KrateName", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "version: Version", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Date", + "Date" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "dc6fda3230424886a21b26c0adf41e814712a9d3e6c771468109858c7f8ad295" +} diff --git a/Cargo.lock b/Cargo.lock index 7fb15de47..8c2cf8645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,6 +1915,7 @@ dependencies = [ "crates-index-diff", "derive_builder", "derive_more 2.1.0", + "docs_rs_build_queue", "docs_rs_cargo_metadata", "docs_rs_database", "docs_rs_env_vars", @@ -1981,6 +1982,24 @@ dependencies = [ "walkdir", ] +[[package]] +name = "docs_rs_build_queue" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "docs_rs_database", + "docs_rs_env_vars", + "docs_rs_opentelemetry", + "docs_rs_types", + "futures-util", + "opentelemetry", + "pretty_assertions", + "sqlx", + "tokio", + "tracing", +] + [[package]] name = "docs_rs_cargo_metadata" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d17695f2b..3ee2298a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ opentelemetry = "0.31.0" opentelemetry-otlp = { version = "0.31.0", features = ["grpc-tonic", "metrics"] } opentelemetry-resource-detectors = "0.10.0" opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +pretty_assertions = "1.4.0" postcard = { version = "1.1.3", default-features = false, features = ["use-std"] } rand = "0.9" regex = "1" @@ -55,6 +56,7 @@ url = { version = "2.1.1", features = ["serde"] } walkdir = "2" [dependencies] +docs_rs_build_queue = { path = "crates/lib/docs_rs_build_queue" } docs_rs_cargo_metadata = { path = "crates/lib/docs_rs_cargo_metadata" } docs_rs_database = { path = "crates/lib/docs_rs_database" } docs_rs_env_vars = { path = "crates/lib/docs_rs_env_vars" } @@ -137,6 +139,7 @@ chrono = { workspace = true } constant_time_eq = "0.4.2" [dev-dependencies] +docs_rs_build_queue = { path = "crates/lib/docs_rs_build_queue", features = ["testing"] } docs_rs_cargo_metadata = { path = "crates/lib/docs_rs_cargo_metadata", features = ["testing"] } docs_rs_database = { path = "crates/lib/docs_rs_database", features = ["testing"] } docs_rs_fastly = { path = "crates/lib/docs_rs_fastly", features = ["testing"] } @@ -152,7 +155,7 @@ test-case = { workspace = true } tower = { version = "0.5.1", features = ["util"] } opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "testing"] } indoc = "2.0.0" -pretty_assertions = "1.4.0" +pretty_assertions = { workspace = true } [build-dependencies] time = "0.3" diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json b/crates/lib/docs_rs_build_queue/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json new file mode 100644 index 000000000..14efdbfd4 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET priority = GREATEST(priority, $1)\n WHERE\n name = $2\n AND version != $3\n AND attempt < $4\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Text", + "Text", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "130cd68b74145ec609a4903c644ce6bc8c9a6d1c3af4953e81a1dfc248a05a3c" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-1e660947261dfa1a5d1745d1732df59e0cf67ef1906da818086d063e6a0e21c6.json b/crates/lib/docs_rs_build_queue/.sqlx/query-1e660947261dfa1a5d1745d1732df59e0cf67ef1906da818086d063e6a0e21c6.json new file mode 100644 index 000000000..e1bcf20ab --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-1e660947261dfa1a5d1745d1732df59e0cf67ef1906da818086d063e6a0e21c6.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE crates\n SET latest_version_id = $2\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "1e660947261dfa1a5d1745d1732df59e0cf67ef1906da818086d063e6a0e21c6" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-4894c7d8c4e354dca1d952362b2e0cb25441e8e65b273e01ed86d2d3ecebfe84.json b/crates/lib/docs_rs_build_queue/.sqlx/query-4894c7d8c4e354dca1d952362b2e0cb25441e8e65b273e01ed86d2d3ecebfe84.json new file mode 100644 index 000000000..ca0a44f4b --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-4894c7d8c4e354dca1d952362b2e0cb25441e8e65b273e01ed86d2d3ecebfe84.json @@ -0,0 +1,87 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n releases.id as \"id: ReleaseId\",\n releases.version as \"version: Version\",\n release_build_status.build_status as \"build_status!: BuildStatus\",\n releases.yanked,\n releases.is_library,\n releases.rustdoc_status,\n releases.release_time,\n releases.target_name,\n releases.default_target,\n releases.doc_targets\n FROM releases\n INNER JOIN release_build_status ON releases.id = release_build_status.rid\n WHERE\n releases.crate_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id: ReleaseId", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "version: Version", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "build_status!: BuildStatus", + "type_info": { + "Custom": { + "name": "build_status", + "kind": { + "Enum": [ + "in_progress", + "success", + "failure" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "yanked", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "is_library", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "rustdoc_status", + "type_info": "Bool" + }, + { + "ordinal": 6, + "name": "release_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "target_name", + "type_info": "Varchar" + }, + { + "ordinal": 8, + "name": "default_target", + "type_info": "Varchar" + }, + { + "ordinal": 9, + "name": "doc_targets", + "type_info": "Json" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false, + false, + true, + true, + true, + true, + true, + true, + true + ] + }, + "hash": "4894c7d8c4e354dca1d952362b2e0cb25441e8e65b273e01ed86d2d3ecebfe84" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json b/crates/lib/docs_rs_build_queue/.sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json new file mode 100644 index 000000000..ae5a0b475 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349.json @@ -0,0 +1,53 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n id,\n name as \"name: KrateName\",\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE\n attempt < $1 AND\n (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2))\n ORDER BY priority ASC, attempt ASC, id ASC\n LIMIT 1\n FOR UPDATE SKIP LOCKED", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "name: KrateName", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "version: Version", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "priority", + "type_info": "Int4" + }, + { + "ordinal": 4, + "name": "registry", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "attempt", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4", + "Float8" + ] + }, + "nullable": [ + false, + false, + false, + false, + true, + false + ] + }, + "hash": "48eac6628305a8235aa12b45e1299615bfce99d0e693739d5d497f37600a6349" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-4a6887c2d436121cb2ba6a9c5069455b8f222d929672dc1ff810fa49c2940e2c.json b/crates/lib/docs_rs_build_queue/.sqlx/query-4a6887c2d436121cb2ba6a9c5069455b8f222d929672dc1ff810fa49c2940e2c.json new file mode 100644 index 000000000..530c4d879 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-4a6887c2d436121cb2ba6a9c5069455b8f222d929672dc1ff810fa49c2940e2c.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO config (name, value)\n VALUES ($1, $2)\n ON CONFLICT (name) DO UPDATE SET value = $2;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Varchar", + "Json" + ] + }, + "nullable": [] + }, + "hash": "4a6887c2d436121cb2ba6a9c5069455b8f222d929672dc1ff810fa49c2940e2c" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-4b13fe2c8df2b8b8bf019344313b2bc6442482a604cf90fb6106154f8e69a1c2.json b/crates/lib/docs_rs_build_queue/.sqlx/query-4b13fe2c8df2b8b8bf019344313b2bc6442482a604cf90fb6106154f8e69a1c2.json new file mode 100644 index 000000000..6d306761e --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-4b13fe2c8df2b8b8bf019344313b2bc6442482a604cf90fb6106154f8e69a1c2.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE\n FROM queue\n WHERE\n name = $1 AND\n version = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "4b13fe2c8df2b8b8bf019344313b2bc6442482a604cf90fb6106154f8e69a1c2" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-5ad9cd6cd9d444d258f7486fda178d4dd071cf43cb0ea950574af8e2f37b4a21.json b/crates/lib/docs_rs_build_queue/.sqlx/query-5ad9cd6cd9d444d258f7486fda178d4dd071cf43cb0ea950574af8e2f37b4a21.json new file mode 100644 index 000000000..264e19fd2 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-5ad9cd6cd9d444d258f7486fda178d4dd071cf43cb0ea950574af8e2f37b4a21.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT value FROM config WHERE name = $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Json" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "5ad9cd6cd9d444d258f7486fda178d4dd071cf43cb0ea950574af8e2f37b4a21" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-743604e86c489f7f330adf83d66c810678cd8bbee215532ce26f2c4e76e54a67.json b/crates/lib/docs_rs_build_queue/.sqlx/query-743604e86c489f7f330adf83d66c810678cd8bbee215532ce26f2c4e76e54a67.json new file mode 100644 index 000000000..55b209a3f --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-743604e86c489f7f330adf83d66c810678cd8bbee215532ce26f2c4e76e54a67.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO queue (name, version, priority, registry)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (name, version) DO UPDATE\n SET priority = EXCLUDED.priority,\n registry = EXCLUDED.registry,\n attempt = 0,\n last_attempt = NULL\n ;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Int4", + "Text" + ] + }, + "nullable": [] + }, + "hash": "743604e86c489f7f330adf83d66c810678cd8bbee215532ce26f2c4e76e54a67" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json b/crates/lib/docs_rs_build_queue/.sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json new file mode 100644 index 000000000..59d4a4eaf --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n id,\n name as \"name: KrateName\",\n version as \"version: Version\",\n priority,\n registry,\n attempt\n FROM queue\n WHERE attempt < $1\n ORDER BY priority ASC, attempt ASC, id ASC", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "name: KrateName", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "version: Version", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "priority", + "type_info": "Int4" + }, + { + "ordinal": 4, + "name": "registry", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "attempt", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + false, + false, + false, + true, + false + ] + }, + "hash": "7c7149524e09ebc1578b1806643b3c5be340e9f9a60e994e6da80b2e9302a168" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-96b68919f9016705a1a36ef11a5a659e7fb431beb0017fbcfd21132f105ce722.json b/crates/lib/docs_rs_build_queue/.sqlx/query-96b68919f9016705a1a36ef11a5a659e7fb431beb0017fbcfd21132f105ce722.json new file mode 100644 index 000000000..984eff3ac --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-96b68919f9016705a1a36ef11a5a659e7fb431beb0017fbcfd21132f105ce722.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT relname\n FROM pg_class\n INNER JOIN pg_namespace ON\n pg_class.relnamespace = pg_namespace.oid\n WHERE pg_class.relkind = 'S'\n AND pg_namespace.nspname = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "relname", + "type_info": "Name" + } + ], + "parameters": { + "Left": [ + "Name" + ] + }, + "nullable": [ + false + ] + }, + "hash": "96b68919f9016705a1a36ef11a5a659e7fb431beb0017fbcfd21132f105ce722" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-9e7595c7b9b336b24241c133870b99e1ee70e750849956d268ef1cb6df4f53d4.json b/crates/lib/docs_rs_build_queue/.sqlx/query-9e7595c7b9b336b24241c133870b99e1ee70e750849956d268ef1cb6df4f53d4.json new file mode 100644 index 000000000..fff5127ec --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-9e7595c7b9b336b24241c133870b99e1ee70e750849956d268ef1cb6df4f53d4.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE queue\n SET\n attempt = attempt + 1,\n last_attempt = NOW()\n WHERE id = $1\n RETURNING attempt;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "attempt", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false + ] + }, + "hash": "9e7595c7b9b336b24241c133870b99e1ee70e750849956d268ef1cb6df4f53d4" +} diff --git a/.sqlx/query-e95856a4991ea37e23ee3f28fbc171111accba3638f90abb39e20484d034dfe9.json b/crates/lib/docs_rs_build_queue/.sqlx/query-a23b671c7833d7c58238dbc7e8e9f46cae9db2b0c273e3245d9458614bf1243d.json similarity index 55% rename from .sqlx/query-e95856a4991ea37e23ee3f28fbc171111accba3638f90abb39e20484d034dfe9.json rename to crates/lib/docs_rs_build_queue/.sqlx/query-a23b671c7833d7c58238dbc7e8e9f46cae9db2b0c273e3245d9458614bf1243d.json index fdc235e64..f001ab425 100644 --- a/.sqlx/query-e95856a4991ea37e23ee3f28fbc171111accba3638f90abb39e20484d034dfe9.json +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-a23b671c7833d7c58238dbc7e8e9f46cae9db2b0c273e3245d9458614bf1243d.json @@ -1,14 +1,15 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO queue (name, version, priority, attempt, last_attempt )\n VALUES ('failed_crate', $1, 0, 99, NOW())", + "query": "\n INSERT INTO queue (name, version, priority, attempt, last_attempt )\n VALUES ($1, $2, 0, 99, NOW())", "describe": { "columns": [], "parameters": { "Left": [ + "Text", "Text" ] }, "nullable": [] }, - "hash": "e95856a4991ea37e23ee3f28fbc171111accba3638f90abb39e20484d034dfe9" + "hash": "a23b671c7833d7c58238dbc7e8e9f46cae9db2b0c273e3245d9458614bf1243d" } diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-a76e4776415625ee9d323db74a68a8670070276be0cea27a46a73f487430c5a3.json b/crates/lib/docs_rs_build_queue/.sqlx/query-a76e4776415625ee9d323db74a68a8670070276be0cea27a46a73f487430c5a3.json new file mode 100644 index 000000000..4f561d6a6 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-a76e4776415625ee9d323db74a68a8670070276be0cea27a46a73f487430c5a3.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n priority,\n COUNT(*) as \"count!\"\n FROM queue\n WHERE attempt < $1\n GROUP BY priority", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "priority", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "a76e4776415625ee9d323db74a68a8670070276be0cea27a46a73f487430c5a3" +} diff --git a/.sqlx/query-bcb6953f312804ebdad0370c2eaf2c816378c236c5d2028c3882a1b12c6362a1.json b/crates/lib/docs_rs_build_queue/.sqlx/query-bcb6953f312804ebdad0370c2eaf2c816378c236c5d2028c3882a1b12c6362a1.json similarity index 100% rename from .sqlx/query-bcb6953f312804ebdad0370c2eaf2c816378c236c5d2028c3882a1b12c6362a1.json rename to crates/lib/docs_rs_build_queue/.sqlx/query-bcb6953f312804ebdad0370c2eaf2c816378c236c5d2028c3882a1b12c6362a1.json diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-c8328ef704887faa486f9caebba0ba39a115b8e278ac4c6bb6b67f2dafefcfbb.json b/crates/lib/docs_rs_build_queue/.sqlx/query-c8328ef704887faa486f9caebba0ba39a115b8e278ac4c6bb6b67f2dafefcfbb.json new file mode 100644 index 000000000..b76920724 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-c8328ef704887faa486f9caebba0ba39a115b8e278ac4c6bb6b67f2dafefcfbb.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM queue WHERE id = $1;", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [] + }, + "hash": "c8328ef704887faa486f9caebba0ba39a115b8e278ac4c6bb6b67f2dafefcfbb" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-d16fc68c2607f6a1c94e92ca7cf95d26725b4fa8cc664a0c1474eceb67c31570.json b/crates/lib/docs_rs_build_queue/.sqlx/query-d16fc68c2607f6a1c94e92ca7cf95d26725b4fa8cc664a0c1474eceb67c31570.json new file mode 100644 index 000000000..ba6be127c --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-d16fc68c2607f6a1c94e92ca7cf95d26725b4fa8cc664a0c1474eceb67c31570.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) as \"count!\" FROM queue WHERE attempt >= $1;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + null + ] + }, + "hash": "d16fc68c2607f6a1c94e92ca7cf95d26725b4fa8cc664a0c1474eceb67c31570" +} diff --git a/.sqlx/query-e323820d9273ae10886a7a5db1864e11f94477ddef83a14b16f420c2f7b02bd0.json b/crates/lib/docs_rs_build_queue/.sqlx/query-e323820d9273ae10886a7a5db1864e11f94477ddef83a14b16f420c2f7b02bd0.json similarity index 100% rename from .sqlx/query-e323820d9273ae10886a7a5db1864e11f94477ddef83a14b16f420c2f7b02bd0.json rename to crates/lib/docs_rs_build_queue/.sqlx/query-e323820d9273ae10886a7a5db1864e11f94477ddef83a14b16f420c2f7b02bd0.json diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-f4765711eacc30103180cabe501b9c37ae3bbe46dceaa7e9332e8c898aed659c.json b/crates/lib/docs_rs_build_queue/.sqlx/query-f4765711eacc30103180cabe501b9c37ae3bbe46dceaa7e9332e8c898aed659c.json new file mode 100644 index 000000000..403de85f9 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-f4765711eacc30103180cabe501b9c37ae3bbe46dceaa7e9332e8c898aed659c.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id\n FROM queue\n WHERE\n attempt < $1 AND\n name = $2 AND\n version = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int4", + "Text", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "f4765711eacc30103180cabe501b9c37ae3bbe46dceaa7e9332e8c898aed659c" +} diff --git a/crates/lib/docs_rs_build_queue/.sqlx/query-f8b389df3451e4b5e6539e9260ba6340edf69c7dba22e667aedd510e868b0f00.json b/crates/lib/docs_rs_build_queue/.sqlx/query-f8b389df3451e4b5e6539e9260ba6340edf69c7dba22e667aedd510e868b0f00.json new file mode 100644 index 000000000..937d9c012 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/.sqlx/query-f8b389df3451e4b5e6539e9260ba6340edf69c7dba22e667aedd510e868b0f00.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE\n FROM queue\n WHERE name = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "f8b389df3451e4b5e6539e9260ba6340edf69c7dba22e667aedd510e868b0f00" +} diff --git a/.sqlx/query-f8cb592ce46398544f35bafe57abb0abbf27ebba1645489e9e8142a0f3c1df93.json b/crates/lib/docs_rs_build_queue/.sqlx/query-f8cb592ce46398544f35bafe57abb0abbf27ebba1645489e9e8142a0f3c1df93.json similarity index 100% rename from .sqlx/query-f8cb592ce46398544f35bafe57abb0abbf27ebba1645489e9e8142a0f3c1df93.json rename to crates/lib/docs_rs_build_queue/.sqlx/query-f8cb592ce46398544f35bafe57abb0abbf27ebba1645489e9e8142a0f3c1df93.json diff --git a/crates/lib/docs_rs_build_queue/Cargo.toml b/crates/lib/docs_rs_build_queue/Cargo.toml new file mode 100644 index 000000000..6f59da306 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "docs_rs_build_queue" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = { workspace = true } +chrono = { workspace = true } +docs_rs_database = { path = "../docs_rs_database" } +docs_rs_env_vars = { path = "../docs_rs_env_vars" } +docs_rs_opentelemetry = { path = "../docs_rs_opentelemetry" } +docs_rs_types = { path = "../docs_rs_types" } +futures-util = { workspace = true } +opentelemetry = { workspace = true } +sqlx = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +docs_rs_opentelemetry = { path = "../docs_rs_opentelemetry", features = ["testing"] } +docs_rs_database = { path = "../docs_rs_database", features = ["testing"] } +docs_rs_types = { path = "../docs_rs_types", features = ["testing"] } +pretty_assertions = { workspace = true } + +[features] +testing = [ + "docs_rs_database/testing", + "docs_rs_opentelemetry/testing", + "docs_rs_types/testing", +] diff --git a/crates/lib/docs_rs_build_queue/src/config.rs b/crates/lib/docs_rs_build_queue/src/config.rs new file mode 100644 index 000000000..e3ec73c6d --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/config.rs @@ -0,0 +1,33 @@ +use docs_rs_env_vars::maybe_env; +use std::time::Duration; + +#[derive(Debug)] +pub struct Config { + pub build_attempts: u16, + pub delay_between_build_attempts: Duration, +} + +impl Default for Config { + fn default() -> Self { + Self { + build_attempts: 5, + delay_between_build_attempts: Duration::from_secs(60), + } + } +} + +impl Config { + pub fn from_environment() -> anyhow::Result { + let mut config = Self::default(); + + if let Some(attempts) = maybe_env::("DOCSRS_BUILD_ATTEMPTS")? { + config.build_attempts = attempts; + } + + if let Some(delay) = maybe_env::("DOCSRS_DELAY_BETWEEN_BUILD_ATTEMPTS")? { + config.delay_between_build_attempts = Duration::from_secs(delay); + } + + Ok(config) + } +} diff --git a/crates/lib/docs_rs_build_queue/src/lib.rs b/crates/lib/docs_rs_build_queue/src/lib.rs new file mode 100644 index 000000000..bdfc5eac0 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/lib.rs @@ -0,0 +1,22 @@ +mod config; +mod metrics; +mod queue; +mod types; + +pub use config::Config; +pub use queue::{blocking::BuildQueue, non_blocking::AsyncBuildQueue}; +pub use types::{BuildPackageSummary, QueuedCrate}; + +pub const PRIORITY_DEFAULT: i32 = 0; +/// Used for workspaces to avoid blocking the queue (done through the cratesfyi CLI, not used in code) +#[allow(dead_code)] +pub const PRIORITY_DEPRIORITIZED: i32 = 1; +/// Rebuilds triggered from crates.io, see issue #2442 +pub const PRIORITY_MANUAL_FROM_CRATES_IO: i32 = 5; +/// Used for rebuilds queued through cratesfyi for crate versions failed due to a broken Rustdoc nightly version. +/// Note: a broken rustdoc version does not necessarily imply a failed build. +pub const PRIORITY_BROKEN_RUSTDOC: i32 = 10; +/// Used by the synchronize cratesfyi command when queueing builds that are in the crates.io index but not in the database. +pub const PRIORITY_CONSISTENCY_CHECK: i32 = 15; +/// The static priority for background rebuilds, used when queueing rebuilds, and when rendering them collapsed in the UI. +pub const PRIORITY_CONTINUOUS: i32 = 20; diff --git a/crates/lib/docs_rs_build_queue/src/metrics.rs b/crates/lib/docs_rs_build_queue/src/metrics.rs new file mode 100644 index 000000000..edd87a148 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/metrics.rs @@ -0,0 +1,20 @@ +use docs_rs_opentelemetry::AnyMeterProvider; +use opentelemetry::metrics::Counter; + +#[derive(Debug)] +pub struct BuildQueueMetrics { + pub(crate) queued_builds: Counter, +} + +impl BuildQueueMetrics { + pub fn new(meter_provider: &AnyMeterProvider) -> Self { + let meter = meter_provider.meter("build_queue"); + const PREFIX: &str = "docsrs.build_queue"; + Self { + queued_builds: meter + .u64_counter(format!("{PREFIX}.queued_builds")) + .with_unit("1") + .build(), + } + } +} diff --git a/crates/lib/docs_rs_build_queue/src/queue/blocking.rs b/crates/lib/docs_rs_build_queue/src/queue/blocking.rs new file mode 100644 index 000000000..7ee7f0b9b --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/queue/blocking.rs @@ -0,0 +1,559 @@ +use crate::{AsyncBuildQueue, QueuedCrate, types::BuildPackageSummary}; +use anyhow::Result; +use docs_rs_types::{KrateName, Version}; +use sqlx::Connection as _; +#[cfg(test)] +use std::collections::HashMap; +use std::sync::Arc; +use tokio::runtime; +use tracing::error; + +#[derive(Debug)] +pub struct BuildQueue { + runtime: runtime::Handle, + inner: Arc, +} + +/// sync versions of async methods +impl BuildQueue { + pub fn add_crate( + &self, + name: &KrateName, + version: &Version, + priority: i32, + registry: Option<&str>, + ) -> Result<()> { + self.runtime + .block_on(self.inner.add_crate(name, version, priority, registry)) + } + + pub fn is_locked(&self) -> Result { + self.runtime.block_on(self.inner.is_locked()) + } + pub fn lock(&self) -> Result<()> { + self.runtime.block_on(self.inner.lock()) + } + pub fn unlock(&self) -> Result<()> { + self.runtime.block_on(self.inner.unlock()) + } + + #[cfg(test)] + pub(crate) fn pending_count(&self) -> Result { + self.runtime.block_on(self.inner.pending_count()) + } + #[cfg(test)] + pub(crate) fn prioritized_count(&self) -> Result { + self.runtime.block_on(self.inner.prioritized_count()) + } + #[cfg(test)] + pub(crate) fn pending_count_by_priority(&self) -> Result> { + self.runtime + .block_on(self.inner.pending_count_by_priority()) + } + #[cfg(test)] + pub(crate) fn failed_count(&self) -> Result { + self.runtime.block_on(self.inner.failed_count()) + } + #[cfg(test)] + pub(crate) fn queued_crates(&self) -> Result> { + self.runtime.block_on(self.inner.queued_crates()) + } +} + +impl BuildQueue { + pub fn new(runtime: runtime::Handle, inner: Arc) -> Self { + Self { runtime, inner } + } + + pub fn process_next_crate( + &self, + f: impl FnOnce(&QueuedCrate) -> Result, + ) -> Result> { + let mut conn = self.runtime.block_on(self.inner.db.get_async())?; + let mut transaction = self.runtime.block_on(conn.begin())?; + + // fetch the next available crate from the queue table. + // We are using `SELECT FOR UPDATE` inside a transaction so + // the QueuedCrate is locked until we are finished with it. + // `SKIP LOCKED` here will enable another build-server to just + // skip over taken (=locked) rows and start building the first + // available one. + let to_process = match self.runtime.block_on( + sqlx::query_as!( + QueuedCrate, + r#"SELECT + id, + name as "name: KrateName", + version as "version: Version", + priority, + registry, + attempt + FROM queue + WHERE + attempt < $1 AND + (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2)) + ORDER BY priority ASC, attempt ASC, id ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED"#, + self.inner.config.build_attempts as i32, + self.inner.config.delay_between_build_attempts.as_secs_f64(), + ) + .fetch_optional(&mut *transaction), + )? { + Some(krate) => krate, + None => return Ok(None), + }; + + let res = f(&to_process); + + let mut increase_attempt_count = || -> Result { + let next_attempt: i32 = self.runtime.block_on( + sqlx::query_scalar!( + "UPDATE queue + SET + attempt = attempt + 1, + last_attempt = NOW() + WHERE id = $1 + RETURNING attempt;", + to_process.id, + ) + .fetch_one(&mut *transaction), + )?; + + Ok(next_attempt) + }; + + let next_attempt: Option; + + match res { + Ok(BuildPackageSummary { + should_reattempt: false, + successful: _, + }) => { + self.runtime.block_on( + sqlx::query!("DELETE FROM queue WHERE id = $1;", to_process.id) + .execute(&mut *transaction), + )?; + next_attempt = None; + } + Ok(BuildPackageSummary { + should_reattempt: true, + successful: _, + }) => { + next_attempt = Some(increase_attempt_count()?); + } + Err(e) => { + next_attempt = Some(increase_attempt_count()?); + error!( + ?e, + name = %to_process.name, + version = %to_process.version, + "Failed to build package" + ); + } + } + + self.runtime.block_on(transaction.commit())?; + Ok(next_attempt) + } +} + +#[cfg(test)] +mod tests { + use crate::Config; + + use super::*; + use chrono::Utc; + use docs_rs_database::testing::TestDatabase; + use docs_rs_opentelemetry::testing::TestMetrics; + use docs_rs_types::testing::{KRATE, V1, V2}; + use pretty_assertions::assert_eq; + use std::time::Duration; + + const FOO: KrateName = KrateName::from_static("foo"); + const BAR: KrateName = KrateName::from_static("bar"); + const BAZ: KrateName = KrateName::from_static("baz"); + + // when we start migrating / spitting the binaries, + // we probably will create amore powerfull & flexible + // test& app context. Then we could migrate this. + struct TestEnv { + db: TestDatabase, + queue: BuildQueue, + metrics: TestMetrics, + runtime: runtime::Runtime, + } + + fn test_queue(config: Config) -> Result { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + let metrics = TestMetrics::new(); + let db = runtime.block_on(TestDatabase::new( + &docs_rs_database::Config::test_config()?, + metrics.provider(), + ))?; + + let async_queue = Arc::new(AsyncBuildQueue::new( + db.pool().clone(), + Arc::new(config), + metrics.provider(), + )); + + Ok(TestEnv { + db, + queue: BuildQueue { + runtime: runtime.handle().clone(), + inner: async_queue, + }, + metrics, + runtime, + }) + } + + #[test] + fn test_wait_between_build_attempts() -> Result<()> { + let env = test_queue(Config { + build_attempts: 99, + delay_between_build_attempts: Duration::from_secs(1), + })?; + + let queue = env.queue; + + queue.add_crate(&KRATE, &V1, 0, None)?; + + // first let it fail + queue.process_next_crate(|krate| { + assert_eq!(krate.name, KRATE); + anyhow::bail!("simulate a failure"); + })?; + + queue.process_next_crate(|_| { + // this can't happen since we didn't wait between attempts + unreachable!(); + })?; + + env.runtime.block_on(async { + // fake the build-attempt timestamp so it's older + let mut conn = env.db.async_conn().await; + sqlx::query!( + "UPDATE queue SET last_attempt = $1", + Utc::now() - chrono::Duration::try_seconds(60).unwrap() + ) + .execute(&mut *conn) + .await + })?; + + let mut handled = false; + // now we can process it again + queue.process_next_crate(|krate| { + assert_eq!(krate.name, KRATE); + handled = true; + Ok(BuildPackageSummary::default()) + })?; + + assert!(handled); + + Ok(()) + } + + #[test] + fn test_add_and_process_crates() -> Result<()> { + const MAX_ATTEMPTS: u16 = 3; + let env = test_queue(Config { + build_attempts: MAX_ATTEMPTS, + delay_between_build_attempts: Duration::ZERO, + })?; + let queue = env.queue; + + const LOW_PRIORITY: KrateName = KrateName::from_static("low-priority"); + const HIGH_PRIORITY_FOO: KrateName = KrateName::from_static("high-priority-foo"); + const MEDIUM_PRIORITY: KrateName = KrateName::from_static("medium-priority"); + const HIGH_PRIORITY_BAR: KrateName = KrateName::from_static("high-priority-bar"); + const STANDARD_PRIORITY: KrateName = KrateName::from_static("standard-priority"); + const HIGH_PRIORITY_BAZ: KrateName = KrateName::from_static("high-priority-baz"); + + let test_crates = [ + (LOW_PRIORITY, 1000), + (HIGH_PRIORITY_FOO, -1000), + (MEDIUM_PRIORITY, -10), + (HIGH_PRIORITY_BAR, -1000), + (STANDARD_PRIORITY, 0), + (HIGH_PRIORITY_BAZ, -1000), + ]; + for krate in &test_crates { + queue.add_crate(&krate.0, &V1, krate.1, None)?; + } + + let assert_next = |name| -> Result<()> { + queue.process_next_crate(|krate| { + assert_eq!(name, krate.name); + Ok(BuildPackageSummary::default()) + })?; + Ok(()) + }; + let assert_next_and_fail = |name| -> Result<()> { + queue.process_next_crate(|krate| { + assert_eq!(name, krate.name); + anyhow::bail!("simulate a failure"); + })?; + Ok(()) + }; + + // The first processed item is the one with the highest priority added first. + assert_next(HIGH_PRIORITY_FOO)?; + + // Simulate a failure in high-priority-bar. + assert_next_and_fail(HIGH_PRIORITY_BAR)?; + + // Continue with the next high priority crate. + assert_next(HIGH_PRIORITY_BAZ)?; + + // After all the crates with the max priority are processed, before starting to process + // crates with a lower priority the failed crates with the max priority will be tried + // again. + assert_next(HIGH_PRIORITY_BAR)?; + + // Continue processing according to the priority. + assert_next(MEDIUM_PRIORITY)?; + assert_next(STANDARD_PRIORITY)?; + + // Simulate the crate failing many times. + for _ in 0..MAX_ATTEMPTS { + assert_next_and_fail(LOW_PRIORITY)?; + } + + // Since low-priority failed many times it will be removed from the queue. Because of + // that the queue should now be empty. + let mut called = false; + queue.process_next_crate(|_| { + called = true; + Ok(BuildPackageSummary::default()) + })?; + assert!(!called, "there were still items in the queue"); + + let collected_metrics = env.metrics.collected_metrics(); + + assert_eq!( + collected_metrics + .get_metric("build_queue", "docsrs.build_queue.queued_builds")? + .get_u64_counter() + .value(), + test_crates.len() as u64 + ); + + Ok(()) + } + + #[test] + fn test_pending_count() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + assert_eq!(queue.pending_count()?, 0); + queue.add_crate(&FOO, &V1, 0, None)?; + assert_eq!(queue.pending_count()?, 1); + queue.add_crate(&BAR, &V1, 0, None)?; + assert_eq!(queue.pending_count()?, 2); + + queue.process_next_crate(|krate| { + assert_eq!(FOO, krate.name); + Ok(BuildPackageSummary::default()) + })?; + assert_eq!(queue.pending_count()?, 1); + + Ok(()) + } + + #[test] + fn test_prioritized_count() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + assert_eq!(queue.prioritized_count()?, 0); + queue.add_crate(&FOO, &V1, 0, None)?; + assert_eq!(queue.prioritized_count()?, 1); + queue.add_crate(&BAR, &V1, -100, None)?; + assert_eq!(queue.prioritized_count()?, 2); + queue.add_crate(&BAZ, &V1, 100, None)?; + assert_eq!(queue.prioritized_count()?, 2); + + queue.process_next_crate(|krate| { + assert_eq!(BAR, krate.name); + Ok(BuildPackageSummary::default()) + })?; + assert_eq!(queue.prioritized_count()?, 1); + + Ok(()) + } + + #[test] + fn test_count_by_priority() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + assert!(queue.pending_count_by_priority()?.is_empty()); + + queue.add_crate(&FOO, &V1, 1, None)?; + queue.add_crate(&BAR, &V2, 2, None)?; + queue.add_crate(&BAZ, &V2, 2, None)?; + + assert_eq!( + queue.pending_count_by_priority()?, + HashMap::from_iter(vec![(1, 1), (2, 2)]) + ); + + while queue.pending_count()? > 0 { + queue.process_next_crate(|_| Ok(BuildPackageSummary::default()))?; + } + assert!(queue.pending_count_by_priority()?.is_empty()); + + Ok(()) + } + + #[test] + fn test_failed_count_for_reattempts() -> Result<()> { + const MAX_ATTEMPTS: u16 = 3; + + let env = test_queue(Config { + build_attempts: MAX_ATTEMPTS, + delay_between_build_attempts: Duration::ZERO, + })?; + let queue = env.queue; + + assert_eq!(queue.failed_count()?, 0); + queue.add_crate(&FOO, &V1, -100, None)?; + assert_eq!(queue.failed_count()?, 0); + queue.add_crate(&BAR, &V1, 0, None)?; + + for _ in 0..MAX_ATTEMPTS { + assert_eq!(queue.failed_count()?, 0); + queue.process_next_crate(|krate| { + assert_eq!(FOO, krate.name); + Ok(BuildPackageSummary { + should_reattempt: true, + ..Default::default() + }) + })?; + } + assert_eq!(queue.failed_count()?, 1); + + queue.process_next_crate(|krate| { + assert_eq!(BAR, krate.name); + Ok(BuildPackageSummary::default()) + })?; + assert_eq!(queue.failed_count()?, 1); + + Ok(()) + } + + #[test] + fn test_failed_count_after_error() -> Result<()> { + const MAX_ATTEMPTS: u16 = 3; + + let env = test_queue(Config { + build_attempts: MAX_ATTEMPTS, + delay_between_build_attempts: Duration::ZERO, + })?; + let queue = env.queue; + + assert_eq!(queue.failed_count()?, 0); + queue.add_crate(&FOO, &V1, -100, None)?; + assert_eq!(queue.failed_count()?, 0); + queue.add_crate(&BAR, &V1, 0, None)?; + + for _ in 0..MAX_ATTEMPTS { + assert_eq!(queue.failed_count()?, 0); + queue.process_next_crate(|krate| { + assert_eq!(FOO, krate.name); + anyhow::bail!("this failed"); + })?; + } + assert_eq!(queue.failed_count()?, 1); + + queue.process_next_crate(|krate| { + assert_eq!(BAR, krate.name); + Ok(BuildPackageSummary::default()) + })?; + assert_eq!(queue.failed_count()?, 1); + + Ok(()) + } + + #[test] + fn test_queued_crates() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + let test_crates = [(BAR, 0), (FOO, -10), (BAZ, 10)]; + for krate in &test_crates { + queue.add_crate(&krate.0, &V1, krate.1, None)?; + } + + assert_eq!( + vec![(FOO, V1, -10), (BAR, V1, 0), (BAZ, V1, 10)], + queue + .queued_crates()? + .into_iter() + .map(|c| (c.name.clone(), c.version, c.priority)) + .collect::>() + ); + + Ok(()) + } + + #[test] + fn test_queue_lock() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + // unlocked without config + assert!(!queue.is_locked()?); + + queue.lock()?; + assert!(queue.is_locked()?); + + queue.unlock()?; + assert!(!queue.is_locked()?); + + Ok(()) + } + + #[test] + fn test_add_long_name() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + let name: KrateName = "krate".repeat(100)[..64].parse().unwrap(); + + queue.add_crate(&name, &V1, 0, None)?; + + queue.process_next_crate(|krate| { + assert_eq!(name, krate.name); + Ok(BuildPackageSummary::default()) + })?; + + Ok(()) + } + + #[test] + fn test_add_long_version() -> Result<()> { + let env = test_queue(Config::default())?; + let queue = env.queue; + + let long_version = Version::parse(&format!( + "1.2.3-{}+{}", + "prerelease".repeat(100), + "build".repeat(100) + ))?; + + queue.add_crate(&KRATE, &long_version, 0, None)?; + + queue.process_next_crate(|krate| { + assert_eq!(long_version, krate.version); + Ok(BuildPackageSummary::default()) + })?; + + Ok(()) + } +} diff --git a/crates/lib/docs_rs_build_queue/src/queue/mod.rs b/crates/lib/docs_rs_build_queue/src/queue/mod.rs new file mode 100644 index 000000000..ef1d7b54f --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/queue/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod blocking; +pub(crate) mod non_blocking; diff --git a/crates/lib/docs_rs_build_queue/src/queue/non_blocking.rs b/crates/lib/docs_rs_build_queue/src/queue/non_blocking.rs new file mode 100644 index 000000000..d06bff06a --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/queue/non_blocking.rs @@ -0,0 +1,389 @@ +use crate::{Config, QueuedCrate, metrics}; +use anyhow::Result; +use docs_rs_database::{ + Pool, + service_config::{ConfigName, get_config, set_config}, +}; +use docs_rs_opentelemetry::AnyMeterProvider; +use docs_rs_types::{KrateName, Version}; +use futures_util::TryStreamExt as _; +use std::{collections::HashMap, sync::Arc}; + +#[derive(Debug)] +pub struct AsyncBuildQueue { + pub(super) config: Arc, + pub(super) db: Pool, + queue_metrics: metrics::BuildQueueMetrics, + max_attempts: i32, +} + +impl AsyncBuildQueue { + pub fn new(db: Pool, config: Arc, otel_meter_provider: &AnyMeterProvider) -> Self { + AsyncBuildQueue { + max_attempts: config.build_attempts.into(), + config, + db, + queue_metrics: metrics::BuildQueueMetrics::new(otel_meter_provider), + } + } + + pub async fn add_crate( + &self, + name: &KrateName, + version: &Version, + priority: i32, + registry: Option<&str>, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + + sqlx::query!( + "INSERT INTO queue (name, version, priority, registry) + VALUES ($1, $2, $3, $4) + ON CONFLICT (name, version) DO UPDATE + SET priority = EXCLUDED.priority, + registry = EXCLUDED.registry, + attempt = 0, + last_attempt = NULL + ;", + name as _, + version as _, + priority, + registry, + ) + .execute(&mut *conn) + .await?; + + self.queue_metrics.queued_builds.add(1, &[]); + + Ok(()) + } + + pub async fn pending_count(&self) -> Result { + Ok(self + .pending_count_by_priority() + .await? + .values() + .sum::()) + } + + pub async fn prioritized_count(&self) -> Result { + Ok(self + .pending_count_by_priority() + .await? + .iter() + .filter(|&(&priority, _)| priority <= 0) + .map(|(_, count)| count) + .sum::()) + } + + pub async fn pending_count_by_priority(&self) -> Result> { + let mut conn = self.db.get_async().await?; + + Ok(sqlx::query!( + r#" + SELECT + priority, + COUNT(*) as "count!" + FROM queue + WHERE attempt < $1 + GROUP BY priority"#, + self.max_attempts, + ) + .fetch(&mut *conn) + .map_ok(|row| (row.priority, row.count as usize)) + .try_collect() + .await?) + } + + pub async fn failed_count(&self) -> Result { + let mut conn = self.db.get_async().await?; + + Ok(sqlx::query_scalar!( + r#"SELECT COUNT(*) as "count!" FROM queue WHERE attempt >= $1;"#, + self.max_attempts, + ) + .fetch_one(&mut *conn) + .await? as usize) + } + + pub async fn queued_crates(&self) -> Result> { + let mut conn = self.db.get_async().await?; + + Ok(sqlx::query_as!( + QueuedCrate, + r#"SELECT + id, + name as "name: KrateName", + version as "version: Version", + priority, + registry, + attempt + FROM queue + WHERE attempt < $1 + ORDER BY priority ASC, attempt ASC, id ASC"#, + self.max_attempts + ) + .fetch_all(&mut *conn) + .await?) + } + + pub async fn has_build_queued(&self, name: &KrateName, version: &Version) -> Result { + let mut conn = self.db.get_async().await?; + Ok(sqlx::query_scalar!( + "SELECT id + FROM queue + WHERE + attempt < $1 AND + name = $2 AND + version = $3 + ", + self.max_attempts, + name as _, + version as _, + ) + .fetch_optional(&mut *conn) + .await? + .is_some()) + } + + pub async fn remove_crate_from_queue(&self, name: &KrateName) -> Result<()> { + let mut conn = self.db.get_async().await?; + sqlx::query!( + "DELETE + FROM queue + WHERE name = $1 + ", + name as _ + ) + .execute(&mut *conn) + .await?; + + Ok(()) + } + + pub async fn remove_version_from_queue( + &self, + name: &KrateName, + version: &Version, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + sqlx::query!( + "DELETE + FROM queue + WHERE + name = $1 AND + version = $2 + ", + name as _, + version as _, + ) + .execute(&mut *conn) + .await?; + + Ok(()) + } + + /// Decreases the priority of all releases currently present in the queue not matching the version passed to *at least* new_priority. + pub async fn deprioritize_other_releases( + &self, + name: &KrateName, + latest_version: &Version, + new_priority: i32, + ) -> Result<()> { + let mut conn = self.db.get_async().await?; + sqlx::query!( + "UPDATE queue + SET priority = GREATEST(priority, $1) + WHERE + name = $2 + AND version != $3 + AND attempt < $4 + ", + new_priority, + name as _, + latest_version as _, + self.max_attempts, + ) + .execute(&mut *conn) + .await?; + + Ok(()) + } +} + +/// Locking functions. +impl AsyncBuildQueue { + /// Checks for the lock and returns whether it currently exists. + pub async fn is_locked(&self) -> Result { + let mut conn = self.db.get_async().await?; + + Ok(get_config::(&mut conn, ConfigName::QueueLocked) + .await? + .unwrap_or(false)) + } + + /// lock the queue. Daemon will check this lock and stop operating if it exists. + pub async fn lock(&self) -> Result<()> { + let mut conn = self.db.get_async().await?; + set_config(&mut conn, ConfigName::QueueLocked, true).await + } + + /// unlock the queue. + pub async fn unlock(&self) -> Result<()> { + let mut conn = self.db.get_async().await?; + set_config(&mut conn, ConfigName::QueueLocked, false).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use docs_rs_database::testing::TestDatabase; + use docs_rs_opentelemetry::testing::TestMetrics; + use docs_rs_types::testing::{KRATE, V1, V2}; + use pretty_assertions::assert_eq; + + const FAILED_KRATE: KrateName = KrateName::from_static("failed_crate"); + + // when we start migrating / spitting the binaries, + // we probably will create amore powerfull & flexible + // test& app context. Then we could migrate this. + struct TestEnv { + db: TestDatabase, + queue: AsyncBuildQueue, + } + + async fn test_queue() -> Result { + let test_metrics = TestMetrics::new(); + let db = TestDatabase::new( + &docs_rs_database::Config::test_config()?, + test_metrics.provider(), + ) + .await?; + + let queue = AsyncBuildQueue::new( + db.pool().clone(), + Arc::new(Config::from_environment()?), + test_metrics.provider(), + ); + + Ok(TestEnv { db, queue }) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_add_duplicate_doesnt_fail_last_priority_wins() -> Result<()> { + let env = test_queue().await?; + let queue = env.queue; + + queue.add_crate(&KRATE, &V1, 0, None).await?; + queue.add_crate(&KRATE, &V1, 9, None).await?; + + let queued_crates = queue.queued_crates().await?; + assert_eq!(queued_crates.len(), 1); + assert_eq!(queued_crates[0].priority, 9); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_add_duplicate_resets_attempts_and_priority() -> Result<()> { + let env = test_queue().await?; + let queue = env.queue; + + let mut conn = env.db.async_conn().await; + sqlx::query!( + " + INSERT INTO queue (name, version, priority, attempt, last_attempt ) + VALUES ($1, $2, 0, 99, NOW())", + FAILED_KRATE as _, + V1 as _ + ) + .execute(&mut *conn) + .await?; + + assert_eq!(queue.pending_count().await?, 0); + + queue.add_crate(&FAILED_KRATE, &V1, 9, None).await?; + + assert_eq!(queue.pending_count().await?, 1); + + let row = sqlx::query!( + "SELECT priority, attempt, last_attempt + FROM queue + WHERE name = $1 AND version = $2", + "failed_crate", + V1 as _ + ) + .fetch_one(&mut *conn) + .await?; + + assert_eq!(row.priority, 9); + assert_eq!(row.attempt, 0); + assert!(row.last_attempt.is_none()); + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_has_build_queued() -> Result<()> { + let env = test_queue().await?; + let queue = env.queue; + + queue.add_crate(&KRATE, &V1, 0, None).await?; + + let mut conn = env.db.async_conn().await; + assert!(queue.has_build_queued(&KRATE, &V1).await.unwrap()); + + sqlx::query!("UPDATE queue SET attempt = 6") + .execute(&mut *conn) + .await + .unwrap(); + + assert!(!queue.has_build_queued(&KRATE, &V1).await.unwrap()); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_version_from_queue() -> Result<()> { + let env = test_queue().await?; + let queue = env.queue; + + assert_eq!(queue.pending_count().await?, 0); + + queue.add_crate(&KRATE, &V1, 0, None).await?; + queue.add_crate(&KRATE, &V2, 0, None).await?; + + assert_eq!(queue.pending_count().await?, 2); + queue.remove_version_from_queue(&KRATE, &V1).await?; + + assert_eq!(queue.pending_count().await?, 1); + + // only v2 remains + if let [k] = queue.queued_crates().await?.as_slice() { + assert_eq!(k.name, KRATE); + assert_eq!(k.version, V2); + } else { + panic!("expected only one queued crate"); + } + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_crate_from_queue() -> Result<()> { + let env = test_queue().await?; + let queue = env.queue; + + assert_eq!(queue.pending_count().await?, 0); + + queue.add_crate(&KRATE, &V1, 0, None).await?; + queue.add_crate(&KRATE, &V2, 0, None).await?; + + assert_eq!(queue.pending_count().await?, 2); + queue.remove_crate_from_queue(&KRATE).await?; + + assert_eq!(queue.pending_count().await?, 0); + + Ok(()) + } +} diff --git a/crates/lib/docs_rs_build_queue/src/types.rs b/crates/lib/docs_rs_build_queue/src/types.rs new file mode 100644 index 000000000..a8a57d012 --- /dev/null +++ b/crates/lib/docs_rs_build_queue/src/types.rs @@ -0,0 +1,27 @@ +use docs_rs_types::{KrateName, Version}; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct QueuedCrate { + pub(crate) id: i32, + pub name: KrateName, + pub version: Version, + pub priority: i32, + pub registry: Option, + pub attempt: i32, +} + +#[derive(Debug)] +pub struct BuildPackageSummary { + pub successful: bool, + pub should_reattempt: bool, +} + +#[cfg(any(test, feature = "testing"))] +impl Default for BuildPackageSummary { + fn default() -> Self { + Self { + successful: true, + should_reattempt: false, + } + } +} diff --git a/crates/lib/docs_rs_database/src/testing/test_env.rs b/crates/lib/docs_rs_database/src/testing/test_env.rs index e0a747d41..9ae12f242 100644 --- a/crates/lib/docs_rs_database/src/testing/test_env.rs +++ b/crates/lib/docs_rs_database/src/testing/test_env.rs @@ -17,6 +17,11 @@ impl TestDatabase { pub async fn new(config: &Config, otel_meter_provider: &AnyMeterProvider) -> Result { // A random schema name is generated and used for the current connection. This allows each // test to create a fresh instance of the database to run within. + // + // TODO: potential performance improvements + // * optionall use "DROP SCHEMA CASCADE" instead of rolling back migrations. But CI should + // still do it? + // * use postgres template database? migrate once, just copy the template for each test? let schema = format!("docs_rs_test_schema_{}", rand::random::()); let pool = Pool::new_with_schema(config, &schema, otel_meter_provider).await?; diff --git a/crates/lib/docs_rs_types/src/lib.rs b/crates/lib/docs_rs_types/src/lib.rs index b340b5c50..1d377c3af 100644 --- a/crates/lib/docs_rs_types/src/lib.rs +++ b/crates/lib/docs_rs_types/src/lib.rs @@ -3,6 +3,8 @@ mod feature; mod ids; mod krate_name; mod req_version; +#[cfg(any(test, feature = "testing"))] +pub mod testing; mod version; pub use build_status::BuildStatus; diff --git a/crates/lib/docs_rs_types/src/testing/mod.rs b/crates/lib/docs_rs_types/src/testing/mod.rs new file mode 100644 index 000000000..57f885c32 --- /dev/null +++ b/crates/lib/docs_rs_types/src/testing/mod.rs @@ -0,0 +1,14 @@ +use crate::{KrateName, Version}; + +// testing krate name constants +pub const KRATE: KrateName = KrateName::from_static("krate"); +pub const FOO: KrateName = KrateName::from_static("foo"); +pub const BAR: KrateName = KrateName::from_static("bar"); +pub const BAZ: KrateName = KrateName::from_static("baz"); +pub const OTHER: KrateName = KrateName::from_static("other"); + +// some versions as constants for tests +pub const V0_1: Version = Version::new(0, 1, 0); +pub const V1: Version = Version::new(1, 0, 0); +pub const V2: Version = Version::new(2, 0, 0); +pub const V3: Version = Version::new(3, 0, 0); diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index c1a35e641..5af5257be 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -3,8 +3,9 @@ use chrono::NaiveDate; use clap::{Parser, Subcommand, ValueEnum}; use docs_rs::{ Config, Context, Index, PackageKind, RustwideBuilder, + build_queue::{last_seen_reference, queue_rebuilds_faulty_rustdoc, set_last_seen_reference}, db::{self, Overrides}, - queue_rebuilds_faulty_rustdoc, start_web_server, + start_web_server, utils::{ daemon::start_background_service_metric_collector, get_crate_pattern_and_priority, list_crate_priorities, queue_builder, remove_crate_priority, set_crate_priority, @@ -15,7 +16,7 @@ use docs_rs_database::{ service_config::{ConfigName, get_config, set_config}, }; use docs_rs_storage::add_path_into_database; -use docs_rs_types::{CrateId, Version}; +use docs_rs_types::{CrateId, KrateName, Version}; use futures_util::StreamExt; use std::{env, fmt::Write, net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::runtime; @@ -130,10 +131,7 @@ impl CommandLine { // metrics from the registry watcher, which should only run once, and all the time. start_background_service_metric_collector(&ctx)?; - ctx.runtime.block_on(docs_rs::utils::watch_registry( - &ctx.async_build_queue, - &ctx.config, - ))?; + ctx.runtime.block_on(docs_rs::utils::watch_registry(&ctx))?; } Self::StartBuildServer => { queue_builder(&ctx, RustwideBuilder::init(&ctx)?)?; @@ -214,36 +212,46 @@ impl QueueSubcommand { crate_name, crate_version, build_priority, - } => ctx.build_queue.add_crate( + } => { + let crate_name: KrateName = crate_name.parse()?; + ctx.build_queue.add_crate( &crate_name, &crate_version, build_priority, ctx.config.registry_url.as_deref(), - )?, + )?}, Self::GetLastSeenReference => { - if let Some(reference) = ctx.build_queue.last_seen_reference()? { - println!("Last seen reference: {reference}"); - } else { - println!("No last seen reference available"); - } + ctx.runtime.block_on(async move { + let mut conn = ctx.pool.get_async().await?; + if let Some(reference) = last_seen_reference(&mut conn).await? { + println!("Last seen reference: {reference}"); + } else { + println!("No last seen reference available"); + } + Ok::<(), anyhow::Error>(()) + })? } Self::SetLastSeenReference { reference, head } => { + ctx.runtime.block_on(async move { let reference = match (reference, head) { (Some(reference), false) => reference, (None, true) => { println!("Fetching changes to set reference to HEAD"); - ctx.runtime.block_on(async move { - let index = Index::from_config(&ctx.config).await?; - index.latest_commit_reference().await - })? + let index = Index::from_config(&ctx.config).await?; + index.latest_commit_reference().await? } (_, _) => unreachable!(), }; - ctx.build_queue.set_last_seen_reference(reference)?; + let mut conn = ctx.pool.get_async().await?; + + set_last_seen_reference(&mut conn, reference).await?; println!("Set last seen reference: {reference}"); + Ok::<(), anyhow::Error>(()) + })? + } Self::DefaultPriority { subcommand } => subcommand.handle_args(ctx)?, diff --git a/src/build_queue.rs b/src/build_queue.rs index 804167865..68167cb82 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -1,816 +1,403 @@ use crate::{ - BuildPackageSummary, Config, Context, Index, RustwideBuilder, + Config, Context, Index, PackageKind, RustwideBuilder, db::{delete_crate, delete_version}, - docbuilder::{BuilderMetrics, PackageKind}, error::Result, utils::{get_crate_priority, report_error}, }; use anyhow::Context as _; use chrono::NaiveDate; use crates_index_diff::{Change, CrateVersion}; +use docs_rs_build_queue::{ + AsyncBuildQueue, BuildPackageSummary, PRIORITY_BROKEN_RUSTDOC, PRIORITY_CONTINUOUS, + PRIORITY_MANUAL_FROM_CRATES_IO, QueuedCrate, +}; use docs_rs_database::{ - AsyncPoolClient, Pool, crate_details::update_latest_version_id, service_config::{ConfigName, get_config, set_config}, }; use docs_rs_fastly::{Cdn, CdnBehaviour as _}; -use docs_rs_opentelemetry::AnyMeterProvider; -use docs_rs_storage::AsyncStorage; use docs_rs_types::{CrateId, KrateName, Version}; use docs_rs_utils::retry; use fn_error_context::context; -use futures_util::{StreamExt, stream::TryStreamExt}; -use opentelemetry::metrics::Counter; -use sqlx::Connection as _; -use std::{collections::HashMap, sync::Arc, time::Instant}; -use tokio::runtime; +use futures_util::StreamExt; +use std::time::Instant; use tracing::{debug, error, info, instrument, warn}; -#[derive(Debug)] -struct BuildQueueMetrics { - queued_builds: Counter, -} - -impl BuildQueueMetrics { - fn new(meter_provider: &AnyMeterProvider) -> Self { - let meter = meter_provider.meter("build_queue"); - const PREFIX: &str = "docsrs.build_queue"; - Self { - queued_builds: meter - .u64_counter(format!("{PREFIX}.queued_builds")) - .with_unit("1") - .build(), - } +pub async fn last_seen_reference( + conn: &mut sqlx::PgConnection, +) -> Result> { + if let Some(value) = get_config::(conn, ConfigName::LastSeenIndexReference).await? { + return Ok(Some(crates_index_diff::gix::ObjectId::from_hex( + value.as_bytes(), + )?)); } + Ok(None) } -pub(crate) const PRIORITY_DEFAULT: i32 = 0; -/// Used for workspaces to avoid blocking the queue (done through the cratesfyi CLI, not used in code) -#[allow(dead_code)] -pub(crate) const PRIORITY_DEPRIORITIZED: i32 = 1; -/// Rebuilds triggered from crates.io, see issue #2442 -pub(crate) const PRIORITY_MANUAL_FROM_CRATES_IO: i32 = 5; -/// Used for rebuilds queued through cratesfyi for crate versions failed due to a broken Rustdoc nightly version. -/// Note: a broken rustdoc version does not necessarily imply a failed build. -pub(crate) const PRIORITY_BROKEN_RUSTDOC: i32 = 10; -/// Used by the synchronize cratesfyi command when queueing builds that are in the crates.io index but not in the database. -pub(crate) const PRIORITY_CONSISTENCY_CHECK: i32 = 15; -/// The static priority for background rebuilds, used when queueing rebuilds, and when rendering them collapsed in the UI. -pub(crate) const PRIORITY_CONTINUOUS: i32 = 20; - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) struct QueuedCrate { - id: i32, - pub(crate) name: String, - pub(crate) version: Version, - pub(crate) priority: i32, - pub(crate) registry: Option, - pub(crate) attempt: i32, -} - -#[derive(Debug)] -pub struct AsyncBuildQueue { - config: Arc, - storage: Arc, - pub(crate) db: Pool, - queue_metrics: BuildQueueMetrics, - builder_metrics: Arc, - cdn: Option>, - max_attempts: i32, +pub async fn set_last_seen_reference( + conn: &mut sqlx::PgConnection, + oid: crates_index_diff::gix::ObjectId, +) -> Result<()> { + set_config(conn, ConfigName::LastSeenIndexReference, oid.to_string()).await?; + Ok(()) } -impl AsyncBuildQueue { - pub fn new( - db: Pool, - config: Arc, - storage: Arc, - cdn: Option>, - otel_meter_provider: &AnyMeterProvider, - ) -> Self { - AsyncBuildQueue { - max_attempts: config.build_attempts.into(), - config, - db, - storage, - queue_metrics: BuildQueueMetrics::new(otel_meter_provider), - builder_metrics: Arc::new(BuilderMetrics::new(otel_meter_provider)), - cdn, +async fn queue_crate_invalidation(krate: &str, cdn: Option<&Cdn>) { + let krate = match krate + .parse::() + .with_context(|| format!("can't parse crate name '{}'", krate)) + { + Ok(krate) => krate, + Err(err) => { + report_error(&err); + return; } - } + }; - pub fn builder_metrics(&self) -> Arc { - self.builder_metrics.clone() - } + let Some(cdn) = &cdn else { + info!(%krate, "no CDN configured, skippping crate invalidation"); + return; + }; - pub async fn last_seen_reference(&self) -> Result> { - let mut conn = self.db.get_async().await?; - if let Some(value) = - get_config::(&mut conn, ConfigName::LastSeenIndexReference).await? - { - return Ok(Some(crates_index_diff::gix::ObjectId::from_hex( - value.as_bytes(), - )?)); - } - Ok(None) - } - - pub async fn set_last_seen_reference( - &self, - oid: crates_index_diff::gix::ObjectId, - ) -> Result<()> { - let mut conn = self.db.get_async().await?; - set_config( - &mut conn, - ConfigName::LastSeenIndexReference, - oid.to_string(), - ) - .await?; - Ok(()) + if let Err(err) = cdn.queue_crate_invalidation(&krate).await { + report_error(&err); } +} - #[context("error trying to add {name}-{version} to build queue")] - pub async fn add_crate( - &self, - name: &str, - version: &Version, - priority: i32, - registry: Option<&str>, - ) -> Result<()> { - let mut conn = self.db.get_async().await?; - - sqlx::query!( - "INSERT INTO queue (name, version, priority, registry) - VALUES ($1, $2, $3, $4) - ON CONFLICT (name, version) DO UPDATE - SET priority = EXCLUDED.priority, - registry = EXCLUDED.registry, - attempt = 0, - last_attempt = NULL - ;", - name, - version as _, - priority, - registry, - ) - .execute(&mut *conn) - .await?; +/// Updates registry index repository and adds new crates into build queue. +/// +/// Returns the number of crates added +pub async fn get_new_crates(context: &Context, index: &Index) -> Result { + let mut conn = context.pool.get_async().await?; + + let last_seen_reference = last_seen_reference(&mut conn).await?; + let last_seen_reference = if let Some(oid) = last_seen_reference { + oid + } else { + warn!( + "no last-seen reference found in our database. We assume a fresh install and + set the latest reference (HEAD) as last. This means we will then start to queue + builds for new releases only from now on, and not for all existing releases." + ); + index.latest_commit_reference().await? + }; - Ok(()) - } + index.set_last_seen_reference(last_seen_reference).await?; - pub(crate) async fn pending_count(&self) -> Result { - Ok(self - .pending_count_by_priority() - .await? - .values() - .sum::()) - } + let (changes, new_reference) = index.peek_changes_ordered().await?; - pub(crate) async fn prioritized_count(&self) -> Result { - Ok(self - .pending_count_by_priority() - .await? - .iter() - .filter(|&(&priority, _)| priority <= 0) - .map(|(_, count)| count) - .sum::()) - } + debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - pub(crate) async fn pending_count_by_priority(&self) -> Result> { - let mut conn = self.db.get_async().await?; - - Ok(sqlx::query!( - r#" - SELECT - priority, - COUNT(*) as "count!" - FROM queue - WHERE attempt < $1 - GROUP BY priority"#, - self.max_attempts, - ) - .fetch(&mut *conn) - .map_ok(|row| (row.priority, row.count as usize)) - .try_collect() - .await?) - } + let crates_added = process_changes(context, &changes, index.repository_url()).await; - pub(crate) async fn failed_count(&self) -> Result { - let mut conn = self.db.get_async().await?; + // set the reference in the database + // so this survives recreating the registry watcher + // server. + set_last_seen_reference(&mut conn, new_reference).await?; - Ok(sqlx::query_scalar!( - r#"SELECT COUNT(*) as "count!" FROM queue WHERE attempt >= $1;"#, - self.max_attempts, - ) - .fetch_one(&mut *conn) - .await? as usize) - } + Ok(crates_added) +} - pub(crate) async fn queued_crates(&self) -> Result> { - let mut conn = self.db.get_async().await?; - - Ok(sqlx::query_as!( - QueuedCrate, - r#"SELECT - id, - name, - version as "version: Version", - priority, - registry, - attempt - FROM queue - WHERE attempt < $1 - ORDER BY priority ASC, attempt ASC, id ASC"#, - self.max_attempts - ) - .fetch_all(&mut *conn) - .await?) +async fn process_changes( + context: &Context, + changes: &Vec, + registry: Option<&str>, +) -> usize { + let mut crates_added = 0; + + for change in changes { + match process_change(context, change, registry).await { + Ok(added) => { + if added { + crates_added += 1; + } + } + Err(err) => report_error(&err), + } } + crates_added +} - pub(crate) async fn has_build_queued(&self, name: &str, version: &Version) -> Result { - let mut conn = self.db.get_async().await?; - Ok(sqlx::query_scalar!( - "SELECT id - FROM queue - WHERE - attempt < $1 AND - name = $2 AND - version = $3 - ", - self.max_attempts, - name, - version as _, - ) - .fetch_optional(&mut *conn) - .await? - .is_some()) - } +/// Process a crate change, returning whether the change was a crate addition or not. +async fn process_change( + context: &Context, + change: &Change, + registry: Option<&str>, +) -> Result { + match change { + Change::Added(release) => process_version_added(context, release, registry).await?, + Change::AddedAndYanked(release) => { + process_version_added(context, release, registry).await?; + process_version_yank_status(context, release).await?; + } + Change::Unyanked(release) | Change::Yanked(release) => { + process_version_yank_status(context, release).await? + } + Change::CrateDeleted { name, .. } => process_crate_deleted(context, name.as_str()).await?, + Change::VersionDeleted(release) => process_version_deleted(context, release).await?, + }; + Ok(change.added().is_some()) +} - async fn remove_crate_from_queue(&self, name: &str) -> Result<()> { - let mut conn = self.db.get_async().await?; - sqlx::query!( - "DELETE - FROM queue - WHERE name = $1 - ", - name +/// Processes crate changes, whether they got yanked or unyanked. +async fn process_version_yank_status(context: &Context, release: &CrateVersion) -> Result<()> { + // FIXME: delay yanks of crates that have not yet finished building + // https://github.com/rust-lang/docs.rs/issues/1934 + if let Ok(release_version) = Version::parse(&release.version) { + set_yanked_inner( + context, + release.name.as_str(), + &release_version, + release.yanked, ) - .execute(&mut *conn) .await?; - - Ok(()) } - async fn remove_version_from_queue(&self, name: &str, version: &Version) -> Result<()> { - let mut conn = self.db.get_async().await?; - sqlx::query!( - "DELETE - FROM queue - WHERE - name = $1 AND - version = $2 - ", - name, - version as _, - ) - .execute(&mut *conn) - .await?; + queue_crate_invalidation(&release.name, context.cdn.as_deref()).await; + Ok(()) +} - Ok(()) - } +async fn process_version_added( + context: &Context, + release: &CrateVersion, + registry: Option<&str>, +) -> Result<()> { + let mut conn = context.pool.get_async().await?; + let priority = get_crate_priority(&mut conn, &release.name).await?; + let name: KrateName = release.name.parse()?; + let version = &release + .version + .parse() + .context("couldn't parse release version as semver")?; + context + .async_build_queue + .add_crate(&name, version, priority, registry) + .await + .with_context(|| { + format!( + "failed adding {}-{} into build queue", + release.name, release.version + ) + })?; + debug!( + name=%release.name, + version=%release.version, + "added into build queue", + ); + context + .async_build_queue + .deprioritize_other_releases(&name, version, PRIORITY_MANUAL_FROM_CRATES_IO) + .await + .unwrap_or_else(|err| report_error(&err)); + Ok(()) +} - /// Decreases the priority of all releases currently present in the queue not matching the version passed to *at least* new_priority. - pub(crate) async fn deprioritize_other_releases( - &self, - name: &str, - latest_version: &Version, - new_priority: i32, - ) -> Result<()> { - let mut conn = self.db.get_async().await?; - sqlx::query!( - "UPDATE queue - SET priority = GREATEST(priority, $1) - WHERE - name = $2 - AND version != $3 - AND attempt < $4 - ", - new_priority, - name, - latest_version as _, - self.max_attempts, +async fn process_version_deleted(context: &Context, release: &CrateVersion) -> Result<()> { + let mut conn = context.pool.get_async().await?; + + let name: KrateName = release.name.parse()?; + let version: Version = release + .version + .parse() + .context("couldn't parse release version as semver")?; + + delete_version( + &mut conn, + &context.async_storage, + &context.config, + &release.name, + &version, + ) + .await + .with_context(|| { + format!( + "failed to delete version {}-{}", + release.name, release.version ) - .execute(&mut *conn) + })?; + info!( + name=%release.name, + version=%release.version, + "release was deleted from the index and the database", + ); + queue_crate_invalidation(&name, context.cdn.as_deref()).await; + context + .async_build_queue + .remove_version_from_queue(&name, &version) .await?; - - Ok(()) - } + Ok(()) } -/// Locking functions. -impl AsyncBuildQueue { - /// Checks for the lock and returns whether it currently exists. - pub async fn is_locked(&self) -> Result { - let mut conn = self.db.get_async().await?; - - Ok(get_config::(&mut conn, ConfigName::QueueLocked) - .await? - .unwrap_or(false)) - } - - /// lock the queue. Daemon will check this lock and stop operating if it exists. - pub async fn lock(&self) -> Result<()> { - let mut conn = self.db.get_async().await?; - set_config(&mut conn, ConfigName::QueueLocked, true).await - } - - /// unlock the queue. - pub async fn unlock(&self) -> Result<()> { - let mut conn = self.db.get_async().await?; - set_config(&mut conn, ConfigName::QueueLocked, false).await - } +async fn process_crate_deleted(context: &Context, krate: &str) -> Result<()> { + let mut conn = context.pool.get_async().await?; + + delete_crate(&mut conn, &context.async_storage, &context.config, krate) + .await + .with_context(|| format!("failed to delete crate {krate}"))?; + info!( + name=%krate, + "crate deleted from the index and the database", + ); + queue_crate_invalidation(krate, context.cdn.as_deref()).await; + + let name: KrateName = krate.parse()?; + context + .async_build_queue + .remove_crate_from_queue(&name) + .await } -/// Index methods. -impl AsyncBuildQueue { - async fn queue_crate_invalidation(&self, krate: &str) { - let krate = match krate - .parse::() - .with_context(|| format!("can't parse crate name '{}'", krate)) - { - Ok(krate) => krate, - Err(err) => { - report_error(&err); - return; - } - }; - - let Some(cdn) = &self.cdn else { - info!(%krate, "no CDN configured, skippping crate invalidation"); - return; - }; - - if let Err(err) = cdn.queue_crate_invalidation(&krate).await { - report_error(&err); - } - } - - /// Updates registry index repository and adds new crates into build queue. - /// - /// Returns the number of crates added - pub async fn get_new_crates(&self, index: &Index) -> Result { - let last_seen_reference = self.last_seen_reference().await?; - let last_seen_reference = if let Some(oid) = last_seen_reference { - oid - } else { - warn!( - "no last-seen reference found in our database. We assume a fresh install and - set the latest reference (HEAD) as last. This means we will then start to queue - builds for new releases only from now on, and not for all existing releases." - ); - index.latest_commit_reference().await? - }; - - index.set_last_seen_reference(last_seen_reference).await?; - - let (changes, new_reference) = index.peek_changes_ordered().await?; - - let mut conn = self.db.get_async().await?; - - debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - - let crates_added = self - .process_changes(&mut conn, &changes, index.repository_url()) - .await; - - // set the reference in the database - // so this survives recreating the registry watcher - // server. - self.set_last_seen_reference(new_reference).await?; - - Ok(crates_added) - } - - async fn process_changes( - &self, - conn: &mut AsyncPoolClient, - changes: &Vec, - registry: Option<&str>, - ) -> usize { - let mut crates_added = 0; - - for change in changes { - match self.process_change(conn, change, registry).await { - Ok(added) => { - if added { - crates_added += 1; - } - } - Err(err) => report_error(&err), - } - } - crates_added - } - - /// Process a crate change, returning whether the change was a crate addition or not. - async fn process_change( - &self, - conn: &mut AsyncPoolClient, - change: &Change, - registry: Option<&str>, - ) -> Result { - match change { - Change::Added(release) => self.process_version_added(conn, release, registry).await?, - Change::AddedAndYanked(release) => { - self.process_version_added(conn, release, registry).await?; - self.process_version_yank_status(conn, release).await?; - } - Change::Unyanked(release) | Change::Yanked(release) => { - self.process_version_yank_status(conn, release).await? - } - Change::CrateDeleted { name, .. } => { - self.process_crate_deleted(conn, name.as_str()).await? - } - Change::VersionDeleted(release) => self.process_version_deleted(conn, release).await?, - }; - Ok(change.added().is_some()) - } - - /// Processes crate changes, whether they got yanked or unyanked. - async fn process_version_yank_status( - &self, - conn: &mut AsyncPoolClient, - release: &CrateVersion, - ) -> Result<()> { - // FIXME: delay yanks of crates that have not yet finished building - // https://github.com/rust-lang/docs.rs/issues/1934 - if let Ok(release_version) = Version::parse(&release.version) { - self.set_yanked_inner( - conn, - release.name.as_str(), - &release_version, - release.yanked, - ) - .await?; - } - - self.queue_crate_invalidation(&release.name).await; - Ok(()) - } +pub async fn set_yanked( + context: &Context, + name: &str, + version: &Version, + yanked: bool, +) -> Result<()> { + set_yanked_inner(context, name, version, yanked).await +} - async fn process_version_added( - &self, - conn: &mut AsyncPoolClient, - release: &CrateVersion, - registry: Option<&str>, - ) -> Result<()> { - let priority = get_crate_priority(conn, &release.name).await?; - let version = &release - .version - .parse() - .context("couldn't parse release version as semver")?; - self.add_crate(&release.name, version, priority, registry) - .await - .with_context(|| { - format!( - "failed adding {}-{} into build queue", - release.name, release.version - ) - })?; +#[context("error trying to set {name}-{version} to yanked: {yanked}")] +async fn set_yanked_inner( + context: &Context, + name: &str, + version: &Version, + yanked: bool, +) -> Result<()> { + let mut conn = context.pool.get_async().await?; + + let activity = if yanked { "yanked" } else { "unyanked" }; + + if let Some(crate_id) = sqlx::query_scalar!( + r#"UPDATE releases + SET yanked = $3 + FROM crates + WHERE crates.id = releases.crate_id + AND name = $1 + AND version = $2 + RETURNING crates.id as "id: CrateId" + "#, + name, + version as _, + yanked, + ) + .fetch_optional(&mut *conn) + .await? + { debug!( - name=%release.name, - version=%release.version, - "added into build queue", + %name, + %version, + %activity, + "updating latest version id" ); - self.queue_metrics.queued_builds.add(1, &[]); - self.deprioritize_other_releases(&release.name, version, PRIORITY_MANUAL_FROM_CRATES_IO) + update_latest_version_id(&mut conn, crate_id).await?; + } else { + let name: KrateName = name.parse()?; + match context + .async_build_queue + .has_build_queued(&name, version) .await - .unwrap_or_else(|err| report_error(&err)); - Ok(()) - } - - async fn process_version_deleted( - &self, - conn: &mut AsyncPoolClient, - release: &CrateVersion, - ) -> Result<()> { - let version: Version = release - .version - .parse() - .context("couldn't parse release version as semver")?; - - delete_version(conn, &self.storage, &self.config, &release.name, &version) - .await - .with_context(|| { - format!( - "failed to delete version {}-{}", - release.name, release.version - ) - })?; - info!( - name=%release.name, - version=%release.version, - "release was deleted from the index and the database", - ); - self.queue_crate_invalidation(&release.name).await; - self.remove_version_from_queue(&release.name, &version) - .await?; - Ok(()) - } - - async fn process_crate_deleted(&self, conn: &mut AsyncPoolClient, krate: &str) -> Result<()> { - delete_crate(conn, &self.storage, &self.config, krate) - .await - .with_context(|| format!("failed to delete crate {krate}"))?; - info!( - name=%krate, - "crate deleted from the index and the database", - ); - self.queue_crate_invalidation(krate).await; - self.remove_crate_from_queue(krate).await - } - - pub async fn set_yanked(&self, name: &str, version: &Version, yanked: bool) -> Result<()> { - let mut conn = self.db.get_async().await?; - self.set_yanked_inner(&mut conn, name, version, yanked) - .await - } - - #[context("error trying to set {name}-{version} to yanked: {yanked}")] - async fn set_yanked_inner( - &self, - conn: &mut sqlx::PgConnection, - name: &str, - version: &Version, - yanked: bool, - ) -> Result<()> { - let activity = if yanked { "yanked" } else { "unyanked" }; - - if let Some(crate_id) = sqlx::query_scalar!( - r#"UPDATE releases - SET yanked = $3 - FROM crates - WHERE crates.id = releases.crate_id - AND name = $1 - AND version = $2 - RETURNING crates.id as "id: CrateId" - "#, - name, - version as _, - yanked, - ) - .fetch_optional(&mut *conn) - .await? + .context("error trying to fetch build queue") { - debug!( - %name, - %version, - %activity, - "updating latest version id" - ); - update_latest_version_id(&mut *conn, crate_id).await?; - } else { - match self - .has_build_queued(name, version) - .await - .context("error trying to fetch build queue") - { - Ok(false) => { - error!( - %name, - %version, - "tried to yank or unyank non-existing release", - ); - } - Ok(true) => { - // the rustwide builder will fetch the current yank state from - // crates.io, so and missed update here will be fixed after the - // build is finished. - } - Err(err) => { - report_error(&err); - } + Ok(false) => { + error!( + %name, + %version, + "tried to yank or unyank non-existing release", + ); + } + Ok(true) => { + // the rustwide builder will fetch the current yank state from + // crates.io, so and missed update here will be fixed after the + // build is finished. + } + Err(err) => { + report_error(&err); } } - - Ok(()) - } -} - -#[derive(Debug)] -pub struct BuildQueue { - runtime: runtime::Handle, - inner: Arc, -} - -/// sync versions of async methods -impl BuildQueue { - pub fn add_crate( - &self, - name: &str, - version: &Version, - priority: i32, - registry: Option<&str>, - ) -> Result<()> { - self.runtime - .block_on(self.inner.add_crate(name, version, priority, registry)) } - pub fn set_yanked(&self, name: &str, version: &Version, yanked: bool) -> Result<()> { - self.runtime - .block_on(self.inner.set_yanked(name, version, yanked)) - } - pub fn is_locked(&self) -> Result { - self.runtime.block_on(self.inner.is_locked()) - } - pub fn lock(&self) -> Result<()> { - self.runtime.block_on(self.inner.lock()) - } - pub fn unlock(&self) -> Result<()> { - self.runtime.block_on(self.inner.unlock()) - } - pub fn last_seen_reference(&self) -> Result> { - self.runtime.block_on(self.inner.last_seen_reference()) - } - pub fn set_last_seen_reference(&self, oid: crates_index_diff::gix::ObjectId) -> Result<()> { - self.runtime - .block_on(self.inner.set_last_seen_reference(oid)) - } - #[cfg(test)] - pub(crate) fn pending_count(&self) -> Result { - self.runtime.block_on(self.inner.pending_count()) - } - #[cfg(test)] - pub(crate) fn prioritized_count(&self) -> Result { - self.runtime.block_on(self.inner.prioritized_count()) - } - #[cfg(test)] - pub(crate) fn pending_count_by_priority(&self) -> Result> { - self.runtime - .block_on(self.inner.pending_count_by_priority()) - } - #[cfg(test)] - pub(crate) fn failed_count(&self) -> Result { - self.runtime.block_on(self.inner.failed_count()) - } - #[cfg(test)] - pub(crate) fn queued_crates(&self) -> Result> { - self.runtime.block_on(self.inner.queued_crates()) - } + Ok(()) } -impl BuildQueue { - pub fn new(runtime: runtime::Handle, inner: Arc) -> Self { - Self { runtime, inner } - } - - fn process_next_crate( - &self, - f: impl FnOnce(&QueuedCrate) -> Result, - ) -> Result<()> { - let mut conn = self.runtime.block_on(self.inner.db.get_async())?; - let mut transaction = self.runtime.block_on(conn.begin())?; - - // fetch the next available crate from the queue table. - // We are using `SELECT FOR UPDATE` inside a transaction so - // the QueuedCrate is locked until we are finished with it. - // `SKIP LOCKED` here will enable another build-server to just - // skip over taken (=locked) rows and start building the first - // available one. - let to_process = match self.runtime.block_on( - sqlx::query_as!( - QueuedCrate, - r#"SELECT - id, - name, - version as "version: Version", - priority, - registry, - attempt - FROM queue - WHERE - attempt < $1 AND - (last_attempt IS NULL OR last_attempt < NOW() - make_interval(secs => $2)) - ORDER BY priority ASC, attempt ASC, id ASC - LIMIT 1 - FOR UPDATE SKIP LOCKED"#, - self.inner.max_attempts, - self.inner.config.delay_between_build_attempts.as_secs_f64(), - ) - .fetch_optional(&mut *transaction), - )? { - Some(krate) => krate, - None => return Ok(()), - }; +/// wrapper around BuildQueue::process_next_crate to handle metrics and cdn invalidation +fn process_next_crate( + context: &Context, + f: impl FnOnce(&QueuedCrate) -> Result, +) -> Result<()> { + let queue = context.build_queue.clone(); + let builder_metrics = context.builder_metrics.clone(); + let cdn = context.cdn.clone(); + let runtime = context.runtime.clone(); + let config = context.config.build_queue.clone(); + let next_attempt = queue.process_next_crate(|to_process| { let res = { let instant = Instant::now(); - let res = f(&to_process); + let res = f(to_process); let elapsed = instant.elapsed().as_secs_f64(); - self.inner.builder_metrics.build_time.record(elapsed, &[]); + builder_metrics.build_time.record(elapsed, &[]); res }; - self.inner.builder_metrics.total_builds.add(1, &[]); - - self.runtime - .block_on(self.inner.queue_crate_invalidation(&to_process.name)); - - let mut increase_attempt_count = || -> Result<()> { - let attempt: i32 = self.runtime.block_on( - sqlx::query_scalar!( - "UPDATE queue - SET - attempt = attempt + 1, - last_attempt = NOW() - WHERE id = $1 - RETURNING attempt;", - to_process.id, - ) - .fetch_one(&mut *transaction), - )?; - - if attempt >= self.inner.max_attempts { - self.inner.builder_metrics.failed_builds.add(1, &[]); - } - Ok(()) - }; + builder_metrics.total_builds.add(1, &[]); - match res { - Ok(BuildPackageSummary { - should_reattempt: false, - successful: _, - }) => { - self.runtime.block_on( - sqlx::query!("DELETE FROM queue WHERE id = $1;", to_process.id) - .execute(&mut *transaction), - )?; - } - Ok(BuildPackageSummary { - should_reattempt: true, - successful: _, - }) => { - increase_attempt_count()?; - } - Err(e) => { - increase_attempt_count()?; - report_error(&e.context(format!( - "Failed to build package {}-{} from queue", - to_process.name, to_process.version - ))) - } - } + runtime.block_on(queue_crate_invalidation(&to_process.name, cdn.as_deref())); - self.runtime.block_on(transaction.commit())?; - Ok(()) + res + })?; + + if let Some(next_attempt) = next_attempt + && next_attempt >= config.build_attempts as i32 + { + builder_metrics.failed_builds.add(1, &[]); } - /// Builds the top package from the queue. Returns whether there was a package in the queue. - /// - /// Note that this will return `Ok(true)` even if the package failed to build. - pub(crate) fn build_next_queue_package( - &self, - context: &Context, - builder: &mut RustwideBuilder, - ) -> Result { - let mut processed = false; - - self.process_next_crate(|krate| { - processed = true; - - let kind = krate - .registry - .as_ref() - .map(|r| PackageKind::Registry(r.as_str())) - .unwrap_or(PackageKind::CratesIo); - - if let Err(err) = retry( - || { - builder - .reinitialize_workspace_if_interval_passed(context) - .context("Reinitialize workspace failed, locking queue") - }, - 3, - ) { - report_error(&err); - self.lock()?; - return Err(err); - } + Ok(()) +} - if let Err(err) = builder - .update_toolchain_and_add_essential_files() - .context("Updating toolchain failed, locking queue") - { - report_error(&err); - self.lock()?; - return Err(err); - } +pub(crate) fn build_next_queue_package( + context: &Context, + builder: &mut RustwideBuilder, +) -> Result { + let mut processed = false; + + let queue = context.build_queue.clone(); + + process_next_crate(context, |krate| { + processed = true; + + let kind = krate + .registry + .as_ref() + .map(|r| PackageKind::Registry(r.as_str())) + .unwrap_or(PackageKind::CratesIo); + + if let Err(err) = retry( + || { + builder + .reinitialize_workspace_if_interval_passed(context) + .context("Reinitialize workspace failed, locking queue") + }, + 3, + ) { + report_error(&err); + queue.lock()?; + return Err(err); + } - builder.build_package(&krate.name, &krate.version, kind, krate.attempt == 0) - })?; + if let Err(err) = builder + .update_toolchain_and_add_essential_files() + .context("Updating toolchain failed, locking queue") + { + report_error(&err); + queue.lock()?; + return Err(err); + } - Ok(processed) - } + builder.build_package(&krate.name, &krate.version, kind, krate.attempt == 0) + })?; + + Ok(processed) } /// Queue rebuilds as configured. @@ -829,9 +416,9 @@ impl BuildQueue { pub async fn queue_rebuilds( conn: &mut sqlx::PgConnection, config: &Config, - build_queue: &AsyncBuildQueue, + queue: &AsyncBuildQueue, ) -> Result<()> { - let already_queued_rebuilds: usize = build_queue + let already_queued_rebuilds: usize = queue .pending_count_by_priority() .await? .iter() @@ -851,7 +438,7 @@ pub async fn queue_rebuilds( let mut results = sqlx::query!( r#"SELECT i.* FROM ( SELECT - c.name, + c.name as "name: KrateName", r.version as "version: Version", ( SELECT MAX(COALESCE(b.build_finished, b.build_started)) @@ -873,12 +460,9 @@ pub async fn queue_rebuilds( while let Some(row) = results.next().await { let row = row?; - if !build_queue - .has_build_queued(&row.name, &row.version) - .await? - { + if !queue.has_build_queued(&row.name, &row.version).await? { info!("queueing rebuild for {} {}...", &row.name, &row.version); - build_queue + queue .add_crate(&row.name, &row.version, PRIORITY_CONTINUOUS, None) .await?; } @@ -896,7 +480,7 @@ pub async fn queue_rebuilds( #[instrument(skip_all)] pub async fn queue_rebuilds_faulty_rustdoc( conn: &mut sqlx::PgConnection, - build_queue: &AsyncBuildQueue, + queue: &AsyncBuildQueue, start_nightly_date: &NaiveDate, end_nightly_date: &Option, ) -> Result { @@ -904,21 +488,20 @@ pub async fn queue_rebuilds_faulty_rustdoc( end_nightly_date.unwrap_or_else(|| start_nightly_date.succ_opt().unwrap()); let mut results = sqlx::query!( r#" -SELECT c.name, - r.version AS "version: Version" -FROM crates AS c - JOIN releases AS r - ON c.id = r.crate_id - JOIN release_build_status AS rbs + SELECT + c.name AS "name: KrateName", + r.version AS "version: Version" + FROM crates AS c + JOIN releases AS r + ON c.id = r.crate_id + JOIN release_build_status AS rbs ON rbs.rid = r.id - JOIN builds AS b - ON b.rid = r.id - AND b.build_finished = rbs.last_build_time - AND b.rustc_nightly_date >= $1 - AND b.rustc_nightly_date < $2 - - -"#, + JOIN builds AS b + ON b.rid = r.id + AND b.build_finished = rbs.last_build_time + AND b.rustc_nightly_date >= $1 + AND b.rustc_nightly_date < $2 + "#, start_nightly_date, end_nightly_date ) @@ -928,10 +511,7 @@ FROM crates AS c while let Some(row) = results.next().await { let row = row?; - if !build_queue - .has_build_queued(&row.name, &row.version) - .await? - { + if !queue.has_build_queued(&row.name, &row.version).await? { results_count += 1; info!( name=%row.name, @@ -939,7 +519,7 @@ FROM crates AS c priority=PRIORITY_BROKEN_RUSTDOC, "queueing rebuild" ); - build_queue + queue .add_crate(&row.name, &row.version, PRIORITY_BROKEN_RUSTDOC, None) .await?; } @@ -951,26 +531,26 @@ FROM crates AS c #[cfg(test)] mod tests { use super::*; - use crate::test::{FakeBuild, KRATE, TestEnvironment, V1, V2}; - use chrono::Utc; - use docs_rs_types::BuildStatus; + use crate::test::{FakeBuild, TestEnvironment, V1, V2}; + use docs_rs_build_queue::{BuildPackageSummary, PRIORITY_DEFAULT}; + use docs_rs_headers::SurrogateKey; + use docs_rs_types::{ + BuildStatus, + testing::{BAR, FOO}, + }; use pretty_assertions::assert_eq; - use std::time::Duration; #[tokio::test(flavor = "multi_thread")] async fn test_process_version_added() -> Result<()> { let env = TestEnvironment::new().await?; let build_queue = env.async_build_queue(); - let mut conn = env.async_db().async_conn().await; let krate = CrateVersion { name: "krate".parse()?, version: V1.to_string().parse()?, ..Default::default() }; - build_queue - .process_version_added(&mut conn, &krate, None) - .await?; + process_version_added(&env.context, &krate, None).await?; let queue = build_queue.queued_crates().await?; assert_eq!(queue.len(), 1); assert_eq!(queue[0].priority, PRIORITY_DEFAULT); @@ -980,9 +560,7 @@ mod tests { version: V2.to_string().parse()?, ..Default::default() }; - build_queue - .process_version_added(&mut conn, &krate, None) - .await?; + process_version_added(&env.context, &krate, None).await?; let queue = build_queue.queued_crates().await?; assert_eq!(queue.len(), 2); // The other queued version should be deprioritized @@ -997,7 +575,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_process_version_yank_status() -> Result<()> { let env = TestEnvironment::new().await?; - let build_queue = env.async_build_queue(); let mut conn = env.async_db().async_conn().await; // Given a release that is yanked @@ -1015,9 +592,7 @@ mod tests { yanked: true, ..Default::default() }; - build_queue - .process_version_yank_status(&mut conn, &krate) - .await?; + process_version_yank_status(&env.context, &krate).await?; // And verify it's actually marked as yanked let row = sqlx::query!( @@ -1037,9 +612,7 @@ mod tests { yanked: false, ..Default::default() }; - build_queue - .process_version_yank_status(&mut conn, &krate) - .await?; + process_version_yank_status(&env.context, &krate).await?; let row = sqlx::query!( "SELECT yanked @@ -1057,7 +630,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_process_crate_deleted() -> Result<()> { let env = TestEnvironment::new().await?; - let build_queue = env.async_build_queue(); let mut conn = env.async_db().async_conn().await; env.fake_release() @@ -1066,9 +638,7 @@ mod tests { .version(V1) .create() .await?; - build_queue - .process_crate_deleted(&mut conn, "krate") - .await?; + process_crate_deleted(&env.context, "krate").await?; let row = sqlx::query!( "SELECT id @@ -1085,7 +655,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_process_version_deleted() -> Result<()> { let env = TestEnvironment::new().await?; - let build_queue = env.async_build_queue(); let mut conn = env.async_db().async_conn().await; let rid_1 = env @@ -1107,9 +676,7 @@ mod tests { version: V2.to_string().parse()?, ..Default::default() }; - build_queue - .process_version_deleted(&mut conn, &krate) - .await?; + process_version_deleted(&env.context, &krate).await?; let row = sqlx::query!( "SELECT id @@ -1126,8 +693,6 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_process_changes() -> Result<()> { let env = TestEnvironment::new().await?; - let build_queue = env.async_build_queue(); - let mut conn = env.async_db().async_conn().await; env.fake_release() .await @@ -1156,18 +721,17 @@ mod tests { version: V2.to_string().parse()?, ..Default::default() }; - let added = build_queue - .process_changes( - &mut conn, - &vec![ - Change::Added(krate1), // Should be added correctly - Change::Added(krate2), // Should be added correctly - Change::VersionDeleted(krate_already_present), // Should be deleted correctly, without affecting the returned counter - Change::VersionDeleted(non_existing_krate), // Should error out, but the error should be handled gracefully - ], - None, - ) - .await; + let added = process_changes( + &env.context, + &vec![ + Change::Added(krate1), // Should be added correctly + Change::Added(krate2), // Should be added correctly + Change::VersionDeleted(krate_already_present), // Should be deleted correctly, without affecting the returned counter + Change::VersionDeleted(non_existing_krate), // Should error out, but the error should be handled gracefully + ], + None, + ) + .await; assert_eq!(added, 2); Ok(()) @@ -1399,10 +963,10 @@ mod tests { let build_queue = env.async_build_queue(); build_queue - .add_crate("foo1", &V1, PRIORITY_CONTINUOUS, None) + .add_crate(&FOO, &V1, PRIORITY_CONTINUOUS, None) .await?; build_queue - .add_crate("foo2", &V1, PRIORITY_CONTINUOUS, None) + .add_crate(&BAR, &V1, PRIORITY_CONTINUOUS, None) .await?; let mut conn = env.async_db().async_conn().await; @@ -1441,10 +1005,10 @@ mod tests { let build_queue = env.async_build_queue(); build_queue - .add_crate("foo1", &V1, PRIORITY_CONTINUOUS, None) + .add_crate(&"foo1".parse().unwrap(), &V1, PRIORITY_CONTINUOUS, None) .await?; build_queue - .add_crate("foo2", &V1, PRIORITY_CONTINUOUS, None) + .add_crate(&"foo2".parse().unwrap(), &V1, PRIORITY_CONTINUOUS, None) .await?; env.fake_release() @@ -1468,254 +1032,26 @@ mod tests { Ok(()) } - #[tokio::test(flavor = "multi_thread")] - async fn test_add_duplicate_doesnt_fail_last_priority_wins() -> Result<()> { - let env = TestEnvironment::new().await?; - - let queue = env.async_build_queue(); - - queue.add_crate("some_crate", &V1, 0, None).await?; - queue.add_crate("some_crate", &V1, 9, None).await?; - - let queued_crates = queue.queued_crates().await?; - assert_eq!(queued_crates.len(), 1); - assert_eq!(queued_crates[0].priority, 9); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_add_duplicate_resets_attempts_and_priority() -> Result<()> { - let env = - TestEnvironment::with_config(TestEnvironment::base_config().build_attempts(5).build()?) - .await?; - - let queue = env.async_build_queue(); - - let mut conn = env.async_db().async_conn().await; - sqlx::query!( - " - INSERT INTO queue (name, version, priority, attempt, last_attempt ) - VALUES ('failed_crate', $1, 0, 99, NOW())", - V1 as _ - ) - .execute(&mut *conn) - .await?; - - assert_eq!(queue.pending_count().await?, 0); - - queue.add_crate("failed_crate", &V1, 9, None).await?; - - assert_eq!(queue.pending_count().await?, 1); - - let row = sqlx::query!( - "SELECT priority, attempt, last_attempt - FROM queue - WHERE name = $1 AND version = $2", - "failed_crate", - V1 as _ - ) - .fetch_one(&mut *conn) - .await?; - - assert_eq!(row.priority, 9); - assert_eq!(row.attempt, 0); - assert!(row.last_attempt.is_none()); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_has_build_queued() -> Result<()> { - let env = TestEnvironment::new().await?; - - let queue = env.async_build_queue(); - - queue.add_crate("dummy", &V1, 0, None).await?; - - let mut conn = env.async_db().async_conn().await; - assert!(queue.has_build_queued("dummy", &V1).await.unwrap()); - - sqlx::query!("UPDATE queue SET attempt = 6") - .execute(&mut *conn) - .await - .unwrap(); - - assert!(!queue.has_build_queued("dummy", &V1).await.unwrap()); - - Ok(()) - } - - #[test] - fn test_wait_between_build_attempts() -> Result<()> { - let env = TestEnvironment::with_config_and_runtime( - TestEnvironment::base_config() - .build_attempts(99) - .delay_between_build_attempts(Duration::from_secs(1)) - .build()?, - )?; - - let runtime = env.runtime(); - - let queue = env.build_queue(); - - queue.add_crate("krate", &V1, 0, None)?; - - // first let it fail - queue.process_next_crate(|krate| { - assert_eq!(krate.name, "krate"); - anyhow::bail!("simulate a failure"); - })?; - - queue.process_next_crate(|_| { - // this can't happen since we didn't wait between attempts - unreachable!(); - })?; - - runtime.block_on(async { - // fake the build-attempt timestamp so it's older - let mut conn = env.async_db().async_conn().await; - sqlx::query!( - "UPDATE queue SET last_attempt = $1", - Utc::now() - chrono::Duration::try_seconds(60).unwrap() - ) - .execute(&mut *conn) - .await - })?; - - let mut handled = false; - // now we can process it again - queue.process_next_crate(|krate| { - assert_eq!(krate.name, "krate"); - handled = true; - Ok(BuildPackageSummary::default()) - })?; - - assert!(handled); - - Ok(()) - } - - #[test] - fn test_add_and_process_crates() -> Result<()> { - const MAX_ATTEMPTS: u16 = 3; - let env = TestEnvironment::with_config_and_runtime( - TestEnvironment::base_config() - .build_attempts(MAX_ATTEMPTS) - .delay_between_build_attempts(Duration::ZERO) - .build()?, - )?; - - let queue = env.build_queue(); - - let test_crates = [ - ("low-priority", 1000), - ("high-priority-foo", -1000), - ("medium-priority", -10), - ("high-priority-bar", -1000), - ("standard-priority", 0), - ("high-priority-baz", -1000), - ]; - for krate in &test_crates { - queue.add_crate(krate.0, &V1, krate.1, None)?; - } - - let assert_next = |name| -> Result<()> { - queue.process_next_crate(|krate| { - assert_eq!(name, krate.name); - Ok(BuildPackageSummary::default()) - })?; - Ok(()) - }; - let assert_next_and_fail = |name| -> Result<()> { - queue.process_next_crate(|krate| { - assert_eq!(name, krate.name); - anyhow::bail!("simulate a failure"); - })?; - Ok(()) - }; - - // The first processed item is the one with the highest priority added first. - assert_next("high-priority-foo")?; - - // Simulate a failure in high-priority-bar. - assert_next_and_fail("high-priority-bar")?; - - // Continue with the next high priority crate. - assert_next("high-priority-baz")?; - - // After all the crates with the max priority are processed, before starting to process - // crates with a lower priority the failed crates with the max priority will be tried - // again. - assert_next("high-priority-bar")?; - - // Continue processing according to the priority. - assert_next("medium-priority")?; - assert_next("standard-priority")?; - - // Simulate the crate failing many times. - for _ in 0..MAX_ATTEMPTS { - assert_next_and_fail("low-priority")?; - } - - // Since low-priority failed many times it will be removed from the queue. Because of - // that the queue should now be empty. - let mut called = false; - queue.process_next_crate(|_| { - called = true; - Ok(BuildPackageSummary::default()) - })?; - assert!(!called, "there were still items in the queue"); - - let collected_metrics = env.collected_metrics(); - - assert_eq!( - collected_metrics - .get_metric("builder", "docsrs.builder.total_builds")? - .get_u64_counter() - .value(), - 9 - ); - - assert_eq!( - collected_metrics - .get_metric("builder", "docsrs.builder.failed_builds")? - .get_u64_counter() - .value(), - 1 - ); - - assert_eq!( - dbg!( - collected_metrics - .get_metric("builder", "docsrs.builder.build_time")? - .get_f64_histogram() - .count() - ), - 9 - ); - - Ok(()) - } - #[test] fn test_invalidate_cdn_after_error() -> Result<()> { let env = TestEnvironment::new_with_runtime()?; let queue = env.build_queue(); - queue.add_crate("will_fail", &V1, 0, None)?; + const WILL_FAIL: KrateName = KrateName::from_static("will_fail"); + + queue.add_crate(&WILL_FAIL, &V1, 0, None)?; - queue.process_next_crate(|krate| { - assert_eq!("will_fail", krate.name); + process_next_crate(&env.context, |krate| { + assert_eq!(WILL_FAIL, krate.name); anyhow::bail!("simulate a failure"); })?; assert_eq!( env.runtime() - .block_on(env.cdn().purged_keys()) - .unwrap() - .to_string(), - "crate-will_fail", + .block_on(env.cdn().unwrap().purged_keys()) + .unwrap(), + SurrogateKey::from(WILL_FAIL).into() ); Ok(()) @@ -1727,333 +1063,58 @@ mod tests { let queue = env.build_queue(); - queue.add_crate("will_succeed", &V1, -1, None)?; + const WILL_SUCCEED: KrateName = KrateName::from_static("will_succeed"); + queue.add_crate(&WILL_SUCCEED, &V1, -1, None)?; - queue.process_next_crate(|krate| { - assert_eq!("will_succeed", krate.name); + process_next_crate(&env.context, |krate| { + assert_eq!(WILL_SUCCEED, krate.name); Ok(BuildPackageSummary::default()) })?; assert_eq!( env.runtime() - .block_on(env.cdn().purged_keys()) - .unwrap() - .to_string(), - "crate-will_succeed", - ); - - Ok(()) - } - - #[test] - fn test_pending_count() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - assert_eq!(queue.pending_count()?, 0); - queue.add_crate("foo", &V1, 0, None)?; - assert_eq!(queue.pending_count()?, 1); - queue.add_crate("bar", &V1, 0, None)?; - assert_eq!(queue.pending_count()?, 2); - - queue.process_next_crate(|krate| { - assert_eq!("foo", krate.name); - Ok(BuildPackageSummary::default()) - })?; - assert_eq!(queue.pending_count()?, 1); - - drop(env); - - Ok(()) - } - - #[test] - fn test_prioritized_count() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - assert_eq!(queue.prioritized_count()?, 0); - queue.add_crate("foo", &V1, 0, None)?; - assert_eq!(queue.prioritized_count()?, 1); - queue.add_crate("bar", &V1, -100, None)?; - assert_eq!(queue.prioritized_count()?, 2); - queue.add_crate("baz", &V1, 100, None)?; - assert_eq!(queue.prioritized_count()?, 2); - - queue.process_next_crate(|krate| { - assert_eq!("bar", krate.name); - Ok(BuildPackageSummary::default()) - })?; - assert_eq!(queue.prioritized_count()?, 1); - - Ok(()) - } - - #[test] - fn test_count_by_priority() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - assert!(queue.pending_count_by_priority()?.is_empty()); - - queue.add_crate("one", &V1, 1, None)?; - queue.add_crate("two", &V2, 2, None)?; - queue.add_crate("two_more", &V2, 2, None)?; - - assert_eq!( - queue.pending_count_by_priority()?, - HashMap::from_iter(vec![(1, 1), (2, 2)]) + .block_on(env.cdn().unwrap().purged_keys()) + .unwrap(), + SurrogateKey::from(WILL_SUCCEED).into() ); - while queue.pending_count()? > 0 { - queue.process_next_crate(|_| Ok(BuildPackageSummary::default()))?; - } - assert!(queue.pending_count_by_priority()?.is_empty()); - - Ok(()) - } - - #[test] - fn test_failed_count_for_reattempts() -> Result<()> { - let env = TestEnvironment::with_config_and_runtime( - TestEnvironment::base_config() - .build_attempts(MAX_ATTEMPTS) - .delay_between_build_attempts(Duration::ZERO) - .build()?, - )?; - - const MAX_ATTEMPTS: u16 = 3; - - let queue = env.build_queue(); - - assert_eq!(queue.failed_count()?, 0); - queue.add_crate("foo", &V1, -100, None)?; - assert_eq!(queue.failed_count()?, 0); - queue.add_crate("bar", &V1, 0, None)?; - - for _ in 0..MAX_ATTEMPTS { - assert_eq!(queue.failed_count()?, 0); - queue.process_next_crate(|krate| { - assert_eq!("foo", krate.name); - Ok(BuildPackageSummary { - should_reattempt: true, - ..Default::default() - }) - })?; - } - assert_eq!(queue.failed_count()?, 1); - - queue.process_next_crate(|krate| { - assert_eq!("bar", krate.name); - Ok(BuildPackageSummary::default()) - })?; - assert_eq!(queue.failed_count()?, 1); - - Ok(()) - } - - #[test] - fn test_failed_count_after_error() -> Result<()> { - let env = TestEnvironment::with_config_and_runtime( - TestEnvironment::base_config() - .build_attempts(MAX_ATTEMPTS) - .delay_between_build_attempts(Duration::ZERO) - .build()?, - )?; - - const MAX_ATTEMPTS: u16 = 3; - - let queue = env.build_queue(); - - assert_eq!(queue.failed_count()?, 0); - queue.add_crate("foo", &V1, -100, None)?; - assert_eq!(queue.failed_count()?, 0); - queue.add_crate("bar", &V1, 0, None)?; - - for _ in 0..MAX_ATTEMPTS { - assert_eq!(queue.failed_count()?, 0); - queue.process_next_crate(|krate| { - assert_eq!("foo", krate.name); - anyhow::bail!("this failed"); - })?; - } - assert_eq!(queue.failed_count()?, 1); - - queue.process_next_crate(|krate| { - assert_eq!("bar", krate.name); - Ok(BuildPackageSummary::default()) - })?; - assert_eq!(queue.failed_count()?, 1); - Ok(()) } - #[test] - fn test_queued_crates() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - let test_crates = [("bar", 0), ("foo", -10), ("baz", 10)]; - for krate in &test_crates { - queue.add_crate(krate.0, &V1, krate.1, None)?; - } - - assert_eq!( - vec![ - ("foo".into(), V1, -10), - ("bar".into(), V1, 0), - ("baz".into(), V1, 10), - ], - queue - .queued_crates()? - .into_iter() - .map(|c| (c.name.clone(), c.version, c.priority)) - .collect::>() - ); - - Ok(()) - } + #[tokio::test(flavor = "multi_thread")] + async fn test_last_seen_reference_in_db() -> Result<()> { + let env = TestEnvironment::new().await?; - #[test] - fn test_last_seen_reference_in_db() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; + let mut conn = env.async_db().async_conn().await; + let queue = env.async_build_queue(); + queue.unlock().await?; + assert!(!queue.is_locked().await?); - let queue = env.build_queue(); - queue.unlock()?; - assert!(!queue.is_locked()?); // initial db ref is empty - assert_eq!(queue.last_seen_reference()?, None); - assert!(!queue.is_locked()?); + assert_eq!(last_seen_reference(&mut conn).await?, None); + assert!(!queue.is_locked().await?); let oid = crates_index_diff::gix::ObjectId::from_hex( b"ffffffffffffffffffffffffffffffffffffffff", )?; - queue.set_last_seen_reference(oid)?; + set_last_seen_reference(&mut conn, oid).await?; - assert_eq!(queue.last_seen_reference()?, Some(oid)); - assert!(!queue.is_locked()?); - - Ok(()) - } - - #[test] - fn test_broken_db_reference_breaks() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - env.runtime().block_on(async { - let mut conn = env.async_db().async_conn().await; - set_config(&mut conn, ConfigName::LastSeenIndexReference, "invalid") - .await - .unwrap(); - }); - - let queue = env.build_queue(); - assert!(queue.last_seen_reference().is_err()); - - Ok(()) - } - - #[test] - fn test_queue_lock() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - // unlocked without config - assert!(!queue.is_locked()?); - - queue.lock()?; - assert!(queue.is_locked()?); - - queue.unlock()?; - assert!(!queue.is_locked()?); - - Ok(()) - } - - #[test] - fn test_add_long_name() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - let name: String = "krate".repeat(100); - - queue.add_crate(&name, &V1, 0, None)?; - - queue.process_next_crate(|krate| { - assert_eq!(name, krate.name); - Ok(BuildPackageSummary::default()) - })?; - - Ok(()) - } - - #[test] - fn test_add_long_version() -> Result<()> { - let env = TestEnvironment::new_with_runtime()?; - - let queue = env.build_queue(); - - let long_version = Version::parse(&format!( - "1.2.3-{}+{}", - "prerelease".repeat(100), - "build".repeat(100) - ))?; - - queue.add_crate("krate", &long_version, 0, None)?; - - queue.process_next_crate(|krate| { - assert_eq!(long_version, krate.version); - Ok(BuildPackageSummary::default()) - })?; - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_delete_version_from_queue() -> Result<()> { - let env = TestEnvironment::new().await?; - - let queue = env.async_build_queue(); - assert_eq!(queue.pending_count().await?, 0); - - queue.add_crate(KRATE, &V1, 0, None).await?; - queue.add_crate(KRATE, &V2, 0, None).await?; - - assert_eq!(queue.pending_count().await?, 2); - queue.remove_version_from_queue(KRATE, &V1).await?; - - assert_eq!(queue.pending_count().await?, 1); - - // only v2 remains - if let [k] = queue.queued_crates().await?.as_slice() { - assert_eq!(k.name, KRATE); - assert_eq!(k.version, V2); - } else { - panic!("expected only one queued crate"); - } + assert_eq!(last_seen_reference(&mut conn).await?, Some(oid)); + assert!(!queue.is_locked().await?); Ok(()) } #[tokio::test(flavor = "multi_thread")] - async fn test_delete_crate_from_queue() -> Result<()> { + async fn test_broken_db_reference_breaks() -> Result<()> { let env = TestEnvironment::new().await?; - let queue = env.async_build_queue(); - assert_eq!(queue.pending_count().await?, 0); - - queue.add_crate(KRATE, &V1, 0, None).await?; - queue.add_crate(KRATE, &V2, 0, None).await?; - - assert_eq!(queue.pending_count().await?, 2); - queue.remove_crate_from_queue(KRATE).await?; + let mut conn = env.async_db().async_conn().await; + set_config(&mut conn, ConfigName::LastSeenIndexReference, "invalid") + .await + .unwrap(); - assert_eq!(queue.pending_count().await?, 0); + assert!(last_seen_reference(&mut conn).await.is_err()); Ok(()) } diff --git a/src/config.rs b/src/config.rs index ac43a2d73..beedb3d52 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,8 +57,6 @@ pub struct Config { pub(crate) build_workspace_reinitialization_interval: Duration, // Build params - pub(crate) build_attempts: u16, - pub(crate) delay_between_build_attempts: Duration, pub(crate) rustwide_workspace: PathBuf, pub(crate) temp_dir: PathBuf, pub(crate) inside_docker: bool, @@ -77,6 +75,7 @@ pub struct Config { pub(crate) database: docs_rs_database::Config, pub(crate) repository_stats: docs_rs_repository_stats::Config, pub(crate) storage: Arc, + pub(crate) build_queue: Arc, } impl Config { @@ -104,11 +103,6 @@ impl Config { let temp_dir = prefix.join("tmp"); Ok(ConfigBuilder::default() - .build_attempts(env("DOCSRS_BUILD_ATTEMPTS", 5u16)?) - .delay_between_build_attempts(Duration::from_secs(env::( - "DOCSRS_DELAY_BETWEEN_BUILD_ATTEMPTS", - 60, - )?)) .delay_between_registry_fetches(Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, @@ -157,6 +151,7 @@ impl Config { .registry_api(docs_rs_registry_api::Config::from_environment()?) .database(docs_rs_database::Config::from_environment()?) .repository_stats(docs_rs_repository_stats::Config::from_environment()?) - .storage(Arc::new(docs_rs_storage::Config::from_environment()?))) + .storage(Arc::new(docs_rs_storage::Config::from_environment()?)) + .build_queue(Arc::new(docs_rs_build_queue::Config::from_environment()?))) } } diff --git a/src/context.rs b/src/context.rs index 8643509d0..cb1f0397d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,5 +1,6 @@ -use crate::{AsyncBuildQueue, BuildQueue, Config}; +use crate::{Config, docbuilder::BuilderMetrics}; use anyhow::Result; +use docs_rs_build_queue::{AsyncBuildQueue, BuildQueue}; use docs_rs_database::Pool; use docs_rs_fastly::Cdn; use docs_rs_opentelemetry::{AnyMeterProvider, get_meter_provider}; @@ -12,6 +13,7 @@ use tokio::runtime; pub struct Context { pub config: Arc, pub async_build_queue: Arc, + pub builder_metrics: Arc, // temporary place until the refactor is finished pub build_queue: Arc, pub storage: Arc, pub async_storage: Arc, @@ -73,9 +75,7 @@ impl Context { let cdn = cdn.map(Arc::new); let async_build_queue = Arc::new(AsyncBuildQueue::new( pool.clone(), - config.clone(), - async_storage.clone(), - cdn.clone(), + config.build_queue.clone(), &meter_provider, )); @@ -88,6 +88,7 @@ impl Context { Ok(Self { async_build_queue, build_queue, + builder_metrics: Arc::new(BuilderMetrics::new(&meter_provider)), storage, async_storage, cdn, diff --git a/src/docbuilder/mod.rs b/src/docbuilder/mod.rs index f0d3fdb03..6ba423a8d 100644 --- a/src/docbuilder/mod.rs +++ b/src/docbuilder/mod.rs @@ -3,9 +3,7 @@ mod rustwide_builder; pub(crate) use self::limits::Limits; pub(crate) use self::rustwide_builder::DocCoverage; -pub use self::rustwide_builder::{ - BuildPackageSummary, BuilderMetrics, PackageKind, RustwideBuilder, -}; +pub use self::rustwide_builder::{BuilderMetrics, PackageKind, RustwideBuilder}; #[cfg(test)] pub use self::rustwide_builder::{ diff --git a/src/docbuilder/rustwide_builder.rs b/src/docbuilder/rustwide_builder.rs index e6ff16cd7..f77f27d74 100644 --- a/src/docbuilder/rustwide_builder.rs +++ b/src/docbuilder/rustwide_builder.rs @@ -11,6 +11,7 @@ use crate::{ utils::{copy_dir_all, report_error}, }; use anyhow::{Context as _, Error, anyhow, bail}; +use docs_rs_build_queue::BuildPackageSummary; use docs_rs_cargo_metadata::{CargoMetadata, MetadataPackage}; use docs_rs_database::{ Pool, @@ -212,7 +213,7 @@ impl RustwideBuilder { registry_api: context.registry_api.clone(), repository_stats_updater: context.repository_stats_updater.clone(), workspace_initialize_time: Instant::now(), - builder_metrics: context.async_build_queue.builder_metrics(), + builder_metrics: context.builder_metrics.clone(), }) } @@ -1429,22 +1430,6 @@ pub(crate) struct BuildResult { pub(crate) successful: bool, } -#[derive(Debug)] -pub struct BuildPackageSummary { - pub successful: bool, - pub should_reattempt: bool, -} - -#[cfg(test)] -impl Default for BuildPackageSummary { - fn default() -> Self { - Self { - successful: true, - should_reattempt: false, - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 2fdc1fb62..1f98db8d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,13 +6,10 @@ clippy::result_large_err, )] -pub use self::build_queue::{ - AsyncBuildQueue, BuildQueue, queue_rebuilds, queue_rebuilds_faulty_rustdoc, -}; pub use self::config::Config; pub use self::context::Context; pub use self::docbuilder::PackageKind; -pub use self::docbuilder::{BuildPackageSummary, RustwideBuilder}; +pub use self::docbuilder::RustwideBuilder; pub use self::index::Index; pub use self::web::start_web_server; @@ -21,7 +18,7 @@ pub use docs_rs_utils::{ }; pub use font_awesome_as_a_crate::icons; -mod build_queue; +pub mod build_queue; mod config; mod context; pub mod db; diff --git a/src/metrics/service.rs b/src/metrics/service.rs index d2ad20c33..218ef78d9 100644 --- a/src/metrics/service.rs +++ b/src/metrics/service.rs @@ -1,5 +1,5 @@ -use crate::AsyncBuildQueue; use anyhow::{Error, Result}; +use docs_rs_build_queue::AsyncBuildQueue; use docs_rs_opentelemetry::AnyMeterProvider; use opentelemetry::{KeyValue, metrics::Gauge}; use std::collections::HashSet; diff --git a/src/test/mod.rs b/src/test/mod.rs index 75dae9758..3a738ea39 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod headers; pub(crate) use self::fakes::{FakeBuild, fake_release_that_failed_before_build}; use crate::{ - AsyncBuildQueue, BuildQueue, Config, Context, + Config, Context, config::ConfigBuilder, error::Result, web::{build_axum_app, cache, page::TemplateData}, @@ -12,6 +12,7 @@ use anyhow::{Context as _, anyhow}; use axum::body::Bytes; use axum::{Router, body::Body, http::Request, response::Response as AxumResponse}; use axum_extra::headers::{ETag, HeaderMapExt as _}; +use docs_rs_build_queue::{AsyncBuildQueue, BuildQueue}; use docs_rs_database::testing::TestDatabase; use docs_rs_fastly::Cdn; use docs_rs_headers::{IfNoneMatch, SURROGATE_CONTROL, SurrogateKeys}; @@ -493,11 +494,8 @@ impl TestEnvironment { &self.context.build_queue } - pub(crate) fn cdn(&self) -> &Cdn { - self.context - .cdn - .as_ref() - .expect("in test envs we always have the mock CDN") + pub(crate) fn cdn(&self) -> Option<&Cdn> { + self.context.cdn.as_deref() } pub(crate) fn config(&self) -> &Config { diff --git a/src/utils/consistency/db.rs b/src/utils/consistency/db.rs index f2a9cf23c..68744f0e3 100644 --- a/src/utils/consistency/db.rs +++ b/src/utils/consistency/db.rs @@ -36,7 +36,7 @@ pub(super) async fn load(conn: &mut sqlx::PgConnection, config: &Config) -> Resu ) ) AS inp ORDER BY name"#, - config.build_attempts as i32, + config.build_queue.build_attempts as i32, ) .fetch_all(conn) .await?; @@ -66,13 +66,16 @@ pub(super) async fn load(conn: &mut sqlx::PgConnection, config: &Config) -> Resu mod tests { use super::*; use crate::test::{V1, V2, V3, async_wrapper}; + use docs_rs_types::KrateName; use pretty_assertions::assert_eq; + const QUEUED: KrateName = KrateName::from_static("queued"); + #[test] fn test_load() { async_wrapper(|env| async move { env.async_build_queue() - .add_crate("queued", &V1, 0, None) + .add_crate(&QUEUED, &V1, 0, None) .await?; env.fake_release() .await diff --git a/src/utils/consistency/mod.rs b/src/utils/consistency/mod.rs index 02b2f2be6..528211a23 100644 --- a/src/utils/consistency/mod.rs +++ b/src/utils/consistency/mod.rs @@ -1,6 +1,7 @@ -use crate::build_queue::PRIORITY_CONSISTENCY_CHECK; -use crate::{Context, db::delete}; +use crate::{Context, build_queue::set_yanked, db::delete}; use anyhow::{Context as _, Result}; +use docs_rs_build_queue::PRIORITY_CONSISTENCY_CHECK; +use docs_rs_types::KrateName; use itertools::Itertools; use tracing::{info, warn}; @@ -96,11 +97,12 @@ where result.crates_deleted += 1; } diff::Difference::CrateNotInDb(name, versions) => { + let name: KrateName = name.parse()?; for version in versions { if !dry_run && let Err(err) = ctx .async_build_queue - .add_crate(name, version, PRIORITY_CONSISTENCY_CHECK, None) + .add_crate(&name, version, PRIORITY_CONSISTENCY_CHECK, None) .await { warn!("{:?}", err); @@ -124,10 +126,11 @@ where result.releases_deleted += 1; } diff::Difference::ReleaseNotInDb(name, version) => { + let name: KrateName = name.parse()?; if !dry_run && let Err(err) = ctx .async_build_queue - .add_crate(name, version, PRIORITY_CONSISTENCY_CHECK, None) + .add_crate(&name, version, PRIORITY_CONSISTENCY_CHECK, None) .await { warn!("{:?}", err); @@ -135,12 +138,8 @@ where result.builds_queued += 1; } diff::Difference::ReleaseYank(name, version, yanked) => { - if !dry_run - && let Err(err) = ctx - .async_build_queue - .set_yanked(name, version, *yanked) - .await - { + let name: KrateName = name.parse()?; + if !dry_run && let Err(err) = set_yanked(ctx, &name, version, *yanked).await { warn!("{:?}", err); } result.yanks_corrected += 1; @@ -156,7 +155,7 @@ mod tests { use super::diff::Difference; use super::*; use crate::test::{TestEnvironment, V1, V2, async_wrapper}; - use docs_rs_types::Version; + use docs_rs_types::{Version, testing::KRATE}; use sqlx::Row as _; async fn count(env: &TestEnvironment, sql: &str) -> Result { @@ -302,7 +301,7 @@ mod tests { .into_iter() .map(|c| (c.name, V1, c.priority)) .collect::>(), - vec![("krate".into(), V1, 15)] + vec![(KRATE, V1, 15)] ); Ok(()) }) @@ -328,7 +327,7 @@ mod tests { .into_iter() .map(|c| (c.name, c.version, c.priority)) .collect::>(), - vec![("krate".into(), V1, 15), ("krate".into(), V2, 15)] + vec![(KRATE, V1, 15), (KRATE, V2, 15)] ); Ok(()) }) diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 1cefffcce..00cf1c1d0 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -3,9 +3,9 @@ //! This daemon will start web server, track new packages and build them use crate::{ - AsyncBuildQueue, Config, Context, Index, RustwideBuilder, + Context, Index, RustwideBuilder, + build_queue::{get_new_crates, queue_rebuilds}, metrics::service::OtelServiceMetrics, - queue_rebuilds, utils::{queue_builder, report_error}, web::start_web_server, }; @@ -20,17 +20,20 @@ use tracing::{debug, info, trace}; /// Run the registry watcher /// NOTE: this should only be run once, otherwise crates would be added /// to the queue multiple times. -pub async fn watch_registry(build_queue: &AsyncBuildQueue, config: &Config) -> Result<(), Error> { +pub async fn watch_registry(context: &Context) -> Result<(), Error> { let mut last_gc = Instant::now(); + let config = context.config.clone(); + let queue = context.async_build_queue.clone(); + loop { - if build_queue.is_locked().await? { + if queue.is_locked().await? { debug!("Queue is locked, skipping checking new crates"); } else { debug!("Checking new crates"); - let index = Index::from_config(config).await?; - match build_queue - .get_new_crates(&index) + let index = Index::from_config(&context.config).await?; + + match get_new_crates(context, &index) .await .context("Failed to get new crates") { @@ -47,15 +50,13 @@ pub async fn watch_registry(build_queue: &AsyncBuildQueue, config: &Config) -> R } } -fn start_registry_watcher(context: &Context) -> Result<(), Error> { - let build_queue = context.async_build_queue.clone(); - let config = context.config.clone(); - - context.runtime.spawn(async move { +fn start_registry_watcher(context: Arc) -> Result<(), Error> { + let runtime = context.runtime.clone(); + runtime.spawn(async move { // space this out to prevent it from clashing against the queue-builder thread on launch tokio::time::sleep(Duration::from_secs(30)).await; - watch_registry(&build_queue, &config).await + watch_registry(&context).await }); Ok(()) @@ -148,7 +149,7 @@ pub fn start_daemon(context: Context, enable_registry_watcher: bool) -> Result<( if enable_registry_watcher { // check new crates every minute - start_registry_watcher(&context)?; + start_registry_watcher(context.clone())?; } // build new crates every minute diff --git a/src/utils/queue.rs b/src/utils/queue.rs index 92d697ed6..6c246fffa 100644 --- a/src/utils/queue.rs +++ b/src/utils/queue.rs @@ -1,6 +1,6 @@ //! Utilities for interacting with the build queue -use crate::build_queue::PRIORITY_DEFAULT; use crate::error::Result; +use docs_rs_build_queue::PRIORITY_DEFAULT; use futures_util::stream::TryStreamExt; /// Get the build queue priority for a crate, returns the matching pattern too diff --git a/src/utils/queue_builder.rs b/src/utils/queue_builder.rs index 292cedfc0..cdfd4da94 100644 --- a/src/utils/queue_builder.rs +++ b/src/utils/queue_builder.rs @@ -1,4 +1,5 @@ use crate::Context; +use crate::build_queue::build_next_queue_package; use crate::{docbuilder::RustwideBuilder, utils::report_error}; use anyhow::{Context as _, Error}; use std::panic::{AssertUnwindSafe, catch_unwind}; @@ -39,7 +40,7 @@ pub fn queue_builder(context: &Context, mut builder: RustwideBuilder) -> Result< // If a panic occurs while building a crate, lock the queue until an admin has a chance to look at it. debug!("Checking build queue"); let res = catch_unwind(AssertUnwindSafe(|| { - match build_queue.build_next_queue_package(context, &mut builder) { + match build_next_queue_package(context, &mut builder) { Ok(true) => {} Ok(false) => { debug!("Queue is empty, going back to sleep"); diff --git a/src/web/builds.rs b/src/web/builds.rs index b6aabe7ee..07fb2c021 100644 --- a/src/web/builds.rs +++ b/src/web/builds.rs @@ -1,6 +1,5 @@ use crate::{ - AsyncBuildQueue, Config, - build_queue::PRIORITY_MANUAL_FROM_CRATES_IO, + Config, docbuilder::Limits, impl_axum_webpage, web::{ @@ -21,6 +20,7 @@ use axum_extra::{ }; use chrono::{DateTime, Utc}; use constant_time_eq::constant_time_eq; +use docs_rs_build_queue::{AsyncBuildQueue, PRIORITY_MANUAL_FROM_CRATES_IO}; use docs_rs_headers::CanonicalUrl; use docs_rs_types::{BuildId, BuildStatus, KrateName, ReqVersion, Version}; use http::StatusCode; @@ -222,7 +222,7 @@ mod tests { }; use anyhow::Result; use axum::{body::Body, http::Request}; - use docs_rs_types::BuildStatus; + use docs_rs_types::{BuildStatus, testing::FOO}; use kuchikiki::traits::TendrilSink; use reqwest::StatusCode; use tower::ServiceExt; @@ -409,7 +409,7 @@ mod tests { let build_queue = env.async_build_queue(); assert_eq!(build_queue.pending_count().await?, 0); - assert!(!build_queue.has_build_queued("foo", &V1).await?); + assert!(!build_queue.has_build_queued(&FOO, &V1).await?); { let app = env.web_app().await; @@ -429,7 +429,7 @@ mod tests { } assert_eq!(build_queue.pending_count().await?, 1); - assert!(build_queue.has_build_queued("foo", &V1).await?); + assert!(build_queue.has_build_queued(&FOO, &V1).await?); { let app = env.web_app().await; @@ -455,7 +455,7 @@ mod tests { } assert_eq!(build_queue.pending_count().await?, 1); - assert!(build_queue.has_build_queued("foo", &V1).await?); + assert!(build_queue.has_build_queued(&FOO, &V1).await?); Ok(()) } diff --git a/src/web/releases.rs b/src/web/releases.rs index 71357a6c4..a6496257a 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -1,9 +1,7 @@ //! Releases web handlersrelease use crate::{ - AsyncBuildQueue, Config, - build_queue::{PRIORITY_CONTINUOUS, QueuedCrate}, - impl_axum_webpage, + Config, impl_axum_webpage, utils::report_error, web::{ axum_redirect, @@ -24,6 +22,7 @@ use axum::{ }; use base64::{Engine, engine::general_purpose::STANDARD as b64}; use chrono::{DateTime, Utc}; +use docs_rs_build_queue::{AsyncBuildQueue, PRIORITY_CONTINUOUS, QueuedCrate}; use docs_rs_registry_api::{self as registry_api, RegistryApi}; use docs_rs_types::{KrateName, ReqVersion, Version}; use docs_rs_uri::encode_url_path; @@ -796,6 +795,7 @@ mod tests { use chrono::{Duration, TimeZone}; use docs_rs_registry_api::{CrateOwner, OwnerKind}; use docs_rs_types::BuildStatus; + use docs_rs_types::testing::{BAR, BAZ, FOO}; use kuchikiki::traits::TendrilSink; use mockito::Matcher; use reqwest::StatusCode; @@ -1795,9 +1795,9 @@ mod tests { ); let queue = env.async_build_queue(); - queue.add_crate("foo", &V1, 0, None).await?; - queue.add_crate("bar", &V2, -10, None).await?; - queue.add_crate("baz", &V3, 10, None).await?; + queue.add_crate(&FOO, &V1, 0, None).await?; + queue.add_crate(&BAR, &V2, -10, None).await?; + queue.add_crate(&BAZ, &V3, 10, None).await?; let full = kuchikiki::parse_html().one(web.get("/releases/queue").await?.text().await?); let items = full @@ -1835,8 +1835,8 @@ mod tests { // we have two queued releases, where the build for one is already in progress let queue = env.async_build_queue(); - queue.add_crate("foo", &V1, 0, None).await?; - queue.add_crate("bar", &V2, 0, None).await?; + queue.add_crate(&FOO, &V1, 0, None).await?; + queue.add_crate(&BAR, &V2, 0, None).await?; env.fake_release() .await @@ -1911,13 +1911,13 @@ mod tests { let web = env.web_app().await; let queue = env.async_build_queue(); queue - .add_crate("foo", &V1, PRIORITY_CONTINUOUS, None) + .add_crate(&FOO, &V1, PRIORITY_CONTINUOUS, None) .await?; queue - .add_crate("bar", &V2, PRIORITY_CONTINUOUS + 1, None) + .add_crate(&BAR, &V2, PRIORITY_CONTINUOUS + 1, None) .await?; queue - .add_crate("baz", &V3, PRIORITY_CONTINUOUS - 1, None) + .add_crate(&BAZ, &V3, PRIORITY_CONTINUOUS - 1, None) .await?; let full = kuchikiki::parse_html().one(web.get("/releases/queue").await?.text().await?);