From 3370204144772b73be83120c729c7b93fd9fcdd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 21 May 2026 11:59:23 -0300 Subject: [PATCH 1/3] feat(storage): prune stale aggregated payloads on finalization The new and known aggregated payload buffers were only bounded by FIFO eviction at 64/512 entries: post-finalization entries lingered until naturally pushed out, polluting `existing_proofs_for_data` lookups and occasionally forcing recursive XMSS aggregation when plain aggregation would suffice. Mirrors leanSpec's `prune_stale_attestation_data`: drop entries whose `target.slot <= finalized.slot` from both buffers when finalization advances, alongside the existing gossip-signature prune. --- crates/storage/src/store.rs | 132 +++++++++++++++++++++++++++++++++++- 1 file changed, 130 insertions(+), 2 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b7e4e6b2..ba90f7a1 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -261,6 +261,32 @@ impl PayloadBuffer { .collect() } + /// Prune payload entries whose attestation target slot is at or below `finalized_slot`. + /// + /// Mirrors leanSpec's `prune_stale_attestation_data`: an entry is stale once its + /// target checkpoint is finalized — it can no longer contribute to fork choice and + /// keeping it around only pollutes `existing_proofs_for_data` lookups, occasionally + /// forcing recursive aggregation when plain XMSS aggregation would suffice. + /// + /// Returns the number of data_root entries removed. + fn prune(&mut self, finalized_slot: u64) -> usize { + let mut pruned_roots: HashSet = HashSet::new(); + let total_proofs = &mut self.total_proofs; + self.data.retain(|root, entry| { + if entry.data.target.slot > finalized_slot { + true + } else { + *total_proofs -= entry.proofs.len(); + pruned_roots.insert(*root); + false + } + }); + if !pruned_roots.is_empty() { + self.order.retain(|r| !pruned_roots.contains(r)); + } + pruned_roots.len() + } + /// Extract per-validator latest attestations from proofs' participation bits. /// /// Iterates entries in insertion order (via `self.order`) so that, when two @@ -727,11 +753,12 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); let pruned_sigs = self.prune_gossip_signatures(finalized.slot); + let pruned_payloads = self.prune_stale_aggregated_payloads(finalized.slot); - if pruned_chain > 0 || pruned_sigs > 0 { + if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, "Pruned finalized data" + pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data" ); } } @@ -830,6 +857,18 @@ impl Store { gossip.prune(finalized_slot) } + /// Prune aggregated payload buffers (new + known) whose target slot is at or below + /// `finalized_slot`. + /// + /// Mirrors leanSpec's `prune_stale_attestation_data` for the two aggregated payload + /// pools (gossip signatures are pruned separately by `prune_gossip_signatures`). + /// Returns the total number of data_root entries removed across both buffers. + pub fn prune_stale_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { + let pruned_new = self.new_payloads.lock().unwrap().prune(finalized_slot); + let pruned_known = self.known_payloads.lock().unwrap().prune(finalized_slot); + pruned_new + pruned_known + } + /// Prune old states beyond the retention window. /// /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any @@ -2075,6 +2114,95 @@ mod tests { assert_eq!(buf.total_proofs, 2); } + #[test] + fn payload_buffer_prune_drops_entries_with_finalized_target() { + let mut buf = PayloadBuffer::new(10); + let target_a = H256([0xaa; 32]); + let target_b = H256([0xbb; 32]); + let target_c = H256([0xcc; 32]); + + // Three entries at different target slots: 3, 5, 7. + let data_3 = make_att_data_for_target(3, target_a); + let data_5 = make_att_data_for_target(5, target_b); + let data_7 = make_att_data_for_target(7, target_c); + let root_3 = data_3.hash_tree_root(); + let root_5 = data_5.hash_tree_root(); + let root_7 = data_7.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_3), + make_proof_for_validators(&[0]), + ); + buf.push( + HashedAttestationData::new(data_5), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data_7), + make_proof_for_validators(&[3]), + ); + assert_eq!(buf.total_proofs, 3); + + // Finalized slot 5 prunes targets 3 and 5 (≤ 5), keeps target 7. + let pruned = buf.prune(5); + assert_eq!(pruned, 2); + assert!(!buf.data.contains_key(&root_3)); + assert!(!buf.data.contains_key(&root_5)); + assert!(buf.data.contains_key(&root_7)); + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.order.len(), 1); + assert_eq!(buf.order.front(), Some(&root_7)); + } + + #[test] + fn payload_buffer_prune_noop_when_nothing_stale() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data_for_target(10, H256([0xaa; 32])); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[0]), + ); + + let pruned = buf.prune(5); + assert_eq!(pruned, 0); + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.order.len(), 1); + } + + #[test] + fn store_prune_stale_aggregated_payloads_clears_both_buffers() { + let mut store = Store::test_store(); + + let stale = make_att_data_for_target(2, H256([0xaa; 32])); + let fresh = make_att_data_for_target(10, H256([0xbb; 32])); + + store.insert_new_aggregated_payload( + HashedAttestationData::new(stale.clone()), + make_proof_for_validators(&[0]), + ); + store.insert_known_aggregated_payload( + HashedAttestationData::new(stale), + make_proof_for_validators(&[1]), + ); + store.insert_new_aggregated_payload( + HashedAttestationData::new(fresh.clone()), + make_proof_for_validators(&[2]), + ); + store.insert_known_aggregated_payload( + HashedAttestationData::new(fresh), + make_proof_for_validators(&[3]), + ); + + assert_eq!(store.new_aggregated_payloads_count(), 2); + assert_eq!(store.known_aggregated_payloads_count(), 2); + + // Finalized slot 5: stale (target.slot == 2) is dropped from both buffers. + let pruned = store.prune_stale_aggregated_payloads(5); + assert_eq!(pruned, 2); + assert_eq!(store.new_aggregated_payloads_count(), 1); + assert_eq!(store.known_aggregated_payloads_count(), 1); + } + /// Build an attestation message at `slot` whose target points at `target_root`, /// distinct from the default zero target so two such datas have different roots. fn make_att_data_for_target(slot: u64, target_root: H256) -> AttestationData { From dc86eed7894e1ef0e6bf8ad6c335c090326479ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 21 May 2026 13:00:00 -0300 Subject: [PATCH 2/3] refactor(storage): drop intermediate HashSet in PayloadBuffer::prune Replace the pruned_roots HashSet with a size-delta counter; filter self.order via self.data.contains_key after retain. Same semantics, one fewer allocation per prune, and matches the disjoint-field-borrow pattern used elsewhere in the file. --- crates/storage/src/store.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index ba90f7a1..d853f1d5 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -270,21 +270,21 @@ impl PayloadBuffer { /// /// Returns the number of data_root entries removed. fn prune(&mut self, finalized_slot: u64) -> usize { - let mut pruned_roots: HashSet = HashSet::new(); + let before = self.data.len(); let total_proofs = &mut self.total_proofs; - self.data.retain(|root, entry| { + self.data.retain(|_root, entry| { if entry.data.target.slot > finalized_slot { true } else { *total_proofs -= entry.proofs.len(); - pruned_roots.insert(*root); false } }); - if !pruned_roots.is_empty() { - self.order.retain(|r| !pruned_roots.contains(r)); + let pruned = before - self.data.len(); + if pruned > 0 { + self.order.retain(|r| self.data.contains_key(r)); } - pruned_roots.len() + pruned } /// Extract per-validator latest attestations from proofs' participation bits. From 1439aac54687cf123aa49eebc5c5dfed680cb583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 21 May 2026 13:15:11 -0300 Subject: [PATCH 3/3] refactor(storage): drop intermediate HashSet in GossipSignatureBuffer::prune Mirror the same size-delta + data.contains_key pattern applied to PayloadBuffer::prune in the prior commit, so both buffers share a single prune idiom. --- crates/storage/src/store.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index d853f1d5..04b42745 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -426,20 +426,20 @@ impl GossipSignatureBuffer { /// /// Returns the number of data_root entries pruned. fn prune(&mut self, finalized_slot: u64) -> usize { - let mut pruned_roots: HashSet = HashSet::new(); - self.data.retain(|root, entry| { + let before = self.data.len(); + self.data.retain(|_root, entry| { if entry.data.slot > finalized_slot { true } else { self.total_signatures -= entry.signatures.len(); - pruned_roots.insert(*root); false } }); - if !pruned_roots.is_empty() { - self.order.retain(|r| !pruned_roots.contains(r)); + let pruned = before - self.data.len(); + if pruned > 0 { + self.order.retain(|r| self.data.contains_key(r)); } - pruned_roots.len() + pruned } /// Returns a snapshot of all gossip signatures grouped by attestation data.