Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 136 additions & 8 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,32 @@ impl PayloadBuffer {
.collect()
}

/// Prune payload entries whose attestation target slot is at or below `finalized_slot`.
///
/// Mirrors leanSpec's `prune_stale_attestation_data`: an entry is stale once its
/// target checkpoint is finalized — it can no longer contribute to fork choice and
/// keeping it around only pollutes `existing_proofs_for_data` lookups, occasionally
/// forcing recursive aggregation when plain XMSS aggregation would suffice.
///
/// Returns the number of data_root entries removed.
fn prune(&mut self, finalized_slot: u64) -> usize {
let before = self.data.len();
let total_proofs = &mut self.total_proofs;
self.data.retain(|_root, entry| {
if entry.data.target.slot > finalized_slot {
true
} else {
*total_proofs -= entry.proofs.len();
false
}
});
let pruned = before - self.data.len();
if pruned > 0 {
self.order.retain(|r| self.data.contains_key(r));
}
pruned
}
Comment thread
MegaRedHand marked this conversation as resolved.

/// Extract per-validator latest attestations from proofs' participation bits.
///
/// Iterates entries in insertion order (via `self.order`) so that, when two
Expand Down Expand Up @@ -400,20 +426,20 @@ impl GossipSignatureBuffer {
///
/// Returns the number of data_root entries pruned.
fn prune(&mut self, finalized_slot: u64) -> usize {
let mut pruned_roots: HashSet<H256> = HashSet::new();
self.data.retain(|root, entry| {
let before = self.data.len();
self.data.retain(|_root, entry| {
if entry.data.slot > finalized_slot {
true
} else {
self.total_signatures -= entry.signatures.len();
pruned_roots.insert(*root);
false
}
});
if !pruned_roots.is_empty() {
self.order.retain(|r| !pruned_roots.contains(r));
let pruned = before - self.data.len();
if pruned > 0 {
self.order.retain(|r| self.data.contains_key(r));
}
pruned_roots.len()
pruned
}

/// Returns a snapshot of all gossip signatures grouped by attestation data.
Expand Down Expand Up @@ -727,11 +753,12 @@ impl Store {
{
let pruned_chain = self.prune_live_chain(finalized.slot);
let pruned_sigs = self.prune_gossip_signatures(finalized.slot);
let pruned_payloads = self.prune_stale_aggregated_payloads(finalized.slot);

if pruned_chain > 0 || pruned_sigs > 0 {
if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 {
info!(
finalized_slot = finalized.slot,
pruned_chain, pruned_sigs, "Pruned finalized data"
pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data"
);
}
}
Expand Down Expand Up @@ -830,6 +857,18 @@ impl Store {
gossip.prune(finalized_slot)
}

/// Prune aggregated payload buffers (new + known) whose target slot is at or below
/// `finalized_slot`.
///
/// Mirrors leanSpec's `prune_stale_attestation_data` for the two aggregated payload
/// pools (gossip signatures are pruned separately by `prune_gossip_signatures`).
/// Returns the total number of data_root entries removed across both buffers.
pub fn prune_stale_aggregated_payloads(&mut self, finalized_slot: u64) -> usize {
let pruned_new = self.new_payloads.lock().unwrap().prune(finalized_slot);
let pruned_known = self.known_payloads.lock().unwrap().prune(finalized_slot);
pruned_new + pruned_known
}

/// Prune old states beyond the retention window.
///
/// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any
Expand Down Expand Up @@ -2075,6 +2114,95 @@ mod tests {
assert_eq!(buf.total_proofs, 2);
}

#[test]
fn payload_buffer_prune_drops_entries_with_finalized_target() {
let mut buf = PayloadBuffer::new(10);
let target_a = H256([0xaa; 32]);
let target_b = H256([0xbb; 32]);
let target_c = H256([0xcc; 32]);

// Three entries at different target slots: 3, 5, 7.
let data_3 = make_att_data_for_target(3, target_a);
let data_5 = make_att_data_for_target(5, target_b);
let data_7 = make_att_data_for_target(7, target_c);
let root_3 = data_3.hash_tree_root();
let root_5 = data_5.hash_tree_root();
let root_7 = data_7.hash_tree_root();

buf.push(
HashedAttestationData::new(data_3),
make_proof_for_validators(&[0]),
);
buf.push(
HashedAttestationData::new(data_5),
make_proof_for_validators(&[1, 2]),
);
buf.push(
HashedAttestationData::new(data_7),
make_proof_for_validators(&[3]),
);
assert_eq!(buf.total_proofs, 3);

// Finalized slot 5 prunes targets 3 and 5 (≤ 5), keeps target 7.
let pruned = buf.prune(5);
assert_eq!(pruned, 2);
assert!(!buf.data.contains_key(&root_3));
assert!(!buf.data.contains_key(&root_5));
assert!(buf.data.contains_key(&root_7));
assert_eq!(buf.total_proofs, 1);
assert_eq!(buf.order.len(), 1);
assert_eq!(buf.order.front(), Some(&root_7));
}

#[test]
fn payload_buffer_prune_noop_when_nothing_stale() {
let mut buf = PayloadBuffer::new(10);
let data = make_att_data_for_target(10, H256([0xaa; 32]));
buf.push(
HashedAttestationData::new(data),
make_proof_for_validators(&[0]),
);

let pruned = buf.prune(5);
assert_eq!(pruned, 0);
assert_eq!(buf.total_proofs, 1);
assert_eq!(buf.order.len(), 1);
}

#[test]
fn store_prune_stale_aggregated_payloads_clears_both_buffers() {
let mut store = Store::test_store();

let stale = make_att_data_for_target(2, H256([0xaa; 32]));
let fresh = make_att_data_for_target(10, H256([0xbb; 32]));

store.insert_new_aggregated_payload(
HashedAttestationData::new(stale.clone()),
make_proof_for_validators(&[0]),
);
store.insert_known_aggregated_payload(
HashedAttestationData::new(stale),
make_proof_for_validators(&[1]),
);
store.insert_new_aggregated_payload(
HashedAttestationData::new(fresh.clone()),
make_proof_for_validators(&[2]),
);
store.insert_known_aggregated_payload(
HashedAttestationData::new(fresh),
make_proof_for_validators(&[3]),
);

assert_eq!(store.new_aggregated_payloads_count(), 2);
assert_eq!(store.known_aggregated_payloads_count(), 2);

// Finalized slot 5: stale (target.slot == 2) is dropped from both buffers.
let pruned = store.prune_stale_aggregated_payloads(5);
assert_eq!(pruned, 2);
assert_eq!(store.new_aggregated_payloads_count(), 1);
assert_eq!(store.known_aggregated_payloads_count(), 1);
}

/// Build an attestation message at `slot` whose target points at `target_root`,
/// distinct from the default zero target so two such datas have different roots.
fn make_att_data_for_target(slot: u64, target_root: H256) -> AttestationData {
Expand Down