diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b7e4e6b2..04b42745 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -261,6 +261,32 @@ impl PayloadBuffer { .collect() } + /// Prune payload entries whose attestation target slot is at or below `finalized_slot`. + /// + /// Mirrors leanSpec's `prune_stale_attestation_data`: an entry is stale once its + /// target checkpoint is finalized — it can no longer contribute to fork choice and + /// keeping it around only pollutes `existing_proofs_for_data` lookups, occasionally + /// forcing recursive aggregation when plain XMSS aggregation would suffice. + /// + /// Returns the number of data_root entries removed. + fn prune(&mut self, finalized_slot: u64) -> usize { + let before = self.data.len(); + let total_proofs = &mut self.total_proofs; + self.data.retain(|_root, entry| { + if entry.data.target.slot > finalized_slot { + true + } else { + *total_proofs -= entry.proofs.len(); + false + } + }); + let pruned = before - self.data.len(); + if pruned > 0 { + self.order.retain(|r| self.data.contains_key(r)); + } + pruned + } + /// Extract per-validator latest attestations from proofs' participation bits. /// /// Iterates entries in insertion order (via `self.order`) so that, when two @@ -400,20 +426,20 @@ impl GossipSignatureBuffer { /// /// Returns the number of data_root entries pruned. fn prune(&mut self, finalized_slot: u64) -> usize { - let mut pruned_roots: HashSet = HashSet::new(); - self.data.retain(|root, entry| { + let before = self.data.len(); + self.data.retain(|_root, entry| { if entry.data.slot > finalized_slot { true } else { self.total_signatures -= entry.signatures.len(); - pruned_roots.insert(*root); false } }); - if !pruned_roots.is_empty() { - self.order.retain(|r| !pruned_roots.contains(r)); + let pruned = before - self.data.len(); + if pruned > 0 { + self.order.retain(|r| self.data.contains_key(r)); } - pruned_roots.len() + pruned } /// Returns a snapshot of all gossip signatures grouped by attestation data. @@ -727,11 +753,12 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); let pruned_sigs = self.prune_gossip_signatures(finalized.slot); + let pruned_payloads = self.prune_stale_aggregated_payloads(finalized.slot); - if pruned_chain > 0 || pruned_sigs > 0 { + if pruned_chain > 0 || pruned_sigs > 0 || pruned_payloads > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, "Pruned finalized data" + pruned_chain, pruned_sigs, pruned_payloads, "Pruned finalized data" ); } } @@ -830,6 +857,18 @@ impl Store { gossip.prune(finalized_slot) } + /// Prune aggregated payload buffers (new + known) whose target slot is at or below + /// `finalized_slot`. + /// + /// Mirrors leanSpec's `prune_stale_attestation_data` for the two aggregated payload + /// pools (gossip signatures are pruned separately by `prune_gossip_signatures`). + /// Returns the total number of data_root entries removed across both buffers. + pub fn prune_stale_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { + let pruned_new = self.new_payloads.lock().unwrap().prune(finalized_slot); + let pruned_known = self.known_payloads.lock().unwrap().prune(finalized_slot); + pruned_new + pruned_known + } + /// Prune old states beyond the retention window. /// /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any @@ -2075,6 +2114,95 @@ mod tests { assert_eq!(buf.total_proofs, 2); } + #[test] + fn payload_buffer_prune_drops_entries_with_finalized_target() { + let mut buf = PayloadBuffer::new(10); + let target_a = H256([0xaa; 32]); + let target_b = H256([0xbb; 32]); + let target_c = H256([0xcc; 32]); + + // Three entries at different target slots: 3, 5, 7. + let data_3 = make_att_data_for_target(3, target_a); + let data_5 = make_att_data_for_target(5, target_b); + let data_7 = make_att_data_for_target(7, target_c); + let root_3 = data_3.hash_tree_root(); + let root_5 = data_5.hash_tree_root(); + let root_7 = data_7.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_3), + make_proof_for_validators(&[0]), + ); + buf.push( + HashedAttestationData::new(data_5), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data_7), + make_proof_for_validators(&[3]), + ); + assert_eq!(buf.total_proofs, 3); + + // Finalized slot 5 prunes targets 3 and 5 (≤ 5), keeps target 7. + let pruned = buf.prune(5); + assert_eq!(pruned, 2); + assert!(!buf.data.contains_key(&root_3)); + assert!(!buf.data.contains_key(&root_5)); + assert!(buf.data.contains_key(&root_7)); + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.order.len(), 1); + assert_eq!(buf.order.front(), Some(&root_7)); + } + + #[test] + fn payload_buffer_prune_noop_when_nothing_stale() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data_for_target(10, H256([0xaa; 32])); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[0]), + ); + + let pruned = buf.prune(5); + assert_eq!(pruned, 0); + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.order.len(), 1); + } + + #[test] + fn store_prune_stale_aggregated_payloads_clears_both_buffers() { + let mut store = Store::test_store(); + + let stale = make_att_data_for_target(2, H256([0xaa; 32])); + let fresh = make_att_data_for_target(10, H256([0xbb; 32])); + + store.insert_new_aggregated_payload( + HashedAttestationData::new(stale.clone()), + make_proof_for_validators(&[0]), + ); + store.insert_known_aggregated_payload( + HashedAttestationData::new(stale), + make_proof_for_validators(&[1]), + ); + store.insert_new_aggregated_payload( + HashedAttestationData::new(fresh.clone()), + make_proof_for_validators(&[2]), + ); + store.insert_known_aggregated_payload( + HashedAttestationData::new(fresh), + make_proof_for_validators(&[3]), + ); + + assert_eq!(store.new_aggregated_payloads_count(), 2); + assert_eq!(store.known_aggregated_payloads_count(), 2); + + // Finalized slot 5: stale (target.slot == 2) is dropped from both buffers. + let pruned = store.prune_stale_aggregated_payloads(5); + assert_eq!(pruned, 2); + assert_eq!(store.new_aggregated_payloads_count(), 1); + assert_eq!(store.known_aggregated_payloads_count(), 1); + } + /// Build an attestation message at `slot` whose target points at `target_root`, /// distinct from the default zero target so two such datas have different roots. fn make_att_data_for_target(slot: u64, target_root: H256) -> AttestationData {