Skip to content

Commit f6b2565

Browse files
oleonardolimaclaude
andcommitted
refactor(chain)!: introduce stage based canonicalization processing
- Add new `CanonicalStage` enum for tracking the different canonicalization phases/stages. - Add new `try_advance()` method for stage progression. - Add new `is_transitive()` helper to `CanonicalReason`. - Change internal `confirmed_anchors` to `direct_anchors` for better clarity. - Update the `resolve_query()` to handle staged-based processing. Co-authored-by: Claude <noreply@anthropic.com>
1 parent 77fcf91 commit f6b2565

1 file changed

Lines changed: 202 additions & 66 deletions

File tree

crates/chain/src/canonical_task.rs

Lines changed: 202 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,21 @@ use bitcoin::{Transaction, Txid};
1111
type CanonicalMap<A> = HashMap<Txid, (Arc<Transaction>, CanonicalReason<A>)>;
1212
type NotCanonicalSet = HashSet<Txid>;
1313

14+
/// Represents the current stage of canonicalization processing.
15+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16+
enum CanonicalStage {
17+
/// Processing directly anchored transactions.
18+
AnchoredTxs,
19+
/// Processing transactions seen in mempool.
20+
SeenTxs,
21+
/// Processing leftover transactions.
22+
LeftOverTxs,
23+
/// Processing transitively anchored transactions.
24+
TransitivelyAnchoredTxs,
25+
/// All processing is complete.
26+
Finished,
27+
}
28+
1429
/// Modifies the canonicalization algorithm.
1530
#[derive(Debug, Default, Clone)]
1631
pub struct CanonicalizationParams {
@@ -30,75 +45,147 @@ pub struct CanonicalizationTask<'g, A> {
3045
unprocessed_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3146
unprocessed_seen_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>, u64)> + 'g>,
3247
unprocessed_leftover_txs: VecDeque<(Txid, Arc<Transaction>, u32)>,
48+
unprocessed_transitively_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3349

3450
canonical: CanonicalMap<A>,
3551
not_canonical: NotCanonicalSet,
3652

3753
// Store canonical transactions in order
3854
canonical_order: Vec<Txid>,
3955

40-
// Track which transactions have confirmed anchors
41-
confirmed_anchors: HashMap<Txid, A>,
56+
// Track which transactions have direct anchors (not transitive)
57+
direct_anchors: HashMap<Txid, A>,
58+
59+
// Track the current stage of processing
60+
current_stage: CanonicalStage,
4261
}
4362

4463
impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
4564
type Output = CanonicalView<A>;
4665

4766
fn next_query(&mut self) -> Option<ChainRequest> {
48-
// Get the next unprocessed anchored tx that needs to query a chain oracle.
49-
if let Some((_txid, _tx, anchors)) = self.unprocessed_anchored_txs.front() {
50-
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
51-
return Some(ChainRequest {
52-
chain_tip: self.chain_tip,
53-
block_ids,
54-
});
67+
// Try to advance to the next stage if needed
68+
self.try_advance();
69+
70+
match self.current_stage {
71+
CanonicalStage::AnchoredTxs => {
72+
// Process directly anchored transactions first
73+
if let Some((_txid, _, anchors)) = self.unprocessed_anchored_txs.front() {
74+
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
75+
return Some(ChainRequest {
76+
chain_tip: self.chain_tip,
77+
block_ids,
78+
});
79+
}
80+
None
81+
}
82+
CanonicalStage::TransitivelyAnchoredTxs => {
83+
// Process transitively anchored transactions last
84+
if let Some((_txid, _, anchors)) =
85+
self.unprocessed_transitively_anchored_txs.front()
86+
{
87+
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
88+
return Some(ChainRequest {
89+
chain_tip: self.chain_tip,
90+
block_ids,
91+
});
92+
}
93+
None
94+
}
95+
CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => {
96+
// These stages don't need queries
97+
None
98+
}
5599
}
56-
None
57100
}
58101

59102
fn resolve_query(&mut self, response: ChainResponse) {
60-
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
61-
// Find the anchor that matches the confirmed BlockId
62-
let best_anchor = response.and_then(|block_id| {
63-
anchors
64-
.iter()
65-
.find(|anchor| anchor.anchor_block() == block_id)
66-
.cloned()
67-
});
68-
69-
match best_anchor {
70-
Some(best_anchor) => {
71-
self.confirmed_anchors.insert(txid, best_anchor.clone());
72-
if !self.is_canonicalized(txid) {
73-
self.mark_canonical(txid, tx, CanonicalReason::from_anchor(best_anchor));
103+
// Only AnchoredTxs and TransitivelyAnchoredTxs stages should receive query
104+
// responses Other stages don't generate queries and thus shouldn't call
105+
// resolve_query
106+
match self.current_stage {
107+
CanonicalStage::AnchoredTxs => {
108+
// Process directly anchored transaction response
109+
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
110+
// Find the anchor that matches the confirmed BlockId
111+
let best_anchor = response.and_then(|block_id| {
112+
anchors
113+
.iter()
114+
.find(|anchor| anchor.anchor_block() == block_id)
115+
.cloned()
116+
});
117+
118+
match best_anchor {
119+
Some(best_anchor) => {
120+
// Transaction has a confirmed anchor
121+
self.direct_anchors.insert(txid, best_anchor.clone());
122+
if !self.is_canonicalized(txid) {
123+
self.mark_canonical(
124+
txid,
125+
tx,
126+
CanonicalReason::from_anchor(best_anchor),
127+
);
128+
}
129+
}
130+
None => {
131+
// No confirmed anchor found, add to leftover transactions for later
132+
// processing
133+
self.unprocessed_leftover_txs.push_back((
134+
txid,
135+
tx,
136+
anchors
137+
.iter()
138+
.last()
139+
.expect(
140+
"tx taken from `unprocessed_anchored_txs` so it must have at least one anchor",
141+
)
142+
.confirmation_height_upper_bound(),
143+
))
144+
}
74145
}
75146
}
76-
None => {
77-
self.unprocessed_leftover_txs.push_back((
78-
txid,
79-
tx,
147+
}
148+
CanonicalStage::TransitivelyAnchoredTxs => {
149+
// Process transitively anchored transaction response
150+
if let Some((txid, _tx, anchors)) =
151+
self.unprocessed_transitively_anchored_txs.pop_front()
152+
{
153+
// Find the anchor that matches the confirmed BlockId
154+
let best_anchor = response.and_then(|block_id| {
80155
anchors
81156
.iter()
82-
.last()
83-
.expect(
84-
"tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor",
85-
)
86-
.confirmation_height_upper_bound(),
87-
))
157+
.find(|anchor| anchor.anchor_block() == block_id)
158+
.cloned()
159+
});
160+
161+
if let Some(best_anchor) = best_anchor {
162+
// Found a confirmed anchor for this transitively anchored transaction
163+
self.direct_anchors.insert(txid, best_anchor.clone());
164+
// Note: We don't re-mark as canonical since it's already marked
165+
// from being transitively anchored by its descendant
166+
}
167+
// If no confirmed anchor, we keep the transitive canonicalization status
88168
}
89169
}
170+
CanonicalStage::SeenTxs | CanonicalStage::LeftOverTxs | CanonicalStage::Finished => {
171+
// These stages don't generate queries and shouldn't receive responses
172+
debug_assert!(
173+
false,
174+
"resolve_query called for stage {:?} which doesn't generate queries",
175+
self.current_stage
176+
);
177+
}
90178
}
91179
}
92180

93181
fn is_finished(&mut self) -> bool {
94-
self.unprocessed_anchored_txs.is_empty()
182+
// Try to advance stages first
183+
self.try_advance();
184+
// Check if we've reached the Finished stage
185+
self.current_stage == CanonicalStage::Finished
95186
}
96187

97-
fn finish(mut self) -> Self::Output {
98-
// Process remaining transactions (seen and leftover)
99-
self.process_seen_txs();
100-
self.process_leftover_txs();
101-
188+
fn finish(self) -> Self::Output {
102189
// Build the canonical view
103190
let mut view_order = Vec::new();
104191
let mut view_txs = HashMap::new();
@@ -127,7 +214,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
127214
// Determine chain position based on reason
128215
let chain_position = match reason {
129216
CanonicalReason::Assumed { descendant } => match descendant {
130-
Some(_) => match self.confirmed_anchors.get(txid) {
217+
Some(_) => match self.direct_anchors.get(txid) {
131218
Some(anchor) => ChainPosition::Confirmed {
132219
anchor,
133220
transitively: None,
@@ -143,7 +230,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
143230
},
144231
},
145232
CanonicalReason::Anchor { anchor, descendant } => match descendant {
146-
Some(_) => match self.confirmed_anchors.get(txid) {
233+
Some(_) => match self.direct_anchors.get(txid) {
147234
Some(anchor) => ChainPosition::Confirmed {
148235
anchor,
149236
transitively: None,
@@ -179,6 +266,49 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
179266
}
180267

181268
impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
269+
/// Try to advance to the next stage if the current stage is complete.
270+
/// The loop continues through stages that process all their transactions at once
271+
/// (SeenTxs and LeftOverTxs) to avoid needing multiple calls.
272+
fn try_advance(&mut self) {
273+
loop {
274+
let advanced = match self.current_stage {
275+
CanonicalStage::AnchoredTxs => {
276+
if self.unprocessed_anchored_txs.is_empty() {
277+
self.current_stage = CanonicalStage::SeenTxs;
278+
true // Continue to process SeenTxs immediately
279+
} else {
280+
false // Still have work, stop advancing
281+
}
282+
}
283+
CanonicalStage::SeenTxs => {
284+
// Process all seen transactions at once
285+
self.process_seen_txs();
286+
self.current_stage = CanonicalStage::LeftOverTxs;
287+
true // Continue to process LeftOverTxs immediately
288+
}
289+
CanonicalStage::LeftOverTxs => {
290+
// Process all leftover transactions at once
291+
self.process_leftover_txs();
292+
self.current_stage = CanonicalStage::TransitivelyAnchoredTxs;
293+
false // Stop here - TransitivelyAnchoredTxs need queries
294+
}
295+
CanonicalStage::TransitivelyAnchoredTxs => {
296+
if self.unprocessed_transitively_anchored_txs.is_empty() {
297+
self.current_stage = CanonicalStage::Finished;
298+
}
299+
false // Stop advancing
300+
}
301+
CanonicalStage::Finished => {
302+
false // Already finished, nothing to do
303+
}
304+
};
305+
306+
if !advanced {
307+
break;
308+
}
309+
}
310+
}
311+
182312
/// Creates a new canonicalization task.
183313
pub fn new(
184314
tx_graph: &'g TxGraph<A>,
@@ -211,12 +341,14 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
211341
unprocessed_anchored_txs,
212342
unprocessed_seen_txs,
213343
unprocessed_leftover_txs: VecDeque::new(),
344+
unprocessed_transitively_anchored_txs: VecDeque::new(),
214345

215346
canonical: HashMap::new(),
216347
not_canonical: HashSet::new(),
217348

218349
canonical_order: Vec::new(),
219-
confirmed_anchors: HashMap::new(),
350+
direct_anchors: HashMap::new(),
351+
current_stage: CanonicalStage::AnchoredTxs,
220352
};
221353

222354
// process assumed transactions first (they don't need queries)
@@ -331,30 +463,28 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
331463
for txid in undo_not_canonical {
332464
self.not_canonical.remove(&txid);
333465
}
334-
} else {
335-
// Add to canonical order
336-
for (txid, tx, reason) in &staged_canonical {
337-
self.canonical_order.push(*txid);
338-
339-
// If this was marked transitively, check if it has anchors to verify
340-
let is_transitive = matches!(
341-
reason,
342-
CanonicalReason::Anchor {
343-
descendant: Some(_),
344-
..
345-
} | CanonicalReason::Assumed {
346-
descendant: Some(_),
347-
..
348-
}
349-
);
466+
return;
467+
}
350468

351-
if is_transitive {
352-
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
353-
// only check anchors we haven't already confirmed
354-
if !self.confirmed_anchors.contains_key(txid) {
355-
self.unprocessed_anchored_txs
356-
.push_back((*txid, tx.clone(), anchors));
357-
}
469+
// Add to canonical order
470+
for (txid, tx, reason) in &staged_canonical {
471+
self.canonical_order.push(*txid);
472+
473+
// ObservedIn transactions don't need anchor verification
474+
if matches!(reason, CanonicalReason::ObservedIn { .. }) {
475+
continue;
476+
}
477+
478+
// Check if this transaction was marked transitively and needs its own anchors verified
479+
if reason.is_transitive() {
480+
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
481+
// only check anchors we haven't already confirmed
482+
if !self.direct_anchors.contains_key(txid) {
483+
self.unprocessed_transitively_anchored_txs.push_back((
484+
*txid,
485+
tx.clone(),
486+
anchors,
487+
));
358488
}
359489
}
360490
}
@@ -449,6 +579,12 @@ impl<A: Clone> CanonicalReason<A> {
449579
CanonicalReason::ObservedIn { descendant, .. } => descendant,
450580
}
451581
}
582+
583+
/// Returns true if this reason represents a transitive canonicalization
584+
/// (i.e., the transaction is canonical because of its descendant).
585+
pub fn is_transitive(&self) -> bool {
586+
self.descendant().is_some()
587+
}
452588
}
453589

454590
#[cfg(test)]

0 commit comments

Comments
 (0)