diff --git a/Cargo.lock b/Cargo.lock
index becafa4..cdbd3f7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4917,6 +4917,23 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "sqd-hotblocks-retain"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "clap",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "serde_yaml",
+ "sqd-primitives",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+ "url",
+]
+
 [[package]]
 name = "sqd-polars"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index 3dcb0cc..efa4e4a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ members = [
     "crates/data-source",
     "crates/dataset",
     "crates/hotblocks",
+    "crates/hotblocks-retain",
     "crates/polars",
     "crates/primitives",
     "crates/query",
diff --git a/Dockerfile b/Dockerfile
index 2252b1e..bcf8bc6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,6 +22,16 @@
 COPY --from=hotblocks-builder /app/target/release/sqd-hotblocks .
 
 ENTRYPOINT ["/app/sqd-hotblocks"]
 
+FROM builder AS hotblocks-retain-builder
+RUN cargo build -p sqd-hotblocks-retain --release
+
+
+FROM rust AS hotblocks-retain
+WORKDIR /app
+COPY --from=hotblocks-retain-builder /app/target/release/sqd-hotblocks-retain .
+ENTRYPOINT ["/app/sqd-hotblocks-retain"]
+
+
 FROM builder AS archive-builder
 RUN cargo build -p sqd-archive --release
diff --git a/crates/hotblocks-retain/Cargo.toml b/crates/hotblocks-retain/Cargo.toml
new file mode 100644
index 0000000..4b49b47
--- /dev/null
+++ b/crates/hotblocks-retain/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "sqd-hotblocks-retain"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+anyhow = { workspace = true }
+clap = { workspace = true, features = ["derive", "env"] }
+reqwest = { workspace = true, features = ["gzip", "json", "stream"] }
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+serde_yaml = { workspace = true }
+sqd-primitives = { path = "../primitives" }
+tokio = { workspace = true, features = ["full"] }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
+url = { workspace = true }
diff --git a/crates/hotblocks-retain/src/cli.rs b/crates/hotblocks-retain/src/cli.rs
new file mode 100644
index 0000000..1da7eca
--- /dev/null
+++ b/crates/hotblocks-retain/src/cli.rs
@@ -0,0 +1,32 @@
+use std::path::PathBuf;
+
+use clap::Parser;
+use url::Url;
+
+#[derive(Parser, Debug)]
+#[command(version, about = "Hotblocks retain service", long_about = None)]
+pub struct Cli {
+    /// URL of the Hotblocks service to send dataset information to
+    #[arg(long)]
+    pub hotblocks_url: Url,
+
+    /// URL of the status endpoint to poll for dataset updates
+    #[arg(long)]
+    pub status_url: Url,
+
+    /// Path to the YAML config file listing datasets to track
+    #[arg(long)]
+    pub datasets_config: PathBuf,
+
+    /// URL of the datasets YAML file listing available network datasets
+    #[arg(long)]
+    pub datasets_url: Url,
+
+    /// Interval in seconds between refreshing the datasets list
+    #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))]
+    pub datasets_update_interval_secs: u64,
+
+    /// Additional delay in seconds after effective_from before applying retention
+    #[arg(long, default_value = "0")]
+    pub retain_delay_secs: u64,
+}
diff --git a/crates/hotblocks-retain/src/datasets.rs b/crates/hotblocks-retain/src/datasets.rs
new file mode 100644
index 0000000..df5109c
--- /dev/null
+++ b/crates/hotblocks-retain/src/datasets.rs
@@ -0,0 +1,40 @@
+use crate::types::DatasetId;
+use serde::Deserialize;
+use std::collections::HashMap;
+use url::Url;
+
+#[derive(Deserialize)]
+struct Dataset {
+    name: String,
+    id: DatasetId,
+}
+
+#[derive(Deserialize)]
+struct DatasetsFile {
+    #[serde(rename = "sqd-network-datasets")]
+    sqd_network_datasets: Vec<Dataset>,
+}
+
+/// Downloads the datasets manifest and returns a map from dataset name to dataset ID.
+pub async fn get_name_to_id(
+    client: &reqwest::Client,
+    url: &Url,
+) -> anyhow::Result<HashMap<String, DatasetId>> {
+    let bytes = client
+        .get(url.as_str())
+        .send()
+        .await?
+        .error_for_status()?
+        .bytes()
+        .await?;
+
+    let file: DatasetsFile = serde_yaml::from_slice(&bytes)?;
+
+    let map = file
+        .sqd_network_datasets
+        .into_iter()
+        .map(|d| (d.name, d.id))
+        .collect();
+
+    Ok(map)
+}
diff --git a/crates/hotblocks-retain/src/hotblocks.rs b/crates/hotblocks-retain/src/hotblocks.rs
new file mode 100644
index 0000000..75acbef
--- /dev/null
+++ b/crates/hotblocks-retain/src/hotblocks.rs
@@ -0,0 +1,22 @@
+use reqwest::Client;
+use sqd_primitives::BlockNumber;
+use url::Url;
+
+/// POSTs a `FromBlock` retention policy for `dataset` to the Hotblocks service.
+pub async fn set_retention(
+    client: &Client,
+    base_url: &Url,
+    dataset: &str,
+    from_block: BlockNumber,
+) -> anyhow::Result<()> {
+    let retention_url = base_url.join(&format!("/datasets/{dataset}/retention"))?;
+
+    client
+        .post(retention_url)
+        .json(&serde_json::json!({"FromBlock": {"number": from_block}}))
+        .send()
+        .await?
+        .error_for_status()?;
+
+    Ok(())
+}
diff --git a/crates/hotblocks-retain/src/main.rs b/crates/hotblocks-retain/src/main.rs
new file mode 100644
index 0000000..4f18180
--- /dev/null
+++ b/crates/hotblocks-retain/src/main.rs
@@ -0,0 +1,223 @@
+mod cli;
+mod datasets;
+mod hotblocks;
+mod status;
+mod types;
+
+use anyhow::Context;
+use clap::Parser;
+use cli::Cli;
+use std::collections::HashMap;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::time::Instant;
+use types::{DatasetId, DatasetsConfig};
+use url::Url;
+
+fn main() -> anyhow::Result<()> {
+    let args = Cli::parse();
+
+    let datasets: DatasetsConfig = {
+        let contents = std::fs::read_to_string(&args.datasets_config)
+            .with_context(|| format!("failed to read {}", args.datasets_config.display()))?;
+        serde_yaml::from_str(&contents)
+            .with_context(|| format!("failed to parse {}", args.datasets_config.display()))?
+    };
+
+    init_tracing();
+
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()?
+        .block_on(
+            HotblocksRetain::new(
+                args.hotblocks_url,
+                args.status_url,
+                args.datasets_url,
+                datasets,
+                Duration::from_secs(args.datasets_update_interval_secs),
+                Duration::from_secs(args.retain_delay_secs),
+            )
+            .run(),
+        )?;
+
+    Ok(())
+}
+
+struct HotblocksRetain {
+    client: reqwest::Client,
+    hotblocks_url: Url,
+    status_url: Url,
+    datasets_url: Url,
+    datasets: DatasetsConfig,
+    datasets_update_interval: Duration,
+    retain_delay: Duration,
+    name_to_id: HashMap<String, DatasetId>,
+    last_datasets_refresh: Instant,
+    last_effective_from: Option<u64>,
+}
+
+impl HotblocksRetain {
+    fn new(
+        hotblocks_url: Url,
+        status_url: Url,
+        datasets_url: Url,
+        datasets: DatasetsConfig,
+        datasets_update_interval: Duration,
+        retain_delay: Duration,
+    ) -> Self {
+        Self {
+            client: reqwest::Client::new(),
+            hotblocks_url,
+            status_url,
+            datasets_url,
+            datasets,
+            datasets_update_interval,
+            retain_delay,
+            name_to_id: HashMap::new(),
+            // The first manifest fetch is triggered by the empty `name_to_id`
+            // map (see `maybe_refresh_datasets`); avoid backdating `Instant`
+            // with subtraction, which panics on underflow on short-uptime hosts.
+            last_datasets_refresh: Instant::now(),
+            last_effective_from: None,
+        }
+    }
+
+    async fn run(&mut self) -> anyhow::Result<()> {
+        loop {
+            self.maybe_refresh_datasets().await;
+
+            let status = match status::get_status(&self.client, self.status_url.as_str()).await {
+                Ok(status) => status,
+                Err(err) => {
+                    tracing::warn!(error = ?err, "failed to fetch status");
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                    continue;
+                }
+            };
+
+            if self.last_effective_from == Some(status.effective_from) {
+                tracing::info!("effective_from unchanged, re-checking in 5 minutes");
+                tokio::time::sleep(Duration::from_secs(300)).await;
+                continue;
+            }
+
+            let now = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("system clock is set before the UNIX epoch")
+                .as_secs();
+
+            let apply_at = status.effective_from + self.retain_delay.as_secs();
+            if now < apply_at {
+                let wait_secs = apply_at - now;
+                tracing::info!(
+                    wait_secs,
+                    effective_from = status.effective_from,
+                    retain_delay_secs = self.retain_delay.as_secs(),
+                    "waiting for effective time + retain delay"
+                );
+                tokio::time::sleep(Duration::from_secs(wait_secs)).await;
+            }
+
+            if self.apply_retention(&status).await {
+                self.last_effective_from = Some(status.effective_from);
+            } else {
+                tokio::time::sleep(Duration::from_secs(30)).await;
+            }
+        }
+    }
+
+    async fn maybe_refresh_datasets(&mut self) {
+        // Refresh immediately while we have no manifest entries at all (covers
+        // startup and repeated failures); afterwards throttle to the interval.
+        let manifest_fresh = !self.name_to_id.is_empty()
+            && self.last_datasets_refresh.elapsed() < self.datasets_update_interval;
+        if manifest_fresh {
+            return;
+        }
+
+        match datasets::get_name_to_id(&self.client, &self.datasets_url).await {
+            Ok(map) => {
+                tracing::info!("refreshed datasets manifest");
+                self.name_to_id = map;
+                self.last_datasets_refresh = Instant::now();
+            }
+            Err(err) => {
+                tracing::warn!(error = ?err, "failed to refresh datasets manifest");
+            }
+        }
+    }
+
+    async fn apply_retention(&self, status: &status::SchedulingStatus) -> bool {
+        let statuses = status
+            .datasets
+            .iter()
+            .map(|dataset| (dataset.id.as_str(), dataset.height))
+            .collect::<HashMap<_, _>>();
+
+        let mut all_success = true;
+
+        for (dataset, props) in &self.datasets {
+            let dataset_id = if let Some(id) = props.as_ref().and_then(|p| p.id.as_deref()) {
+                id
+            } else {
+                match self.name_to_id.get(dataset) {
+                    Some(id) => id.as_str(),
+                    None => {
+                        tracing::warn!(dataset, "dataset not found in manifest, skipping");
+                        continue;
+                    }
+                }
+            };
+
+            match statuses.get(dataset_id) {
+                Some(Some(height)) => {
+                    match hotblocks::set_retention(
+                        &self.client,
+                        &self.hotblocks_url,
+                        dataset,
+                        *height,
+                    )
+                    .await
+                    {
+                        Ok(()) => {
+                            tracing::info!(dataset, height, "applied retention policy");
+                        }
+                        Err(err) => {
+                            all_success = false;
+                            tracing::warn!(dataset, height, error = ?err, "failed to apply retention");
+                        }
+                    }
+                }
+                Some(None) => {
+                    tracing::info!(dataset, "dataset has no reported height yet");
+                }
+                None => {
+                    tracing::warn!(dataset, "dataset not found in status");
+                }
+            }
+        }
+
+        all_success
+    }
+}
+
+fn init_tracing() {
+    use std::io::IsTerminal;
+
+    let env_filter = tracing_subscriber::EnvFilter::builder().parse_lossy(
+        std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string()),
+    );
+
+    if std::io::stdout().is_terminal() {
+        tracing_subscriber::fmt()
+            .with_env_filter(env_filter)
+            .compact()
+            .init();
+    } else {
+        tracing_subscriber::fmt()
+            .with_env_filter(env_filter)
+            .json()
+            .with_current_span(false)
+            .init();
+    }
+}
diff --git a/crates/hotblocks-retain/src/status.rs b/crates/hotblocks-retain/src/status.rs
new file mode 100644
index 0000000..ecab351
--- /dev/null
+++ b/crates/hotblocks-retain/src/status.rs
@@ -0,0 +1,27 @@
+use crate::types::DatasetId;
+use serde::Deserialize;
+use sqd_primitives::BlockNumber;
+
+#[derive(Debug, Deserialize)]
+pub struct SchedulingStatus {
+    pub datasets: Vec<DatasetStatus>,
+    pub effective_from: u64,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct DatasetStatus {
+    pub id: DatasetId,
+    pub height: Option<BlockNumber>,
+}
+
+/// Fetches and deserializes the scheduler status from `url`.
+pub async fn get_status(client: &reqwest::Client, url: &str) -> anyhow::Result<SchedulingStatus> {
+    let status = client
+        .get(url)
+        .send()
+        .await?
+        .error_for_status()?
+        .json::<SchedulingStatus>()
+        .await?;
+    Ok(status)
+}
diff --git a/crates/hotblocks-retain/src/types.rs b/crates/hotblocks-retain/src/types.rs
new file mode 100644
index 0000000..81ae409
--- /dev/null
+++ b/crates/hotblocks-retain/src/types.rs
@@ -0,0 +1,11 @@
+use serde::Deserialize;
+use std::collections::HashMap;
+
+pub type DatasetId = String; // s3://
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct DatasetProps {
+    pub id: Option<DatasetId>,
+}
+
+pub type DatasetsConfig = HashMap<String, Option<DatasetProps>>;