From e350abf01fab1665b9fc3bc9afb35592f996a451 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Thu, 19 Feb 2026 03:53:29 +0530 Subject: [PATCH 01/10] init hotblocks-sidecar crate --- Cargo.lock | 22 ++++++ Cargo.toml | 1 + crates/hotblocks-sidecar/Cargo.toml | 21 ++++++ crates/hotblocks-sidecar/src/cli.rs | 22 ++++++ crates/hotblocks-sidecar/src/hotblocks.rs | 21 ++++++ crates/hotblocks-sidecar/src/main.rs | 91 +++++++++++++++++++++++ crates/hotblocks-sidecar/src/status.rs | 24 ++++++ 7 files changed, 202 insertions(+) create mode 100644 crates/hotblocks-sidecar/Cargo.toml create mode 100644 crates/hotblocks-sidecar/src/cli.rs create mode 100644 crates/hotblocks-sidecar/src/hotblocks.rs create mode 100644 crates/hotblocks-sidecar/src/main.rs create mode 100644 crates/hotblocks-sidecar/src/status.rs diff --git a/Cargo.lock b/Cargo.lock index becafa4..d648ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4917,6 +4917,27 @@ dependencies = [ "url", ] +[[package]] +name = "sqd-hotblocks-sidecar" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "bytes", + "clap", + "futures", + "reqwest", + "serde", + "serde_json", + "sqd-primitives", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", + "url", + "uuid", +] + [[package]] name = "sqd-polars" version = "0.1.0" @@ -5479,6 +5500,7 @@ dependencies = [ "http 1.2.0", "http-body 1.0.1", "pin-project-lite", + "tower", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 3dcb0cc..da6a3c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/data-source", "crates/dataset", "crates/hotblocks", + "crates/hotblocks-sidecar", "crates/polars", "crates/primitives", "crates/query", diff --git a/crates/hotblocks-sidecar/Cargo.toml b/crates/hotblocks-sidecar/Cargo.toml new file mode 100644 index 0000000..ed04b23 --- /dev/null +++ b/crates/hotblocks-sidecar/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "sqd-hotblocks-sidecar" +version = "0.1.0" +edition = "2024" + 
+[dependencies] +anyhow = { workspace = true } +axum = { workspace = true } +bytes = { workspace = true } +clap = { workspace = true, features = ["derive", "env"] } +futures = { workspace = true } +reqwest = { workspace = true, features = ["gzip", "json", "stream"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +sqd-primitives = { path = "../primitives" } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +tower-http = { version = "0.6", features = ["request-id", "util"] } +uuid = { workspace = true, features = ["v4"] } +url = { workspace = true } diff --git a/crates/hotblocks-sidecar/src/cli.rs b/crates/hotblocks-sidecar/src/cli.rs new file mode 100644 index 0000000..12403be --- /dev/null +++ b/crates/hotblocks-sidecar/src/cli.rs @@ -0,0 +1,22 @@ +use clap::Parser; +use url::Url; + +#[derive(Parser, Debug)] +#[command(version, about = "Hotblocks sidecar service", long_about = None)] +pub struct Cli { + /// URL of the Hotblocks service to send dataset information to + #[arg(long)] + pub hotblocks_url: Url, + + /// URL of the status endpoint to poll for dataset updates + #[arg(long)] + pub status_url: Url, + + /// Dataset identifiers to track (can be specified multiple times) + #[arg(long = "dataset")] + pub dataset: Vec, + + /// Interval in seconds between polling the status endpoint + #[arg(long, default_value = "60")] + pub poll_interval_secs: u64, +} diff --git a/crates/hotblocks-sidecar/src/hotblocks.rs b/crates/hotblocks-sidecar/src/hotblocks.rs new file mode 100644 index 0000000..75acbef --- /dev/null +++ b/crates/hotblocks-sidecar/src/hotblocks.rs @@ -0,0 +1,21 @@ +use reqwest::Client; +use sqd_primitives::BlockNumber; +use url::Url; + +pub async fn set_retention( + client: &Client, + base_url: &Url, + dataset: &str, + from_block: BlockNumber, +) -> anyhow::Result<()> { + let retention_url = 
base_url.join(&format!("/datasets/{dataset}/retention"))?; + + client + .post(retention_url) + .json(&serde_json::json!({"FromBlock": {"number": from_block}})) + .send() + .await? + .error_for_status()?; + + Ok(()) +} diff --git a/crates/hotblocks-sidecar/src/main.rs b/crates/hotblocks-sidecar/src/main.rs new file mode 100644 index 0000000..1c22e00 --- /dev/null +++ b/crates/hotblocks-sidecar/src/main.rs @@ -0,0 +1,91 @@ +mod cli; +mod hotblocks; +mod status; + +use anyhow::Context; +use clap::Parser; +use cli::Cli; +use std::collections::HashMap; +use std::time::Duration; + +fn main() -> anyhow::Result<()> { + let args = Cli::parse(); + + init_tracing(); + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(run(args))?; + + Ok(()) +} + +async fn run(args: Cli) -> anyhow::Result<()> { + if args.poll_interval_secs == 0 { + anyhow::bail!("poll interval must be greater than 0"); + } + + let client = reqwest::Client::new(); + let poll_interval = Duration::from_secs(args.poll_interval_secs); + let mut interval = tokio::time::interval(poll_interval); + + loop { + let result = poll_once(&client, &args).await; + if let Err(err) = result { + tracing::warn!(error = ?err, "failed to refresh retention settings"); + } + + interval.tick().await; + } +} + +async fn poll_once(client: &reqwest::Client, args: &Cli) -> anyhow::Result<()> { + let status = status::get_status(client, args.status_url.as_str()).await?; + + let statuses = status + .datasets + .into_iter() + .map(|dataset| (dataset.id, dataset.height)) + .collect::>(); + + for dataset in &args.dataset { + match statuses.get(dataset) { + Some(Some(height)) => { + hotblocks::set_retention(client, &args.hotblocks_url, dataset, *height) + .await + .with_context(|| format!("failed to update retention for {dataset}"))?; + tracing::info!(dataset, height, "updated retention policy"); + } + Some(None) => { + tracing::info!(dataset, "dataset has no reported height yet"); + } + None => { + 
tracing::warn!(dataset, "dataset not found in status json"); + } + } + } + + Ok(()) +} + +fn init_tracing() { + use std::io::IsTerminal; + + let env_filter = tracing_subscriber::EnvFilter::builder().parse_lossy( + std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV).unwrap_or("info".to_string()), + ); + + if std::io::stdout().is_terminal() { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .compact() + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .json() + .with_current_span(false) + .init(); + } +} diff --git a/crates/hotblocks-sidecar/src/status.rs b/crates/hotblocks-sidecar/src/status.rs new file mode 100644 index 0000000..0168a32 --- /dev/null +++ b/crates/hotblocks-sidecar/src/status.rs @@ -0,0 +1,24 @@ +use serde::Deserialize; +use sqd_primitives::BlockNumber; + +#[derive(Debug, Deserialize)] +pub struct SchedulingStatus { + pub datasets: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct DatasetStatus { + pub id: String, + pub height: Option, +} + +pub async fn get_status(client: &reqwest::Client, url: &str) -> anyhow::Result { + let status = client + .get(url) + .send() + .await? + .error_for_status()? 
+ .json::() + .await?; + Ok(status) +} From be80c769b04d6439fa84caad19bb65474a0c81a6 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Sun, 22 Feb 2026 11:58:39 +0530 Subject: [PATCH 02/10] find corresponding dataset id by its name --- Cargo.lock | 7 +- crates/hotblocks-sidecar/Cargo.toml | 6 +- crates/hotblocks-sidecar/src/cli.rs | 10 +- crates/hotblocks-sidecar/src/datasets.rs | 40 +++++++ crates/hotblocks-sidecar/src/main.rs | 141 +++++++++++++++++------ crates/hotblocks-sidecar/src/status.rs | 3 +- crates/hotblocks-sidecar/src/types.rs | 1 + 7 files changed, 159 insertions(+), 49 deletions(-) create mode 100644 crates/hotblocks-sidecar/src/datasets.rs create mode 100644 crates/hotblocks-sidecar/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index d648ecc..b646395 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4922,20 +4922,16 @@ name = "sqd-hotblocks-sidecar" version = "0.1.0" dependencies = [ "anyhow", - "axum", - "bytes", "clap", - "futures", "reqwest", "serde", "serde_json", + "serde_yaml", "sqd-primitives", "tokio", - "tower-http", "tracing", "tracing-subscriber", "url", - "uuid", ] [[package]] @@ -5500,7 +5496,6 @@ dependencies = [ "http 1.2.0", "http-body 1.0.1", "pin-project-lite", - "tower", "tower-layer", "tower-service", "tracing", diff --git a/crates/hotblocks-sidecar/Cargo.toml b/crates/hotblocks-sidecar/Cargo.toml index ed04b23..4973519 100644 --- a/crates/hotblocks-sidecar/Cargo.toml +++ b/crates/hotblocks-sidecar/Cargo.toml @@ -5,17 +5,13 @@ edition = "2024" [dependencies] anyhow = { workspace = true } -axum = { workspace = true } -bytes = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } -futures = { workspace = true } reqwest = { workspace = true, features = ["gzip", "json", "stream"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +serde_yaml = { workspace = true } sqd-primitives = { path = "../primitives" } tokio = { workspace = true, features = ["full"] } tracing = { workspace 
= true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } -tower-http = { version = "0.6", features = ["request-id", "util"] } -uuid = { workspace = true, features = ["v4"] } url = { workspace = true } diff --git a/crates/hotblocks-sidecar/src/cli.rs b/crates/hotblocks-sidecar/src/cli.rs index 12403be..56d0ca1 100644 --- a/crates/hotblocks-sidecar/src/cli.rs +++ b/crates/hotblocks-sidecar/src/cli.rs @@ -16,7 +16,15 @@ pub struct Cli { #[arg(long = "dataset")] pub dataset: Vec, + /// URL of the datasets YAML file listing available network datasets + #[arg(long)] + pub datasets_url: Url, + + /// Interval in seconds between refreshing the datasets list + #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))] + pub datasets_update_interval_secs: u64, + /// Interval in seconds between polling the status endpoint - #[arg(long, default_value = "60")] + #[arg(long, default_value = "60", value_parser = clap::value_parser!(u64).range(1..))] pub poll_interval_secs: u64, } diff --git a/crates/hotblocks-sidecar/src/datasets.rs b/crates/hotblocks-sidecar/src/datasets.rs new file mode 100644 index 0000000..df5109c --- /dev/null +++ b/crates/hotblocks-sidecar/src/datasets.rs @@ -0,0 +1,40 @@ +use crate::types::DatasetId; +use serde::Deserialize; +use std::collections::HashMap; +use url::Url; + +#[derive(Deserialize)] +struct Dataset { + name: String, + id: DatasetId, +} + +#[derive(Deserialize)] +struct DatasetsFile { + #[serde(rename = "sqd-network-datasets")] + sqd_network_datasets: Vec, +} + +/// Downloads the datasets manifest and returns a map from dataset name to dataset ID. +pub async fn get_name_to_id( + client: &reqwest::Client, + url: &Url, +) -> anyhow::Result> { + let bytes = client + .get(url.as_str()) + .send() + .await? + .error_for_status()? 
+ .bytes() + .await?; + + let file: DatasetsFile = serde_yaml::from_slice(&bytes)?; + + let map = file + .sqd_network_datasets + .into_iter() + .map(|d| (d.name, d.id)) + .collect(); + + Ok(map) +} diff --git a/crates/hotblocks-sidecar/src/main.rs b/crates/hotblocks-sidecar/src/main.rs index 1c22e00..6103d0b 100644 --- a/crates/hotblocks-sidecar/src/main.rs +++ b/crates/hotblocks-sidecar/src/main.rs @@ -1,12 +1,17 @@ mod cli; +mod datasets; mod hotblocks; mod status; +mod types; use anyhow::Context; use clap::Parser; use cli::Cli; use std::collections::HashMap; use std::time::Duration; +use tokio::time::Instant; +use types::DatasetId; +use url::Url; fn main() -> anyhow::Result<()> { let args = Cli::parse(); @@ -16,57 +21,121 @@ fn main() -> anyhow::Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? - .block_on(run(args))?; + .block_on( + Sidecar::new( + args.hotblocks_url, + args.status_url, + args.datasets_url, + args.dataset, + Duration::from_secs(args.poll_interval_secs), + Duration::from_secs(args.datasets_update_interval_secs), + ) + .run(), + )?; Ok(()) } -async fn run(args: Cli) -> anyhow::Result<()> { - if args.poll_interval_secs == 0 { - anyhow::bail!("poll interval must be greater than 0"); +struct Sidecar { + client: reqwest::Client, + hotblocks_url: Url, + status_url: Url, + datasets_url: Url, + datasets: Vec, + poll_interval: Duration, + datasets_update_interval: Duration, + name_to_id: HashMap, + last_datasets_refresh: Instant, +} + +impl Sidecar { + fn new( + hotblocks_url: Url, + status_url: Url, + datasets_url: Url, + datasets: Vec, + poll_interval: Duration, + datasets_update_interval: Duration, + ) -> Self { + Self { + client: reqwest::Client::new(), + hotblocks_url, + status_url, + datasets_url, + datasets, + poll_interval, + datasets_update_interval, + name_to_id: HashMap::new(), + last_datasets_refresh: Instant::now() - datasets_update_interval, + } } - let client = reqwest::Client::new(); - let poll_interval = 
Duration::from_secs(args.poll_interval_secs); - let mut interval = tokio::time::interval(poll_interval); + async fn run(&mut self) -> anyhow::Result<()> { + let mut interval = tokio::time::interval(self.poll_interval); - loop { - let result = poll_once(&client, &args).await; - if let Err(err) = result { - tracing::warn!(error = ?err, "failed to refresh retention settings"); - } + loop { + interval.tick().await; + self.maybe_refresh_datasets().await; - interval.tick().await; + if let Err(err) = self.poll_once().await { + tracing::warn!(error = ?err, "failed to refresh retention settings"); + } + } } -} -async fn poll_once(client: &reqwest::Client, args: &Cli) -> anyhow::Result<()> { - let status = status::get_status(client, args.status_url.as_str()).await?; - - let statuses = status - .datasets - .into_iter() - .map(|dataset| (dataset.id, dataset.height)) - .collect::>(); - - for dataset in &args.dataset { - match statuses.get(dataset) { - Some(Some(height)) => { - hotblocks::set_retention(client, &args.hotblocks_url, dataset, *height) - .await - .with_context(|| format!("failed to update retention for {dataset}"))?; - tracing::info!(dataset, height, "updated retention policy"); - } - Some(None) => { - tracing::info!(dataset, "dataset has no reported height yet"); + async fn maybe_refresh_datasets(&mut self) { + if self.last_datasets_refresh.elapsed() < self.datasets_update_interval { + return; + } + + match datasets::get_name_to_id(&self.client, &self.datasets_url).await { + Ok(map) => { + tracing::info!("refreshed datasets manifest"); + self.name_to_id = map; + self.last_datasets_refresh = Instant::now(); } - None => { - tracing::warn!(dataset, "dataset not found in status json"); + Err(err) => { + tracing::warn!(error = ?err, "failed to refresh datasets manifest"); } } } - Ok(()) + async fn poll_once(&self) -> anyhow::Result<()> { + let status = status::get_status(&self.client, self.status_url.as_str()).await?; + + let statuses = status + .datasets + .into_iter() 
+ .map(|dataset| (dataset.id, dataset.height)) + .collect::>(); + + for dataset in &self.datasets { + let dataset_id = match self.name_to_id.get(dataset) { + Some(name) => name, + None => { + tracing::warn!(dataset, "dataset not found in manifest, skipping"); + continue; + } + }; + + match statuses.get(dataset_id) { + Some(Some(height)) => { + hotblocks::set_retention(&self.client, &self.hotblocks_url, dataset, *height) + .await + .with_context(|| format!("failed to update retention for {dataset}"))?; + tracing::info!(dataset, height, "updated retention policy"); + } + Some(None) => { + tracing::info!(dataset, "dataset has no reported height yet"); + } + None => { + tracing::warn!(dataset, "dataset not found in status json"); + } + } + } + + Ok(()) + } } fn init_tracing() { diff --git a/crates/hotblocks-sidecar/src/status.rs b/crates/hotblocks-sidecar/src/status.rs index 0168a32..06b4f70 100644 --- a/crates/hotblocks-sidecar/src/status.rs +++ b/crates/hotblocks-sidecar/src/status.rs @@ -1,3 +1,4 @@ +use crate::types::DatasetId; use serde::Deserialize; use sqd_primitives::BlockNumber; @@ -8,7 +9,7 @@ pub struct SchedulingStatus { #[derive(Debug, Deserialize)] pub struct DatasetStatus { - pub id: String, + pub id: DatasetId, pub height: Option, } diff --git a/crates/hotblocks-sidecar/src/types.rs b/crates/hotblocks-sidecar/src/types.rs new file mode 100644 index 0000000..257a76c --- /dev/null +++ b/crates/hotblocks-sidecar/src/types.rs @@ -0,0 +1 @@ +pub type DatasetId = String; // s3:// From 05b1c4db707b4786ab6e4aec752a6222c34103cb Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Mon, 23 Feb 2026 10:29:12 +0530 Subject: [PATCH 03/10] add hotblocks-sidecar to Dockerfile --- Dockerfile | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Dockerfile b/Dockerfile index 2252b1e..117204c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,6 +22,16 @@ COPY --from=hotblocks-builder /app/target/release/sqd-hotblocks . 
ENTRYPOINT ["/app/sqd-hotblocks"] +FROM builder AS hotblocks-sidecar-builder +RUN cargo build -p sqd-hotblocks-sidecar --release + + +FROM rust AS hotblocks-sidecar +WORKDIR /app +COPY --from=hotblocks-sidecar-builder /app/target/release/sqd-hotblocks-sidecar . +ENTRYPOINT ["/app/sqd-hotblocks-sidecar"] + + FROM builder AS archive-builder RUN cargo build -p sqd-archive --release From c4b270b6c14cd13dead814f93df817248525a196 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Tue, 24 Feb 2026 17:56:13 +0530 Subject: [PATCH 04/10] allow to specify dataset id explicitly --- Cargo.lock | 2 +- Cargo.toml | 2 +- Dockerfile | 10 ++-- .../Cargo.toml | 2 +- .../src/cli.rs | 10 ++-- .../src/datasets.rs | 0 .../src/hotblocks.rs | 0 .../src/main.rs | 46 ++++++++++++------- .../src/status.rs | 0 crates/hotblocks-retain/src/types.rs | 15 ++++++ crates/hotblocks-sidecar/src/types.rs | 1 - 11 files changed, 58 insertions(+), 30 deletions(-) rename crates/{hotblocks-sidecar => hotblocks-retain}/Cargo.toml (94%) rename crates/{hotblocks-sidecar => hotblocks-retain}/src/cli.rs (79%) rename crates/{hotblocks-sidecar => hotblocks-retain}/src/datasets.rs (100%) rename crates/{hotblocks-sidecar => hotblocks-retain}/src/hotblocks.rs (100%) rename crates/{hotblocks-sidecar => hotblocks-retain}/src/main.rs (72%) rename crates/{hotblocks-sidecar => hotblocks-retain}/src/status.rs (100%) create mode 100644 crates/hotblocks-retain/src/types.rs delete mode 100644 crates/hotblocks-sidecar/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index b646395..cdbd3f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4918,7 +4918,7 @@ dependencies = [ ] [[package]] -name = "sqd-hotblocks-sidecar" +name = "sqd-hotblocks-retain" version = "0.1.0" dependencies = [ "anyhow", diff --git a/Cargo.toml b/Cargo.toml index da6a3c6..efa4e4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ "crates/data-source", "crates/dataset", "crates/hotblocks", - "crates/hotblocks-sidecar", + "crates/hotblocks-retain", 
"crates/polars", "crates/primitives", "crates/query", diff --git a/Dockerfile b/Dockerfile index 117204c..bcf8bc6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,14 +22,14 @@ COPY --from=hotblocks-builder /app/target/release/sqd-hotblocks . ENTRYPOINT ["/app/sqd-hotblocks"] -FROM builder AS hotblocks-sidecar-builder -RUN cargo build -p sqd-hotblocks-sidecar --release +FROM builder AS hotblocks-retain-builder +RUN cargo build -p sqd-hotblocks-retain --release -FROM rust AS hotblocks-sidecar +FROM rust AS hotblocks-retain WORKDIR /app -COPY --from=hotblocks-sidecar-builder /app/target/release/sqd-hotblocks-sidecar . -ENTRYPOINT ["/app/sqd-hotblocks-sidecar"] +COPY --from=hotblocks-retain-builder /app/target/release/sqd-hotblocks-retain . +ENTRYPOINT ["/app/sqd-hotblocks-retain"] FROM builder AS archive-builder diff --git a/crates/hotblocks-sidecar/Cargo.toml b/crates/hotblocks-retain/Cargo.toml similarity index 94% rename from crates/hotblocks-sidecar/Cargo.toml rename to crates/hotblocks-retain/Cargo.toml index 4973519..4b49b47 100644 --- a/crates/hotblocks-sidecar/Cargo.toml +++ b/crates/hotblocks-retain/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "sqd-hotblocks-sidecar" +name = "sqd-hotblocks-retain" version = "0.1.0" edition = "2024" diff --git a/crates/hotblocks-sidecar/src/cli.rs b/crates/hotblocks-retain/src/cli.rs similarity index 79% rename from crates/hotblocks-sidecar/src/cli.rs rename to crates/hotblocks-retain/src/cli.rs index 56d0ca1..520eebb 100644 --- a/crates/hotblocks-sidecar/src/cli.rs +++ b/crates/hotblocks-retain/src/cli.rs @@ -1,8 +1,10 @@ +use std::path::PathBuf; + use clap::Parser; use url::Url; #[derive(Parser, Debug)] -#[command(version, about = "Hotblocks sidecar service", long_about = None)] +#[command(version, about = "Hotblocks retain service", long_about = None)] pub struct Cli { /// URL of the Hotblocks service to send dataset information to #[arg(long)] @@ -12,9 +14,9 @@ pub struct Cli { #[arg(long)] pub status_url: Url, - /// 
Dataset identifiers to track (can be specified multiple times) - #[arg(long = "dataset")] - pub dataset: Vec, + /// Path to the YAML config file listing datasets to track + #[arg(long)] + pub datasets_config: PathBuf, /// URL of the datasets YAML file listing available network datasets #[arg(long)] diff --git a/crates/hotblocks-sidecar/src/datasets.rs b/crates/hotblocks-retain/src/datasets.rs similarity index 100% rename from crates/hotblocks-sidecar/src/datasets.rs rename to crates/hotblocks-retain/src/datasets.rs diff --git a/crates/hotblocks-sidecar/src/hotblocks.rs b/crates/hotblocks-retain/src/hotblocks.rs similarity index 100% rename from crates/hotblocks-sidecar/src/hotblocks.rs rename to crates/hotblocks-retain/src/hotblocks.rs diff --git a/crates/hotblocks-sidecar/src/main.rs b/crates/hotblocks-retain/src/main.rs similarity index 72% rename from crates/hotblocks-sidecar/src/main.rs rename to crates/hotblocks-retain/src/main.rs index 6103d0b..a6eee29 100644 --- a/crates/hotblocks-sidecar/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -10,23 +10,30 @@ use cli::Cli; use std::collections::HashMap; use std::time::Duration; use tokio::time::Instant; -use types::DatasetId; +use types::{DatasetConfig, DatasetId, DatasetsConfig}; use url::Url; fn main() -> anyhow::Result<()> { let args = Cli::parse(); + let datasets_config: DatasetsConfig = { + let contents = std::fs::read_to_string(&args.datasets_config) + .with_context(|| format!("failed to read {}", args.datasets_config.display()))?; + serde_yaml::from_str(&contents) + .with_context(|| format!("failed to parse {}", args.datasets_config.display()))? + }; + init_tracing(); tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? 
.block_on( - Sidecar::new( + HotblocksRetain::new( args.hotblocks_url, args.status_url, args.datasets_url, - args.dataset, + datasets_config.datasets, Duration::from_secs(args.poll_interval_secs), Duration::from_secs(args.datasets_update_interval_secs), ) @@ -36,24 +43,24 @@ fn main() -> anyhow::Result<()> { Ok(()) } -struct Sidecar { +struct HotblocksRetain { client: reqwest::Client, hotblocks_url: Url, status_url: Url, datasets_url: Url, - datasets: Vec, + datasets: Vec, poll_interval: Duration, datasets_update_interval: Duration, name_to_id: HashMap, last_datasets_refresh: Instant, } -impl Sidecar { +impl HotblocksRetain { fn new( hotblocks_url: Url, status_url: Url, datasets_url: Url, - datasets: Vec, + datasets: Vec, poll_interval: Duration, datasets_update_interval: Duration, ) -> Self { @@ -110,26 +117,31 @@ impl Sidecar { .collect::>(); for dataset in &self.datasets { - let dataset_id = match self.name_to_id.get(dataset) { - Some(name) => name, - None => { - tracing::warn!(dataset, "dataset not found in manifest, skipping"); - continue; + let dataset_name = dataset.name.as_str(); + let dataset_id = if let Some(id) = &dataset.id { + id.as_str() + } else { + match self.name_to_id.get(dataset_name) { + Some(id) => id.as_str(), + None => { + tracing::warn!(dataset = dataset_name, "dataset not found in manifest, skipping"); + continue; + } } }; match statuses.get(dataset_id) { Some(Some(height)) => { - hotblocks::set_retention(&self.client, &self.hotblocks_url, dataset, *height) + hotblocks::set_retention(&self.client, &self.hotblocks_url, dataset_name, *height) .await - .with_context(|| format!("failed to update retention for {dataset}"))?; - tracing::info!(dataset, height, "updated retention policy"); + .with_context(|| format!("failed to update retention for {dataset_name}"))?; + tracing::info!(dataset = dataset_name, height, "updated retention policy"); } Some(None) => { - tracing::info!(dataset, "dataset has no reported height yet"); + 
tracing::info!(dataset = dataset_name, "dataset has no reported height yet"); } None => { - tracing::warn!(dataset, "dataset not found in status json"); + tracing::warn!(dataset = dataset_name, "dataset not found in status json"); } } } diff --git a/crates/hotblocks-sidecar/src/status.rs b/crates/hotblocks-retain/src/status.rs similarity index 100% rename from crates/hotblocks-sidecar/src/status.rs rename to crates/hotblocks-retain/src/status.rs diff --git a/crates/hotblocks-retain/src/types.rs b/crates/hotblocks-retain/src/types.rs new file mode 100644 index 0000000..35a3b88 --- /dev/null +++ b/crates/hotblocks-retain/src/types.rs @@ -0,0 +1,15 @@ +use serde::Deserialize; + +pub type DatasetId = String; // s3:// + +#[derive(Debug, Clone, Deserialize)] +pub struct DatasetConfig { + pub name: String, + #[serde(default)] + pub id: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DatasetsConfig { + pub datasets: Vec, +} diff --git a/crates/hotblocks-sidecar/src/types.rs b/crates/hotblocks-sidecar/src/types.rs deleted file mode 100644 index 257a76c..0000000 --- a/crates/hotblocks-sidecar/src/types.rs +++ /dev/null @@ -1 +0,0 @@ -pub type DatasetId = String; // s3:// From fb01b474d69343ce98e1b275f4185ae6f3a24f00 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Wed, 4 Mar 2026 17:33:07 +0530 Subject: [PATCH 05/10] apply retention after some delay --- crates/hotblocks-retain/src/cli.rs | 6 +-- crates/hotblocks-retain/src/main.rs | 75 ++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/crates/hotblocks-retain/src/cli.rs b/crates/hotblocks-retain/src/cli.rs index 520eebb..b458344 100644 --- a/crates/hotblocks-retain/src/cli.rs +++ b/crates/hotblocks-retain/src/cli.rs @@ -26,7 +26,7 @@ pub struct Cli { #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))] pub datasets_update_interval_secs: u64, - /// Interval in seconds between polling the status endpoint - #[arg(long, default_value = 
"60", value_parser = clap::value_parser!(u64).range(1..))] - pub poll_interval_secs: u64, + /// Delay in seconds between fetching dataset heights and applying retention + #[arg(long, default_value = "300", value_parser = clap::value_parser!(u64).range(1..))] + pub retain_delay_secs: u64, } diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index a6eee29..f16c762 100644 --- a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -34,7 +34,7 @@ fn main() -> anyhow::Result<()> { args.status_url, args.datasets_url, datasets_config.datasets, - Duration::from_secs(args.poll_interval_secs), + Duration::from_secs(args.retain_delay_secs), Duration::from_secs(args.datasets_update_interval_secs), ) .run(), @@ -49,7 +49,7 @@ struct HotblocksRetain { status_url: Url, datasets_url: Url, datasets: Vec, - poll_interval: Duration, + retain_delay: Duration, datasets_update_interval: Duration, name_to_id: HashMap, last_datasets_refresh: Instant, @@ -61,7 +61,7 @@ impl HotblocksRetain { status_url: Url, datasets_url: Url, datasets: Vec, - poll_interval: Duration, + retain_delay: Duration, datasets_update_interval: Duration, ) -> Self { Self { @@ -70,7 +70,7 @@ impl HotblocksRetain { status_url, datasets_url, datasets, - poll_interval, + retain_delay, datasets_update_interval, name_to_id: HashMap::new(), last_datasets_refresh: Instant::now() - datasets_update_interval, @@ -78,15 +78,25 @@ impl HotblocksRetain { } async fn run(&mut self) -> anyhow::Result<()> { - let mut interval = tokio::time::interval(self.poll_interval); - loop { - interval.tick().await; self.maybe_refresh_datasets().await; - if let Err(err) = self.poll_once().await { - tracing::warn!(error = ?err, "failed to refresh retention settings"); - } + let status = match status::get_status(&self.client, self.status_url.as_str()).await { + Ok(status) => status, + Err(err) => { + tracing::warn!(error = ?err, "failed to fetch status"); + 
tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + tracing::info!( + delay_secs = self.retain_delay.as_secs(), + "fetched status, waiting before applying retention" + ); + tokio::time::sleep(self.retain_delay).await; + + self.apply_retention(&status).await; } } @@ -107,13 +117,11 @@ impl HotblocksRetain { } } - async fn poll_once(&self) -> anyhow::Result<()> { - let status = status::get_status(&self.client, self.status_url.as_str()).await?; - + async fn apply_retention(&self, status: &status::SchedulingStatus) { let statuses = status .datasets - .into_iter() - .map(|dataset| (dataset.id, dataset.height)) + .iter() + .map(|dataset| (dataset.id.as_str(), dataset.height)) .collect::>(); for dataset in &self.datasets { @@ -124,7 +132,10 @@ impl HotblocksRetain { match self.name_to_id.get(dataset_name) { Some(id) => id.as_str(), None => { - tracing::warn!(dataset = dataset_name, "dataset not found in manifest, skipping"); + tracing::warn!( + dataset = dataset_name, + "dataset not found in manifest, skipping" + ); continue; } } @@ -132,21 +143,39 @@ impl HotblocksRetain { match statuses.get(dataset_id) { Some(Some(height)) => { - hotblocks::set_retention(&self.client, &self.hotblocks_url, dataset_name, *height) - .await - .with_context(|| format!("failed to update retention for {dataset_name}"))?; - tracing::info!(dataset = dataset_name, height, "updated retention policy"); + match hotblocks::set_retention( + &self.client, + &self.hotblocks_url, + dataset_name, + *height, + ) + .await + { + Ok(()) => { + tracing::info!( + dataset = dataset_name, + height, + "applied retention policy" + ); + } + Err(err) => { + tracing::warn!( + dataset = dataset_name, + height, + error = ?err, + "failed to apply retention" + ); + } + } } Some(None) => { tracing::info!(dataset = dataset_name, "dataset has no reported height yet"); } None => { - tracing::warn!(dataset = dataset_name, "dataset not found in status json"); + tracing::warn!(dataset = dataset_name, 
"dataset not found in status"); } } } - - Ok(()) } } From 78900f5b823ed96d3d687f084e3cc05777a4c417 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Wed, 4 Mar 2026 22:30:11 +0530 Subject: [PATCH 06/10] update datasets config format --- crates/hotblocks-retain/src/main.rs | 43 ++++++++++------------------ crates/hotblocks-retain/src/types.rs | 10 ++----- 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index f16c762..9cba0b5 100644 --- a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -10,13 +10,13 @@ use cli::Cli; use std::collections::HashMap; use std::time::Duration; use tokio::time::Instant; -use types::{DatasetConfig, DatasetId, DatasetsConfig}; +use types::{DatasetId, DatasetsConfig}; use url::Url; fn main() -> anyhow::Result<()> { let args = Cli::parse(); - let datasets_config: DatasetsConfig = { + let datasets: DatasetsConfig = { let contents = std::fs::read_to_string(&args.datasets_config) .with_context(|| format!("failed to read {}", args.datasets_config.display()))?; serde_yaml::from_str(&contents) @@ -33,7 +33,7 @@ fn main() -> anyhow::Result<()> { args.hotblocks_url, args.status_url, args.datasets_url, - datasets_config.datasets, + datasets, Duration::from_secs(args.retain_delay_secs), Duration::from_secs(args.datasets_update_interval_secs), ) @@ -48,7 +48,7 @@ struct HotblocksRetain { hotblocks_url: Url, status_url: Url, datasets_url: Url, - datasets: Vec, + datasets: DatasetsConfig, retain_delay: Duration, datasets_update_interval: Duration, name_to_id: HashMap, @@ -60,7 +60,7 @@ impl HotblocksRetain { hotblocks_url: Url, status_url: Url, datasets_url: Url, - datasets: Vec, + datasets: DatasetsConfig, retain_delay: Duration, datasets_update_interval: Duration, ) -> Self { @@ -124,18 +124,14 @@ impl HotblocksRetain { .map(|dataset| (dataset.id.as_str(), dataset.height)) .collect::>(); - for dataset in &self.datasets { - let 
dataset_name = dataset.name.as_str(); - let dataset_id = if let Some(id) = &dataset.id { - id.as_str() + for (dataset, props) in &self.datasets { + let dataset_id = if let Some(id) = props.as_ref().and_then(|p| p.id.as_deref()) { + id } else { - match self.name_to_id.get(dataset_name) { + match self.name_to_id.get(dataset) { Some(id) => id.as_str(), None => { - tracing::warn!( - dataset = dataset_name, - "dataset not found in manifest, skipping" - ); + tracing::warn!(dataset, "dataset not found in manifest, skipping"); continue; } } @@ -146,33 +142,24 @@ impl HotblocksRetain { match hotblocks::set_retention( &self.client, &self.hotblocks_url, - dataset_name, + dataset, *height, ) .await { Ok(()) => { - tracing::info!( - dataset = dataset_name, - height, - "applied retention policy" - ); + tracing::info!(dataset, height, "applied retention policy"); } Err(err) => { - tracing::warn!( - dataset = dataset_name, - height, - error = ?err, - "failed to apply retention" - ); + tracing::warn!(dataset, height, error = ?err, "failed to apply retention"); } } } Some(None) => { - tracing::info!(dataset = dataset_name, "dataset has no reported height yet"); + tracing::info!(dataset, "dataset has no reported height yet"); } None => { - tracing::warn!(dataset = dataset_name, "dataset not found in status"); + tracing::warn!(dataset, "dataset not found in status"); } } } diff --git a/crates/hotblocks-retain/src/types.rs b/crates/hotblocks-retain/src/types.rs index 35a3b88..81ae409 100644 --- a/crates/hotblocks-retain/src/types.rs +++ b/crates/hotblocks-retain/src/types.rs @@ -1,15 +1,11 @@ use serde::Deserialize; +use std::collections::HashMap; pub type DatasetId = String; // s3:// #[derive(Debug, Clone, Deserialize)] -pub struct DatasetConfig { - pub name: String, - #[serde(default)] +pub struct DatasetProps { pub id: Option, } -#[derive(Debug, Clone, Deserialize)] -pub struct DatasetsConfig { - pub datasets: Vec, -} +pub type DatasetsConfig = HashMap>; From 
283d393fe5efb07c90e9fbbee876edea3be892e1 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Wed, 1 Apr 2026 18:22:08 +0300 Subject: [PATCH 07/10] respect effective_from from scheduling status part of NET-68 --- crates/hotblocks-retain/src/cli.rs | 4 ---- crates/hotblocks-retain/src/main.rs | 32 ++++++++++++++++++--------- crates/hotblocks-retain/src/status.rs | 1 + 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/crates/hotblocks-retain/src/cli.rs b/crates/hotblocks-retain/src/cli.rs index b458344..5af2a32 100644 --- a/crates/hotblocks-retain/src/cli.rs +++ b/crates/hotblocks-retain/src/cli.rs @@ -25,8 +25,4 @@ pub struct Cli { /// Interval in seconds between refreshing the datasets list #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))] pub datasets_update_interval_secs: u64, - - /// Delay in seconds between fetching dataset heights and applying retention - #[arg(long, default_value = "300", value_parser = clap::value_parser!(u64).range(1..))] - pub retain_delay_secs: u64, } diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index 9cba0b5..5235339 100644 --- a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -8,7 +8,7 @@ use anyhow::Context; use clap::Parser; use cli::Cli; use std::collections::HashMap; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::time::Instant; use types::{DatasetId, DatasetsConfig}; use url::Url; @@ -34,7 +34,6 @@ fn main() -> anyhow::Result<()> { args.status_url, args.datasets_url, datasets, - Duration::from_secs(args.retain_delay_secs), Duration::from_secs(args.datasets_update_interval_secs), ) .run(), @@ -49,10 +48,10 @@ struct HotblocksRetain { status_url: Url, datasets_url: Url, datasets: DatasetsConfig, - retain_delay: Duration, datasets_update_interval: Duration, name_to_id: HashMap, last_datasets_refresh: Instant, + last_effective_from: Option, } impl HotblocksRetain { @@ -61,7 
+60,6 @@ impl HotblocksRetain { status_url: Url, datasets_url: Url, datasets: DatasetsConfig, - retain_delay: Duration, datasets_update_interval: Duration, ) -> Self { Self { @@ -70,10 +68,10 @@ impl HotblocksRetain { status_url, datasets_url, datasets, - retain_delay, datasets_update_interval, name_to_id: HashMap::new(), last_datasets_refresh: Instant::now() - datasets_update_interval, + last_effective_from: None, } } @@ -90,11 +88,25 @@ impl HotblocksRetain { } }; - tracing::info!( - delay_secs = self.retain_delay.as_secs(), - "fetched status, waiting before applying retention" - ); - tokio::time::sleep(self.retain_delay).await; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + if now < status.effective_from { + if self.last_effective_from == Some(status.effective_from) { + tracing::info!("effective_from unchanged, re-checking in 5 minutes"); + tokio::time::sleep(Duration::from_secs(300)).await; + continue; + } + + self.last_effective_from = Some(status.effective_from); + let wait_secs = status.effective_from - now; + tracing::info!(wait_secs, effective_from = status.effective_from, "waiting for effective time"); + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + } else { + self.last_effective_from = Some(status.effective_from); + } self.apply_retention(&status).await; } diff --git a/crates/hotblocks-retain/src/status.rs b/crates/hotblocks-retain/src/status.rs index 06b4f70..ecab351 100644 --- a/crates/hotblocks-retain/src/status.rs +++ b/crates/hotblocks-retain/src/status.rs @@ -5,6 +5,7 @@ use sqd_primitives::BlockNumber; #[derive(Debug, Deserialize)] pub struct SchedulingStatus { pub datasets: Vec, + pub effective_from: u64, } #[derive(Debug, Deserialize)] From 4a4880755dcd12ef52985622e0d894f45bf49dff Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Wed, 1 Apr 2026 18:51:06 +0300 Subject: [PATCH 08/10] optimize how often retention is applied part of NET-68 --- crates/hotblocks-retain/src/main.rs | 16 
+++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index 5235339..1ad4d95 100644 --- a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -88,27 +88,25 @@ impl HotblocksRetain { } }; + if self.last_effective_from == Some(status.effective_from) { + tracing::info!("effective_from unchanged, re-checking in 5 minutes"); + tokio::time::sleep(Duration::from_secs(300)).await; + continue; + } + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); if now < status.effective_from { - if self.last_effective_from == Some(status.effective_from) { - tracing::info!("effective_from unchanged, re-checking in 5 minutes"); - tokio::time::sleep(Duration::from_secs(300)).await; - continue; - } - - self.last_effective_from = Some(status.effective_from); let wait_secs = status.effective_from - now; tracing::info!(wait_secs, effective_from = status.effective_from, "waiting for effective time"); tokio::time::sleep(Duration::from_secs(wait_secs)).await; - } else { - self.last_effective_from = Some(status.effective_from); } self.apply_retention(&status).await; + self.last_effective_from = Some(status.effective_from); } } From 11008206b5ee5b03095a8138964e99365c15665e Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Wed, 1 Apr 2026 19:18:34 +0300 Subject: [PATCH 09/10] do not set last_effective_from if any retention apply failed --- crates/hotblocks-retain/src/main.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index 1ad4d95..6a9cab0 100644 --- a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -105,8 +105,11 @@ impl HotblocksRetain { tokio::time::sleep(Duration::from_secs(wait_secs)).await; } - self.apply_retention(&status).await; - self.last_effective_from = Some(status.effective_from); + if 
self.apply_retention(&status).await { + self.last_effective_from = Some(status.effective_from); + } else { + tokio::time::sleep(Duration::from_secs(30)).await; + } } } @@ -127,13 +130,15 @@ impl HotblocksRetain { } } - async fn apply_retention(&self, status: &status::SchedulingStatus) { + async fn apply_retention(&self, status: &status::SchedulingStatus) -> bool { let statuses = status .datasets .iter() .map(|dataset| (dataset.id.as_str(), dataset.height)) .collect::>(); + let mut all_success = true; + for (dataset, props) in &self.datasets { let dataset_id = if let Some(id) = props.as_ref().and_then(|p| p.id.as_deref()) { id @@ -161,6 +166,7 @@ impl HotblocksRetain { tracing::info!(dataset, height, "applied retention policy"); } Err(err) => { + all_success = false; tracing::warn!(dataset, height, error = ?err, "failed to apply retention"); } } @@ -173,6 +179,8 @@ impl HotblocksRetain { } } } + + all_success } } From 0cdb6fa1e553b13091f91538f3516554f89f39f6 Mon Sep 17 00:00:00 2001 From: tmcgroul Date: Tue, 7 Apr 2026 21:07:55 +0300 Subject: [PATCH 10/10] allow to configure retain delay for hotblocks-retain part of NET-68 --- crates/hotblocks-retain/src/cli.rs | 4 ++++ crates/hotblocks-retain/src/main.rs | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/hotblocks-retain/src/cli.rs b/crates/hotblocks-retain/src/cli.rs index 5af2a32..1da7eca 100644 --- a/crates/hotblocks-retain/src/cli.rs +++ b/crates/hotblocks-retain/src/cli.rs @@ -25,4 +25,8 @@ pub struct Cli { /// Interval in seconds between refreshing the datasets list #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))] pub datasets_update_interval_secs: u64, + + /// Additional delay in seconds after effective_from before applying retention + #[arg(long, default_value = "0")] + pub retain_delay_secs: u64, } diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs index 6a9cab0..4f18180 100644 --- 
a/crates/hotblocks-retain/src/main.rs +++ b/crates/hotblocks-retain/src/main.rs @@ -35,6 +35,7 @@ fn main() -> anyhow::Result<()> { args.datasets_url, datasets, Duration::from_secs(args.datasets_update_interval_secs), + Duration::from_secs(args.retain_delay_secs), ) .run(), )?; @@ -49,6 +50,7 @@ struct HotblocksRetain { datasets_url: Url, datasets: DatasetsConfig, datasets_update_interval: Duration, + retain_delay: Duration, name_to_id: HashMap, last_datasets_refresh: Instant, last_effective_from: Option, @@ -61,6 +63,7 @@ impl HotblocksRetain { datasets_url: Url, datasets: DatasetsConfig, datasets_update_interval: Duration, + retain_delay: Duration, ) -> Self { Self { client: reqwest::Client::new(), @@ -69,6 +72,7 @@ impl HotblocksRetain { datasets_url, datasets, datasets_update_interval, + retain_delay, name_to_id: HashMap::new(), last_datasets_refresh: Instant::now() - datasets_update_interval, last_effective_from: None, @@ -99,9 +103,15 @@ impl HotblocksRetain { .unwrap() .as_secs(); - if now < status.effective_from { - let wait_secs = status.effective_from - now; - tracing::info!(wait_secs, effective_from = status.effective_from, "waiting for effective time"); + let apply_at = status.effective_from + self.retain_delay.as_secs(); + if now < apply_at { + let wait_secs = apply_at - now; + tracing::info!( + wait_secs, + effective_from = status.effective_from, + retain_delay_secs = self.retain_delay.as_secs(), + "waiting for effective time + retain delay" + ); tokio::time::sleep(Duration::from_secs(wait_secs)).await; }