diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f6d3ca3e..0852dd49 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -221,7 +221,12 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + let blockchain = BlockChain::spawn( + store.clone(), + validator_keys, + aggregator.clone(), + attestation_committee_count, + ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the // AggregatorController — subnet subscriptions are decided once here and diff --git a/crates/blockchain/src/coverage.rs b/crates/blockchain/src/coverage.rs new file mode 100644 index 00000000..c827e1ee --- /dev/null +++ b/crates/blockchain/src/coverage.rs @@ -0,0 +1,321 @@ +//! Per-slot attestation aggregate coverage computation. +//! +//! Mirrors the producer side of [zeam#876](https://github.com/blockblaz/zeam/pull/876) +//! on top of the metrics registered by leanSpec PR #735. +//! +//! All `Coverage` instances are bound to a fixed `(validator_count, +//! committee_count)` pair from genesis state; ethlambda's validator set +//! is immutable across slots, so no resize handling is required. + +use ethlambda_storage::Store; +use ethlambda_types::{ + attestation::{AggregationBits, validator_indices}, + block::AggregatedSignatureProof, + primitives::HashTreeRoot, +}; + +use crate::metrics; + +/// Per-validator and per-subnet presence bitsets for one coverage section. +#[derive(Debug, Clone)] +pub struct Coverage { + seen: Vec, + has_subnet: Vec, +} + +impl Coverage { + pub fn new(validator_count: usize, committee_count: usize) -> Self { + Self { + seen: vec![false; validator_count], + has_subnet: vec![false; committee_count], + } + } + + /// Subnet for validator `vid` matches `crates/net/p2p/src/lib.rs:241` + /// (`vid % committee_count`). + pub fn add_bits(&mut self, bits: &AggregationBits) { + let committee_count = self.has_subnet.len(); + if committee_count == 0 { + return; + } + for vid in validator_indices(bits) { + let vid = vid as usize; + if vid < self.seen.len() { + self.seen[vid] = true; + self.has_subnet[vid % committee_count] = true; + } + } + } + + /// Convenience: merge all `proofs` for one entry. + pub fn add_proofs(&mut self, proofs: &[AggregatedSignatureProof]) { + for proof in proofs { + self.add_bits(&proof.participants); + } + } + + pub fn merge_from(&mut self, other: &Self) { + for (dst, &src) in self.seen.iter_mut().zip(other.seen.iter()) { + *dst |= src; + } + for (dst, &src) in self.has_subnet.iter_mut().zip(other.has_subnet.iter()) { + *dst |= src; + } + } + + pub fn count_seen(&self) -> usize { + self.seen.iter().filter(|&&b| b).count() + } + + pub fn count_subnets(&self) -> usize { + self.has_subnet.iter().filter(|&&b| b).count() + } + + pub fn seen(&self) -> &[bool] { + &self.seen + } + + /// Mark validator `vid` (and its derived subnet) as covered. + pub fn mark(&mut self, vid: usize, subnet: usize) { + if vid < self.seen.len() { + self.seen[vid] = true; + } + if subnet < self.has_subnet.len() { + self.has_subnet[subnet] = true; + } + } +} + +/// Symmetric-difference counts: `(a_only, b_only)` validators. +pub fn diff_counts(a: &Coverage, b: &Coverage) -> (usize, usize) { + let len = a.seen.len().min(b.seen.len()); + let mut a_only = 0; + let mut b_only = 0; + for i in 0..len { + match (a.seen[i], b.seen[i]) { + (true, false) => a_only += 1, + (false, true) => b_only += 1, + _ => {} + } + } + (a_only, b_only) +} + +/// Emit `validators{section, subnet="combined"}` + `subnets{section}` for one section. +/// +/// Per-subnet (`subnet="subnet_N"`) series intentionally stay at zero until a +/// future PR wires per-subnet emission; this matches zeam's current emission +/// pattern (one series per section). +pub fn record_section(section: &str, coverage: &Coverage) { + metrics::set_attestation_aggregate_coverage_validators( + section, + "combined", + coverage.count_seen() as i64, + ); + metrics::set_attestation_aggregate_coverage_subnets(section, coverage.count_subnets() as i64); +} + +/// Emit `diff_validators{direction}` for both directions. +pub fn record_diff(block_only: usize, timely_only: usize) { + metrics::set_attestation_aggregate_coverage_diff_validators("block_only", block_only as i64); + metrics::set_attestation_aggregate_coverage_diff_validators("timely_only", timely_only as i64); +} + +/// Emit the post-block-merge coverage report for `reporting_slot` (the slot +/// that just finished). Reads pre-merge / late / block snapshots from the +/// Store, computes `combined` as their union, and records all 5 metrics. +pub fn emit_post_block_report(store: &Store, committee_count: u64, reporting_slot: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + + // `timely` — pre-merge snapshot of new_payloads (i.e., "prev_new" in zeam). + let mut timely = Coverage::new(validator_count, cc); + if let Some(snap) = store.pre_merge_new_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + timely.add_bits(bits); + } + } + + // `late` — current new_payloads that match the reporting slot + // (arrived after the last merge). + let mut late = Coverage::new(validator_count, cc); + for (data, proofs) in store.new_aggregated_payloads().values() { + if data.slot == reporting_slot { + late.add_proofs(proofs); + } + } + + // `block` — participant bits from the most-recently-imported block, + // if and only if its slot matches. + let mut block = Coverage::new(validator_count, cc); + if let Some(snap) = store.last_block_coverage() + && snap.slot == reporting_slot + { + for bits in &snap.participant_bits { + block.add_bits(bits); + } + } + + // `combined` — union of all three sources. + let mut combined = Coverage::new(validator_count, cc); + combined.merge_from(&timely); + combined.merge_from(&late); + combined.merge_from(&block); + + record_section("timely", &timely); + record_section("late", &late); + record_section("block", &block); + record_section("combined", &combined); + + let (block_only, timely_only) = diff_counts(&block, &timely); + record_diff(block_only, timely_only); +} + +/// Emit `agg_start_new` coverage. Called right before fork-choice aggregation +/// runs (interval 2). +pub fn emit_agg_start_new(store: &Store, committee_count: u64) { + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let mut cov = Coverage::new(validator_count, committee_count as usize); + for (_, proofs) in store.new_aggregated_payloads().values() { + cov.add_proofs(proofs); + } + record_section("agg_start_new", &cov); +} + +/// Emit `proposal_payloads`, `proposal_gossip`, `proposal_combined` for a +/// block we are about to publish. We classify validators set in the final +/// block as either covered by an existing known-payload proof for that +/// AttestationData (`payloads`) or as gossip-only (`gossip`). +pub fn emit_proposal_coverage<'a, I>(store: &Store, committee_count: u64, selected: I) +where + I: IntoIterator, +{ + let validator_count = store.head_state().validators.len(); + if validator_count == 0 || committee_count == 0 { + return; + } + let cc = committee_count as usize; + + let mut combined = Coverage::new(validator_count, cc); + let mut payloads = Coverage::new(validator_count, cc); + let mut gossip = Coverage::new(validator_count, cc); + + // For each AttestationData in the final block, OR together the known + // payload proofs for that data — those validators are payload-covered. + let known = store.known_aggregated_payloads(); + let mut payload_seen = vec![false; validator_count]; + for att in selected { + combined.add_bits(&att.aggregation_bits); + let data_root = att.data.hash_tree_root(); + if let Some((_, proofs)) = known.get(&data_root) { + for proof in proofs { + for vid in validator_indices(&proof.participants) { + let vid = vid as usize; + if vid < payload_seen.len() { + payload_seen[vid] = true; + } + } + } + } + } + + for (vid, &is_final) in combined.seen().iter().enumerate() { + if !is_final { + continue; + } + let subnet = vid % cc; + if payload_seen[vid] { + payloads.mark(vid, subnet); + } else { + gossip.mark(vid, subnet); + } + } + + record_section("proposal_payloads", &payloads); + record_section("proposal_gossip", &gossip); + record_section("proposal_combined", &combined); +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::attestation::AggregationBits; + + fn make_bits(len: usize, indices: &[usize]) -> AggregationBits { + let mut bits = AggregationBits::with_length(len).unwrap(); + for &i in indices { + bits.set(i, true).unwrap(); + } + bits + } + + #[test] + fn add_bits_marks_validators_and_subnets() { + // 8 validators, 4 subnets → vid 1 → subnet 1, vid 5 → subnet 1, vid 6 → subnet 2. + let mut cov = Coverage::new(8, 4); + cov.add_bits(&make_bits(8, &[1, 5, 6])); + + assert!(!cov.seen()[0]); + assert!(cov.seen()[1]); + assert!(cov.seen()[5]); + assert!(cov.seen()[6]); + assert_eq!(cov.count_seen(), 3); + assert_eq!(cov.count_subnets(), 2); + } + + #[test] + fn merge_from_is_set_union() { + let mut a = Coverage::new(8, 4); + a.add_bits(&make_bits(8, &[0, 1])); + let mut b = Coverage::new(8, 4); + b.add_bits(&make_bits(8, &[1, 2])); + + a.merge_from(&b); + assert_eq!(a.count_seen(), 3); + assert!(a.seen()[0] && a.seen()[1] && a.seen()[2]); + } + + #[test] + fn diff_counts_is_symmetric_difference() { + let mut block = Coverage::new(8, 4); + block.add_bits(&make_bits(8, &[0, 1, 2])); + let mut timely = Coverage::new(8, 4); + timely.add_bits(&make_bits(8, &[1, 2, 3])); + + let (block_only, timely_only) = diff_counts(&block, &timely); + assert_eq!(block_only, 1); + assert_eq!(timely_only, 1); + } + + #[test] + fn empty_coverage_counts_zero() { + let cov = Coverage::new(8, 4); + assert_eq!(cov.count_seen(), 0); + assert_eq!(cov.count_subnets(), 0); + } + + #[test] + fn zero_committee_count_is_inert() { + let mut cov = Coverage::new(8, 0); + cov.add_bits(&make_bits(8, &[0, 1, 2])); + assert_eq!(cov.count_seen(), 0); + assert_eq!(cov.count_subnets(), 0); + } + + #[test] + fn add_bits_ignores_out_of_range_indices() { + let mut cov = Coverage::new(4, 2); + cov.add_bits(&make_bits(8, &[0, 5])); + assert!(cov.seen()[0]); + assert_eq!(cov.count_seen(), 1); + assert_eq!(cov.count_subnets(), 1); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index f07a7a7e..7f9a354a 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -27,6 +27,7 @@ use tracing::{error, info, trace, warn}; use crate::store::StoreError; pub mod aggregation; +pub mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -60,6 +61,7 @@ impl BlockChain { store: Store, validator_keys: HashMap, aggregator: AggregatorController, + attestation_committee_count: u64, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -86,6 +88,7 @@ impl BlockChain { pending_block_parents: HashMap::new(), current_aggregation: None, last_tick_instant: None, + attestation_committee_count, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -139,6 +142,10 @@ pub struct BlockChainServer { /// Last tick instant for measuring interval duration. last_tick_instant: Option, + + /// Number of attestation committees (= subnet count). Used by the + /// attestation aggregate coverage emission. + attestation_committee_count: u64, } impl BlockChainServer { @@ -187,7 +194,18 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); + // Emit the post-block attestation aggregate coverage report for the + // previous slot at the start of each new slot. + if interval == 0 && slot > 0 { + coverage::emit_post_block_report( + &self.store, + self.attestation_committee_count, + slot - 1, + ); + } + if interval == 2 && is_aggregator { + coverage::emit_agg_start_new(&self.store, self.attestation_committee_count); self.start_aggregation_session(slot, ctx).await; } @@ -341,6 +359,12 @@ impl BlockChainServer { return; }; + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); + // Sign the block root with the proposal key let block_root = block.hash_tree_root(); let Ok(proposer_signature) = self diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a9aef7a0..fac3adfc 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -4,6 +4,27 @@ use std::time::Duration; use ethlambda_metrics::*; +// --- Label sets --- + +/// Section labels for attestation aggregate coverage gauges. Order matches +/// the names printed in slot/report logs. +/// +/// Slot is the X-axis (time series), not a label dimension. +pub const ATTESTATION_AGGREGATE_COVERAGE_SECTIONS: &[&str] = &[ + "timely", + "late", + "block", + "combined", + "agg_start_new", + "proposal_payloads", + "proposal_gossip", + "proposal_combined", +]; + +/// Validator-coverage delta directions between block payloads and +/// locally-aggregated pre-merge (`timely`) payloads. +pub const ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS: &[&str] = &["block_only", "timely_only"]; + // --- Gauges --- static LEAN_HEAD_SLOT: std::sync::LazyLock = std::sync::LazyLock::new(|| { @@ -104,6 +125,42 @@ static LEAN_TABLE_BYTES: std::sync::LazyLock = std::sync::LazyLock: .unwrap() }); +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_validators", + "Validator coverage in attestation aggregate reports, labeled by section and \ + subnet. subnet=combined is the section total; subnet=subnet_N is per-subnet \ + coverage. Updated each slot (slot is the X-axis).", + &["section", "subnet"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_subnets", + "Number of covered subnets in attestation aggregate reports, labeled by section. \ + Updated each slot (slot is the X-axis).", + &["section"] + ) + .unwrap() + }); + +static LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_attestation_aggregate_coverage_diff_validators", + "Count of validators in the symmetric difference between block-included aggregates \ + and locally-aggregated pre-merge (timely) aggregates for the same slot. \ + direction=block_only: in block but not in local pool. direction=timely_only: in \ + local pool but not in block. Updated each slot (slot is the X-axis).", + &["direction"] + ) + .unwrap() + }); + // --- Counters --- static LEAN_ATTESTATIONS_VALID_TOTAL: std::sync::LazyLock = @@ -409,6 +466,25 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_IS_AGGREGATOR); std::sync::LazyLock::force(&LEAN_ATTESTATION_COMMITTEE_COUNT); std::sync::LazyLock::force(&LEAN_TABLE_BYTES); + // Attestation aggregate coverage (leanMetrics: Fork-Choice Metrics). + // Per upstream leanSpec, seed only the combined-subnet series for each + // section; per-subnet series appear lazily when instrumentation writes them. + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS); + std::sync::LazyLock::force(&LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS); + for §ion in ATTESTATION_AGGREGATE_COVERAGE_SECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, "combined"]) + .set(0); + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(0); + } + for &direction in ATTESTATION_AGGREGATE_COVERAGE_DIFF_DIRECTIONS { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(0); + } // Counters std::sync::LazyLock::force(&LEAN_ATTESTATIONS_VALID_TOTAL); std::sync::LazyLock::force(&LEAN_ATTESTATIONS_INVALID_TOTAL); @@ -609,6 +685,27 @@ pub fn set_attestation_committee_count(count: u64) { LEAN_ATTESTATION_COMMITTEE_COUNT.set(count.try_into().unwrap_or_default()); } +/// Set `lean_attestation_aggregate_coverage_validators{section, subnet}`. +pub fn set_attestation_aggregate_coverage_validators(section: &str, subnet: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_VALIDATORS + .with_label_values(&[section, subnet]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_subnets{section}`. +pub fn set_attestation_aggregate_coverage_subnets(section: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_SUBNETS + .with_label_values(&[section]) + .set(value); +} + +/// Set `lean_attestation_aggregate_coverage_diff_validators{direction}`. +pub fn set_attestation_aggregate_coverage_diff_validators(direction: &str, value: i64) { + LEAN_ATTESTATION_AGGREGATE_COVERAGE_DIFF_VALIDATORS + .with_label_values(&[direction]) + .set(value); +} + /// Observe the depth of a fork choice reorg. pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 935d3f75..d5626b75 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -5,7 +5,7 @@ use ethlambda_state_transition::{ attestation_data_matches_chain, is_proposer, justified_slots_ops, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, Store}; +use ethlambda_storage::{CoverageSnapshot, ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -40,12 +40,35 @@ pub struct PostBlockCheckpoints { /// Accept new aggregated payloads, promoting them to known for fork choice. fn accept_new_attestations(store: &mut Store, log_tree: bool) { + snapshot_pre_merge_new_coverage(store); store.promote_new_aggregated_payloads(); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); metrics::update_latest_known_aggregated_payloads(store.known_aggregated_payloads_count()); update_head(store, log_tree); } +/// Capture the participant bits of every entry in `new_payloads` for the +/// attestation aggregate coverage report. Stored on the Store so the +/// post-block report at the next slot boundary can read it. +fn snapshot_pre_merge_new_coverage(store: &Store) { + let new_payloads = store.new_aggregated_payloads(); + if new_payloads.is_empty() { + return; + } + let mut slot: u64 = 0; + let mut participant_bits: Vec = Vec::new(); + for (data, proofs) in new_payloads.values() { + slot = data.slot; + for proof in proofs { + participant_bits.push(proof.participants.clone()); + } + } + store.save_pre_merge_new_coverage(CoverageSnapshot { + slot, + participant_bits, + }); +} + /// Update the head based on the fork choice rule. /// /// When `log_tree` is true, also computes block weights and logs an ASCII @@ -514,11 +537,13 @@ fn on_block_core( // Store one proof per attestation data in known aggregated payloads. let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); + let mut block_participant_bits: Vec = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); + block_participant_bits.push(att.aggregation_bits.clone()); // Count each participating validator as a valid attestation let count = validator_indices(&att.aggregation_bits).count() as u64; metrics::inc_attestations_valid(count); @@ -526,6 +551,13 @@ fn on_block_core( store.insert_known_aggregated_payloads_batch(known_entries); + // Capture block-included participant bits for the attestation aggregate + // coverage report (observability-only; does not affect fork choice). + store.save_last_block_coverage(CoverageSnapshot { + slot, + participant_bits: block_participant_bits, + }); + // Update forkchoice head based on new block and attestations update_head(store, false); diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 9662a36c..ca067f9b 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,4 +3,4 @@ pub mod backend; mod store; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, GetForkchoiceStoreError, Store}; +pub use store::{CoverageSnapshot, ForkCheckpoints, GetForkchoiceStoreError, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 04b42745..8927b885 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -4,7 +4,10 @@ use std::sync::{Arc, LazyLock, Mutex}; use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::{AttestationData, HashedAttestationData, bits_is_subset, blank_xmss_signature}, + attestation::{ + AggregationBits, AttestationData, HashedAttestationData, bits_is_subset, + blank_xmss_signature, + }, block::{ AggregatedSignatureProof, AttestationSignatures, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, @@ -485,6 +488,17 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } +/// Snapshot of `AggregationBits` for one slot, used by the attestation +/// aggregate coverage report. +/// +/// Holds raw participant bits; the consumer (blockchain crate) constructs +/// `Coverage` at emit time using the current validator and committee counts. +#[derive(Debug, Clone)] +pub struct CoverageSnapshot { + pub slot: u64, + pub participant_bits: Vec, +} + /// Fork choice store backed by a pluggable storage backend. /// /// The Store maintains all state required for fork choice and block processing: @@ -507,6 +521,12 @@ pub struct Store { known_payloads: Arc>, /// In-memory gossip signatures, consumed at interval 2 aggregation. gossip_signatures: Arc>, + /// Snapshot of `new_payloads` participant bits captured right before each + /// promote-to-known. Observability-only. + pre_merge_new_coverage: Arc>>, + /// Snapshot of the most-recently-imported block's aggregated attestation + /// participant bits. Reset on each imported block. Observability-only. + last_block_coverage: Arc>>, } impl Store { @@ -641,6 +661,8 @@ impl Store { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1251,6 +1273,42 @@ impl Store { self.known_payloads.lock().unwrap().len() } + /// Returns a snapshot of new (pending) payloads as (AttestationData, Vec) pairs. + /// + /// Mirrors [`known_aggregated_payloads`]. Used by the attestation aggregate + /// coverage report to compute coverage from `new_payloads` before promote. + pub fn new_aggregated_payloads( + &self, + ) -> HashMap)> { + let buf = self.new_payloads.lock().unwrap(); + buf.data + .iter() + .map(|(root, entry)| (*root, (entry.data.clone(), entry.proofs.clone()))) + .collect() + } + + // ============ Coverage Snapshots ============ + // + // Observability-only state captured by `accept_new_attestations` and + // `on_block_core` in the blockchain crate. Read once per slot by the + // attestation aggregate coverage report. + + pub fn save_pre_merge_new_coverage(&self, snapshot: CoverageSnapshot) { + *self.pre_merge_new_coverage.lock().unwrap() = Some(snapshot); + } + + pub fn pre_merge_new_coverage(&self) -> Option { + self.pre_merge_new_coverage.lock().unwrap().clone() + } + + pub fn save_last_block_coverage(&self, snapshot: CoverageSnapshot) { + *self.last_block_coverage.lock().unwrap() = Some(snapshot); + } + + pub fn last_block_coverage(&self) -> Option { + self.last_block_coverage.lock().unwrap().clone() + } + /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { let gossip = self.gossip_signatures.lock().unwrap(); @@ -1424,6 +1482,8 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } @@ -1437,6 +1497,8 @@ mod tests { gossip_signatures: Arc::new(Mutex::new(GossipSignatureBuffer::new( GOSSIP_SIGNATURE_CAP, ))), + pre_merge_new_coverage: Arc::new(Mutex::new(None)), + last_block_coverage: Arc::new(Mutex::new(None)), } } }