Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub enum CheckpointSyncError {
BlockHeaderJustifiedRootMismatch,
#[error("anchor block does not match anchor state")]
AnchorPairingMismatch,
#[error("all checkpoint peers failed")]
AllPeersFailed,
}

/// Build the HTTP client used for checkpoint sync fetches.
Expand Down
113 changes: 73 additions & 40 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052).
/// Base URL(s) of checkpoint-sync peer API servers (e.g., http://peer:5052).
/// When set, skips genesis initialization and fetches the finalized state
/// and block from the peer's `/lean/v0/states/finalized` and
/// and block from each peer's `/lean/v0/states/finalized` and
/// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a
/// URL ending in `/lean/v0/states/finalized` is accepted and the trailing
/// path is stripped.
#[arg(long)]
checkpoint_sync_url: Option<String>,
///
/// Multiple URLs may be supplied for redundancy, either comma-separated
/// (`--checkpoint-sync-url u1,u2`) or by repeating the flag
/// (`--checkpoint-sync-url u1 --checkpoint-sync-url u2`). URLs are tried
/// in order; the first one that succeeds is used and any failures fall
/// over to the next URL. Startup only aborts if every URL fails.
#[arg(long, value_delimiter = ',')]
checkpoint_sync_url: Option<Vec<String>>,
/// Whether this node acts as a committee aggregator.
///
/// Seeds the initial value of the live aggregator flag shared by the
Expand Down Expand Up @@ -207,7 +213,7 @@ async fn main() -> eyre::Result<()> {
let backend = Arc::new(RocksDBBackend::open(&data_dir).expect("Failed to open RocksDB"));

let store = fetch_initial_state(
options.checkpoint_sync_url.as_deref(),
options.checkpoint_sync_url.as_deref().unwrap_or(&[]),
&genesis_config,
backend.clone(),
)
Expand Down Expand Up @@ -558,12 +564,45 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
bytes
}

/// Attempt checkpoint sync against one peer URL.
///
/// Calls `checkpoint_sync::fetch_finalized_anchor` and returns its result,
/// with one exception: an `AnchorPairingMismatch` error is treated as a
/// transient race (the peer likely advanced finalization between the state
/// fetch and the block fetch) and the fetch is repeated after a short pause.
///
/// # Arguments
///
/// * `url` - Base URL of the peer's API server
/// * `genesis_time` - Genesis time forwarded to the anchor fetch
/// * `validators` - Validator set forwarded to the anchor fetch
///
/// # Errors
///
/// Any error other than `AnchorPairingMismatch` is returned immediately
/// without retrying. `AnchorPairingMismatch` is returned only once the
/// attempt budget (3 attempts, 1 second apart) is exhausted.
async fn try_checkpoint_url(
    url: &str,
    genesis_time: u64,
    validators: &[ethlambda_types::state::Validator],
) -> Result<(State, ethlambda_types::block::SignedBlock), checkpoint_sync::CheckpointSyncError> {
    // Total number of fetches allowed for this URL (first try included).
    const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
    // Pause between consecutive fetches after a pairing mismatch.
    const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let mut attempt: u32 = 1;
    loop {
        let outcome = checkpoint_sync::fetch_finalized_anchor(url, genesis_time, validators).await;

        // Only a state/block pairing mismatch is worth retrying, and only
        // while attempts remain; success and every other error are final.
        let pairing_race = matches!(
            &outcome,
            Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
        );
        if !pairing_race || attempt >= MAX_ANCHOR_FETCH_ATTEMPTS {
            return outcome;
        }

        // NOTE(review): `url` is logged verbatim here; if checkpoint URLs can
        // carry credentials or tokens, confirm whether it should be redacted.
        warn!(
            %url,
            attempt,
            max = MAX_ANCHOR_FETCH_ATTEMPTS,
            "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
        );
        tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
        attempt += 1;
    }
}

/// Fetch the initial state for the node.
///
/// If `checkpoint_url` is provided, performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block in parallel from a
/// remote peer. Otherwise, creates a genesis state from the local genesis
/// configuration.
/// If `checkpoint_urls` is empty, creates a genesis state from the local
/// genesis configuration. Otherwise performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block from a peer. URLs are
/// tried in order: the first peer that succeeds wins, and failures fall over
/// to the next URL. Startup only aborts if every URL fails.
///
/// Fetching the matching signed block lets the local store serve a valid
/// anchor via the `BlocksByRoot` req-resp protocol; without it, peers
Expand All @@ -572,7 +611,7 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
///
/// # Arguments
///
/// * `checkpoint_url` - Optional base URL to a peer's API server
/// * `checkpoint_urls` - Zero or more base URLs of peer API servers
/// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation)
/// * `backend` - Storage backend for Store creation
///
Expand All @@ -581,49 +620,43 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
/// `Ok(Store)` on success, or `Err(CheckpointSyncError)` if checkpoint sync fails.
/// Genesis path is infallible and always returns `Ok`.
async fn fetch_initial_state(
checkpoint_url: Option<&str>,
checkpoint_urls: &[String],
genesis: &GenesisConfig,
backend: Arc<dyn StorageBackend>,
) -> Result<Store, checkpoint_sync::CheckpointSyncError> {
let validators = genesis.validators();

let Some(checkpoint_url) = checkpoint_url else {
if checkpoint_urls.is_empty() {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
};

// Checkpoint sync path
info!(%checkpoint_url, "Starting checkpoint sync");

// The state and block are fetched in parallel; if the peer advances
// finalization between the two requests the pair won't match. Retry a
// small number of times so this transient race doesn't fail node startup.
const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
// Checkpoint sync path: try URLs in order, fail over to the next on error.
// Log only the count — URLs may carry basic-auth credentials or token query
// parameters; per-URL log lines below redact those before emission.
info!(
url_count = checkpoint_urls.len(),
"Starting checkpoint sync"
);

let mut attempt = 1;
let mut iter = checkpoint_urls.iter().peekable();
let (state, signed_block) = loop {
match checkpoint_sync::fetch_finalized_anchor(
checkpoint_url,
genesis.genesis_time,
&validators,
)
.await
{
Ok(pair) => break pair,
Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
{
warn!(
attempt,
max = MAX_ANCHOR_FETCH_ATTEMPTS,
"Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
);
tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
attempt += 1;
let Some(url) = iter.next() else {
return Err(checkpoint_sync::CheckpointSyncError::AllPeersFailed);
};
match try_checkpoint_url(url, genesis.genesis_time, &validators).await {
Ok(pair) => {
info!(%url, "Checkpoint sync successful with this peer");
break pair;
}
Err(err) => {
if iter.peek().is_some() {
warn!(%url, %err, "Checkpoint sync failed for this peer; trying next URL");
} else {
warn!(%url, %err, "Checkpoint sync failed for this peer; no more URLs to try");
}
}
Err(err) => return Err(err),
}
};

Expand Down