Skip to content

Commit c5ebd77

Browse files
ben-kaufmanclaude
andcommitted
fix: deduplicate multi-wallet BDK events and correct onchain payment direction
When a transaction touches multiple address-type wallets, each wallet independently emits a BdkWalletEvent, causing duplicate onchain events per sync cycle. Add per-type HashSet deduplication in process_wallet_events so each txid produces at most one event of each type per sync. Also fix PaymentDetails::update to apply direction corrections as secondary wallets sync. When the primary wallet sees only a change output it records the payment as Inbound; once secondary wallets reveal the spent inputs the direction must update to Outbound. Add a never-downgrade guard so a change-only wallet syncing before an input-holding wallet cannot transiently flip a correct Outbound back to Inbound. Add unit tests for the direction logic and integration tests covering event deduplication, direction correction, send_all direction, secondary wallet receives, and reorg deduplication. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent f0b58a3 commit c5ebd77

3 files changed

Lines changed: 381 additions & 1 deletion

File tree

src/chain/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ pub(super) fn apply_additional_sync_results(
227227
}
228228

229229
// Process BDK wallet events and emit corresponding ldk-node events via the event queue.
230+
// When a transaction touches multiple address-type wallets, each wallet emits its own
231+
// BdkWalletEvent, so we deduplicate by txid before forwarding to the event queue.
230232
async fn process_wallet_events<L2: Deref>(
231233
wallet_events: Vec<BdkWalletEvent>, wallet: &crate::wallet::Wallet,
232234
event_queue: &EventQueue<L2>, logger: &Arc<Logger>,
@@ -235,9 +237,19 @@ async fn process_wallet_events<L2: Deref>(
235237
where
236238
L2::Target: LdkLogger,
237239
{
240+
// Use per-type sets so that two wallets with different prior state can each contribute
241+
// their event type for the same txid without suppressing the other.
242+
let mut seen_received_txids = std::collections::HashSet::new();
243+
let mut seen_confirmed_txids = std::collections::HashSet::new();
244+
let mut seen_reorged_txids = std::collections::HashSet::new();
245+
let mut seen_replaced_txids = std::collections::HashSet::new();
246+
238247
for wallet_event in wallet_events {
239248
match wallet_event {
240249
BdkWalletEvent::TxConfirmed { txid, block_time, .. } => {
250+
if !seen_confirmed_txids.insert(txid) {
251+
continue;
252+
}
241253
let details = get_transaction_details(&txid, wallet, channel_manager)
242254
.unwrap_or_else(|| {
243255
log_error!(logger, "Transaction {} not found in wallet", txid);
@@ -270,6 +282,9 @@ where
270282
BdkWalletEvent::TxUnconfirmed { txid, old_block_time, .. } => {
271283
match old_block_time {
272284
Some(_) => {
285+
if !seen_reorged_txids.insert(txid) {
286+
continue;
287+
}
273288
// Transaction was previously confirmed but is now unconfirmed (reorg)
274289
log_info!(
275290
logger,
@@ -283,6 +298,9 @@ where
283298
})?;
284299
},
285300
None => {
301+
if !seen_received_txids.insert(txid) {
302+
continue;
303+
}
286304
// New unconfirmed transaction detected in mempool
287305
let details = get_transaction_details(&txid, wallet, channel_manager)
288306
.unwrap_or_else(|| {
@@ -321,6 +339,9 @@ where
321339
// We don't emit an event for chain tip changes as this is too noisy
322340
},
323341
BdkWalletEvent::TxReplaced { txid, conflicts, .. } => {
342+
if !seen_replaced_txids.insert(txid) {
343+
continue;
344+
}
324345
let conflict_txids: Vec<Txid> =
325346
conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect();
326347
log_info!(
@@ -336,7 +357,7 @@ where
336357
})?;
337358
},
338359
_ => {
339-
// Ignore other event types
360+
// TxDropped is handled via check_and_emit_evicted_transactions; skip here.
340361
},
341362
}
342363
}

src/payment/store.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,16 @@ impl StorableObject for PaymentDetails {
293293
}
294294
}
295295

296+
// Direction can change for onchain payments as wallets sync; never downgrade from
297+
// Outbound since a pure receive always has sent=0 and can never compute as Outbound.
298+
if let Some(direction) = update.direction {
299+
let downgrade = self.direction == PaymentDirection::Outbound
300+
&& direction == PaymentDirection::Inbound;
301+
if !downgrade {
302+
update_if_necessary!(self.direction, direction);
303+
}
304+
}
305+
296306
if updated {
297307
self.latest_update_timestamp = SystemTime::now()
298308
.duration_since(UNIX_EPOCH)
@@ -619,6 +629,7 @@ impl StorableObjectUpdate<PaymentDetails> for PaymentDetailsUpdate {
619629

620630
#[cfg(test)]
621631
mod tests {
632+
use bitcoin::hashes::Hash;
622633
use bitcoin::io::Cursor;
623634
use lightning::util::ser::Readable;
624635

@@ -790,4 +801,65 @@ mod tests {
790801
}
791802
}
792803
}
804+
805+
#[test]
806+
fn onchain_direction_never_downgrades_from_outbound() {
807+
// A change-only wallet syncing before an input-holding wallet can temporarily
808+
// make received > partial-sent, producing a spurious Inbound update. The guard
809+
// must reject it and leave direction as Outbound throughout.
810+
let txid = Txid::from_slice(&[1u8; 32]).unwrap();
811+
let id = PaymentId(txid.to_byte_array());
812+
let kind = PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed };
813+
814+
// Primary wallet has inputs, so initial direction is Outbound.
815+
let mut payment = PaymentDetails::new(
816+
id,
817+
kind.clone(),
818+
Some(5_000 * 1000),
819+
Some(100 * 1000),
820+
PaymentDirection::Outbound,
821+
PaymentStatus::Pending,
822+
);
823+
assert_eq!(payment.direction, PaymentDirection::Outbound);
824+
825+
// Change-only secondary syncs: partial view would produce Inbound; guard blocks it.
826+
let mut update = PaymentDetailsUpdate::new(id);
827+
update.direction = Some(PaymentDirection::Inbound);
828+
update.amount_msat = Some(Some(8_000 * 1000));
829+
payment.update(&update);
830+
assert_eq!(payment.direction, PaymentDirection::Outbound);
831+
832+
// Input secondary syncs: full picture, Outbound is confirmed.
833+
let mut update = PaymentDetailsUpdate::new(id);
834+
update.direction = Some(PaymentDirection::Outbound);
835+
update.amount_msat = Some(Some(1_000 * 1000));
836+
payment.update(&update);
837+
assert_eq!(payment.direction, PaymentDirection::Outbound);
838+
}
839+
840+
#[test]
841+
fn onchain_direction_corrects_inbound_to_outbound() {
842+
// Primary wallet sees only the change output initially (Inbound); once a
843+
// secondary wallet syncs and reveals the spent inputs, direction must update.
844+
let txid = Txid::from_slice(&[2u8; 32]).unwrap();
845+
let id = PaymentId(txid.to_byte_array());
846+
let kind = PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed };
847+
848+
let mut payment = PaymentDetails::new(
849+
id,
850+
kind.clone(),
851+
Some(9_000 * 1000),
852+
Some(0),
853+
PaymentDirection::Inbound,
854+
PaymentStatus::Pending,
855+
);
856+
assert_eq!(payment.direction, PaymentDirection::Inbound);
857+
858+
// Secondary wallet syncs with full view; direction must be corrected.
859+
let mut update = PaymentDetailsUpdate::new(id);
860+
update.direction = Some(PaymentDirection::Outbound);
861+
update.amount_msat = Some(Some(1_000 * 1000));
862+
payment.update(&update);
863+
assert_eq!(payment.direction, PaymentDirection::Outbound);
864+
}
793865
}

0 commit comments

Comments
 (0)