Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 93 additions & 21 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,24 +186,42 @@ impl BroadcasterInterface for TestBroadcaster {
struct ChainState {
blocks: Vec<(Header, Vec<Transaction>)>,
confirmed_txids: HashSet<Txid>,
/// Unconfirmed transactions (e.g., splice txs). Conflicting RBF candidates may coexist;
/// `confirm_pending_txs` determines which one confirms.
pending_txs: Vec<Transaction>,
}

impl ChainState {
fn new() -> Self {
let genesis_hash = genesis_block(Network::Bitcoin).block_hash();
let genesis_header = create_dummy_header(genesis_hash, 42);
Self { blocks: vec![(genesis_header, Vec::new())], confirmed_txids: HashSet::new() }
Self {
blocks: vec![(genesis_header, Vec::new())],
confirmed_txids: HashSet::new(),
pending_txs: Vec::new(),
}
}

fn tip_height(&self) -> u32 {
(self.blocks.len() - 1) as u32
}

fn is_outpoint_spent(&self, outpoint: &bitcoin::OutPoint) -> bool {
self.blocks.iter().any(|(_, txs)| {
txs.iter().any(|tx| {
tx.input.iter().any(|input| input.previous_output == *outpoint)
})
})
}

fn confirm_tx(&mut self, tx: Transaction) -> bool {
let txid = tx.compute_txid();
if self.confirmed_txids.contains(&txid) {
return false;
}
if tx.input.iter().any(|input| self.is_outpoint_spent(&input.previous_output)) {
return false;
}
self.confirmed_txids.insert(txid);

let prev_hash = self.blocks.last().unwrap().0.block_hash();
Expand All @@ -218,6 +236,29 @@ impl ChainState {
true
}

/// Add a transaction to the pending pool (mempool). Multiple conflicting transactions (RBF
/// candidates) may coexist; `confirm_pending_txs` selects which one to confirm. If the
/// conflicting transaction was already confirmed, the new transaction is dropped since a
/// confirmed transaction cannot be replaced on chain.
fn add_pending_tx(&mut self, tx: Transaction) {
if tx.input.iter().any(|i| self.is_outpoint_spent(&i.previous_output)) {
return;
}
self.pending_txs.push(tx);
}

/// Confirm pending transactions, selecting deterministically among conflicting RBF candidates.
/// Sorting by txid before confirming means the winner depends on the fuzz input (which
/// determines tx content and thus txid), while `confirm_tx` rejects double-spends so only one
/// conflicting tx confirms.
fn confirm_pending_txs(&mut self) {
let mut txs = std::mem::take(&mut self.pending_txs);
txs.sort_by_key(|tx| tx.compute_txid());
for tx in txs {
self.confirm_tx(tx);
}
}

fn block_at(&self, height: u32) -> &(Header, Vec<Transaction>) {
&self.blocks[height as usize]
}
Expand Down Expand Up @@ -853,14 +894,25 @@ fn send_mpp_hop_payment(
}

#[inline]
fn assert_action_timeout_awaiting_response(action: &msgs::ErrorAction) {
// Since sending/receiving messages may be delayed, `timer_tick_occurred` may cause a node to
// disconnect their counterparty if they're expecting a timely response.
assert!(matches!(
action,
fn assert_action_disconnect_with_warning(action: &msgs::ErrorAction) {
match action {
msgs::ErrorAction::DisconnectPeerWithWarning { msg }
if msg.data.contains("Disconnecting due to timeout awaiting response")
));
if msg.data.contains("Disconnecting due to timeout awaiting response")
|| msg.data.contains("cannot RBF")
|| msg.data.contains("before RBF")
|| msg.data.contains("needed for RBF")
|| msg.data.contains("splice to RBF")
|| msg.data.contains("candidates to RBF")
|| msg.data.contains("candidates for RBF")
|| msg.data.contains("before spliced")
|| msg.data.contains("needed to splice")
|| msg.data.contains("a splice pending")
|| msg.data.contains("cannot be spliced")
|| msg.data.contains("Splicing requested on a channel that is not live")
|| msg.data.contains("funding negotiation already in progress") =>
{},
_ => panic!("Unexpected error action: {:?}", action),
}
}

enum ChanType {
Expand Down Expand Up @@ -1608,12 +1660,13 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
},
MessageSendEvent::SendChannelReady { .. } => continue,
MessageSendEvent::SendAnnouncementSignatures { .. } => continue,
MessageSendEvent::BroadcastChannelUpdate { .. } => continue,
MessageSendEvent::SendChannelUpdate { ref node_id, .. } => {
if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
*node_id == a_id
},
MessageSendEvent::HandleError { ref action, ref node_id } => {
assert_action_timeout_awaiting_response(action);
assert_action_disconnect_with_warning(action);
if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); }
*node_id == a_id
},
Expand Down Expand Up @@ -1843,7 +1896,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
}
},
MessageSendEvent::HandleError { ref action, .. } => {
assert_action_timeout_awaiting_response(action);
assert_action_disconnect_with_warning(action);
},
MessageSendEvent::SendChannelReady { .. } => {
// Can be generated as a reestablish response
Expand Down Expand Up @@ -1896,7 +1949,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
MessageSendEvent::SendAnnouncementSignatures { .. } => {},
MessageSendEvent::SendChannelUpdate { .. } => {},
MessageSendEvent::HandleError { ref action, .. } => {
assert_action_timeout_awaiting_response(action);
assert_action_disconnect_with_warning(action);
},
_ => panic!("Unhandled message event"),
}
Expand All @@ -1918,7 +1971,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
MessageSendEvent::SendAnnouncementSignatures { .. } => {},
MessageSendEvent::SendChannelUpdate { .. } => {},
MessageSendEvent::HandleError { ref action, .. } => {
assert_action_timeout_awaiting_response(action);
assert_action_disconnect_with_warning(action);
},
_ => panic!("Unhandled message event"),
}
Expand Down Expand Up @@ -2025,15 +2078,16 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
assert!(txs.len() >= 1);
let splice_tx = txs.remove(0);
assert_eq!(new_funding_txo.txid, splice_tx.compute_txid());
chain_state.confirm_tx(splice_tx);
chain_state.add_pending_tx(splice_tx);
},
events::Event::SpliceFailed { .. } => {},
events::Event::DiscardFunding {
funding_info: events::FundingInfo::Contribution { .. },
funding_info: events::FundingInfo::Contribution { .. }
| events::FundingInfo::Tx { .. },
..
} => {},

_ => panic!("Unhandled event"),
_ => panic!("Unhandled event: {:?}", event),
}
}
while nodes[$node].needs_pending_htlc_processing() {
Expand Down Expand Up @@ -2477,13 +2531,31 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
},

// Sync node by 1 block to cover confirmation of a transaction.
0xa8 => sync_with_chain_state(&mut chain_state, &nodes[0], &mut node_height_a, Some(1)),
0xa9 => sync_with_chain_state(&mut chain_state, &nodes[1], &mut node_height_b, Some(1)),
0xaa => sync_with_chain_state(&mut chain_state, &nodes[2], &mut node_height_c, Some(1)),
0xa8 => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[0], &mut node_height_a, Some(1));
},
0xa9 => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[1], &mut node_height_b, Some(1));
},
0xaa => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[2], &mut node_height_c, Some(1));
},
// Sync node to chain tip to cover confirmation of a transaction post-reorg-risk.
0xab => sync_with_chain_state(&mut chain_state, &nodes[0], &mut node_height_a, None),
0xac => sync_with_chain_state(&mut chain_state, &nodes[1], &mut node_height_b, None),
0xad => sync_with_chain_state(&mut chain_state, &nodes[2], &mut node_height_c, None),
0xab => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[0], &mut node_height_a, None);
},
0xac => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[1], &mut node_height_b, None);
},
0xad => {
chain_state.confirm_pending_txs();
sync_with_chain_state(&mut chain_state, &nodes[2], &mut node_height_c, None);
},

0xb0 | 0xb1 | 0xb2 => {
// Restart node A, picking among the in-flight `ChannelMonitor`s to use based on
Expand Down
Loading