diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index 9255163218..47e88d2d78 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -5,14 +5,15 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate}; -use bdk_wallet::Update; +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; +use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, TxUpdate}; +use bdk_wallet::{KeychainKind, Update}; use bip157::chain::{BlockHeaderChanges, ChainState}; use bip157::error::FetchBlockError; use bip157::{ @@ -27,7 +28,7 @@ use lightning::util::ser::Writeable; use tokio::sync::{mpsc, oneshot}; use super::{FeeSourceConfig, WalletSyncStatus}; -use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; +use crate::config::{CbfSyncConfig, Config}; use crate::error::Error; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, @@ -92,7 +93,7 @@ pub(super) struct CbfChainSource { /// Serializes concurrent filter scans (on-chain and lightning). scan_lock: tokio::sync::Mutex<()>, /// Scripts registered by LDK's Filter trait for lightning channel monitoring. - registered_scripts: Mutex>, + registered_scripts: Mutex>, /// Deduplicates concurrent on-chain wallet sync requests. onchain_wallet_sync_status: Mutex, /// Deduplicates concurrent lightning wallet sync requests. @@ -150,7 +151,7 @@ impl CbfChainSource { let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); let sync_completion_tx = Arc::new(Mutex::new(None)); let filter_skip_height = Arc::new(AtomicU32::new(0)); - let registered_scripts = Mutex::new(Vec::new()); + let registered_scripts = Mutex::new(HashSet::new()); let scan_lock = tokio::sync::Mutex::new(()); let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); @@ -509,12 +510,12 @@ impl CbfChainSource { /// Register a transaction script for Lightning channel monitoring. pub(crate) fn register_tx(&self, _txid: &Txid, script_pubkey: &Script) { - self.registered_scripts.lock().expect("lock").push(script_pubkey.to_owned()); + self.registered_scripts.lock().expect("lock").insert(script_pubkey.to_owned()); } /// Register a watched output script for Lightning channel monitoring. pub(crate) fn register_output(&self, output: WatchedOutput) { - self.registered_scripts.lock().expect("lock").push(output.script_pubkey.clone()); + self.registered_scripts.lock().expect("lock").insert(output.script_pubkey.clone()); } /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for @@ -580,17 +581,23 @@ impl CbfChainSource { let requester = self.requester()?; let now = Instant::now(); - let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); - if scripts.is_empty() { - log_debug!(self.logger, "No wallet scripts to sync via CBF."); - return Ok(()); - } + // Seed a sync-local IndexedTxGraph from a clone of the wallet's spk_index. + // This carries descriptors, the configured lookahead, and the currently-revealed + // range so we can both derive the SPK set to scan and observe new "used" + // indices via `apply_block_relevant`. Mirrors bdk-kyoto's `UpdateBuilder`. + let mut graph: IndexedTxGraph> = + IndexedTxGraph::new(onchain_wallet.spk_index_clone()); + + // Skip height: walk back from the wallet's persisted checkpoint by + // REORG_SAFETY_BLOCKS. Survives restarts since BDK persists the chain. + let skip_height = + onchain_wallet.latest_checkpoint().height().checked_sub(REORG_SAFETY_BLOCKS); let timeout_fut = tokio::time::timeout( Duration::from_secs( self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs, ), - self.sync_onchain_wallet_op(requester, &onchain_wallet, scripts), + self.sync_onchain_wallet_op(requester, &mut graph, skip_height), ); let (tx_update, sync_update) = match timeout_fut.await { @@ -601,6 +608,10 @@ impl CbfChainSource { }, }; + // Pull the high-water marks of observed derivation indices so BDK can + // advance its reveal cursor past addresses that received funds. + let last_active_indices = graph.index.last_used_indices(); + // Build chain checkpoint extending from the wallet's current tip, // using `insert` (not `push`) so that reorgs are handled correctly. // `insert` detects conflicting hashes and purges stale blocks, @@ -614,8 +625,7 @@ impl CbfChainSource { let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; cp = cp.insert(tip_block_id); - let update = - Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; + let update = Update { last_active_indices, tx_update, chain: Some(cp) }; onchain_wallet.apply_update(update)?; @@ -644,25 +654,22 @@ impl CbfChainSource { } async fn sync_onchain_wallet_op( - &self, requester: Requester, onchain_wallet: &Wallet, scripts: Vec, + &self, requester: Requester, + graph: &mut IndexedTxGraph>, + skip_height: Option, ) -> Result<(TxUpdate, SyncUpdate), Error> { - // Derive skip height from BDK's persisted checkpoint, walked back by - // REORG_SAFETY_BLOCKS for reorg safety (same approach as bdk-kyoto). - // This survives restarts since BDK persists its checkpoint chain. - // - // We include LDK-registered scripts (e.g., channel funding output - // scripts) alongside the wallet scripts. This ensures the on-chain - // wallet scan also fetches blocks containing channel funding - // transactions, whose outputs are needed by BDK's TxGraph to - // calculate fees for subsequent spends such as splice transactions. - // Without these, BDK's `calculate_fee` would fail with - // `MissingTxOut` because the parent transaction's outputs are - // unknown. This mirrors what the Bitcoind chain source does in - // `Wallet::block_connected` by inserting registered tx outputs. - let mut all_scripts = scripts; + // Derive the SPK set from the indexer: every revealed key plus the configured + // lookahead window per keychain. Mirrors bdk-kyoto's `peek_scripts` and ensures + // we don't miss deposits one stop-gap past the current reveal frontier. + let mut all_scripts: Vec = peek_keychain_scripts(&graph.index); + + // Include LDK-registered scripts (e.g. channel funding output scripts) so the + // scan also fetches blocks containing those transactions; BDK needs their + // outputs in its TxGraph to compute fees for subsequent spends (splices). + // Without this, `calculate_fee` would fail with `MissingTxOut`. Mirrors what + // the Bitcoind chain source does in `Wallet::block_connected`. all_scripts.extend(self.registered_scripts.lock().expect("lock").iter().cloned()); - let skip_height = - onchain_wallet.latest_checkpoint().height().checked_sub(REORG_SAFETY_BLOCKS); + let (sync_update, matched) = self.run_filter_scan(all_scripts, skip_height).await?; log_debug!( @@ -671,10 +678,12 @@ impl CbfChainSource { matched.len() ); - // Fetch matching blocks and include all their transactions. - // The compact block filter already matched our scripts (covering both - // created outputs and spent inputs), so we include every transaction - // from matched blocks and let BDK determine relevance. + // Fetch matched blocks. Feed each one to the IndexedTxGraph: this records + // observed derivation indices in the keychain index (so the wallet can + // advance its reveal cursor on apply_update) and also captures wallet-relevant + // txs. We additionally collect *every* tx from matched blocks into `tx_update` + // so LDK-registered txs (channel funding etc.) are known to BDK's graph for + // later fee computation, even though they don't match the wallet keychain. let mut tx_update = TxUpdate::default(); let per_request_timeout = Duration::from_secs(self.sync_config.timeouts_config.per_request_timeout_secs.into()); @@ -694,6 +703,9 @@ impl CbfChainSource { let block_id = BlockId { height: *height, hash: block.header.block_hash() }; let conf_time = ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 }; + + let _ = graph.apply_block_relevant(&block, *height); + for tx in &block.txdata { let txid = tx.compute_txid(); tx_update.txs.push(Arc::new(tx.clone())); @@ -726,7 +738,8 @@ impl CbfChainSource { let requester = self.requester()?; let now = Instant::now(); - let scripts: Vec = self.registered_scripts.lock().expect("lock").clone(); + let scripts: Vec = + self.registered_scripts.lock().expect("lock").iter().cloned().collect(); if scripts.is_empty() { log_debug!(self.logger, "No registered scripts for CBF lightning sync."); } else { @@ -1182,6 +1195,21 @@ impl CbfChainSource { } } +/// SPKs to scan for on-chain wallet sync: every revealed key plus the configured +/// lookahead window per keychain. Mirrors bdk-kyoto's `UpdateBuilder::peek_scripts`. +fn peek_keychain_scripts(index: &KeychainTxOutIndex) -> Vec { + let mut scripts = Vec::new(); + let last_revealed = index.last_revealed_indices(); + let lookahead = index.lookahead(); + for keychain in [KeychainKind::External, KeychainKind::Internal] { + let Some(spk_iter) = index.unbounded_spk_iter(keychain) else { continue }; + let frontier = last_revealed.get(&keychain).copied().unwrap_or(0); + let bound = (frontier + lookahead) as usize; + scripts.extend(spk_iter.take(bound).map(|(_, spk)| spk)); + } + scripts +} + /// Record the current timestamp in a `NodeMetrics` field and persist the metrics. fn update_node_metrics_timestamp( node_metrics: &RwLock, kv_store: &DynStore, logger: &Logger, diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 770bb19f1e..3aa0607e4e 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -10,6 +10,7 @@ use std::ops::Deref; use std::str::FromStr; use std::sync::{Arc, Mutex}; +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_wallet::descriptor::ExtendedDescriptor; use bdk_wallet::error::{BuildFeeBumpError, CreateTxError}; @@ -122,22 +123,12 @@ impl Wallet { self.inner.lock().expect("lock").start_sync_with_revealed_spks().build() } - pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec { - let wallet = self.inner.lock().expect("lock"); - let mut scripts: Vec = - wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect(); - - // For first sync when no scripts have been revealed yet, generate - // lookahead scripts up to the stop gap for both keychains. - if scripts.is_empty() { - for keychain in [KeychainKind::External, KeychainKind::Internal] { - for idx in 0..stop_gap as u32 { - scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey()); - } - } - } - - scripts + /// Clone the wallet's keychain SPK index for use as the indexer of a sync-local + /// `IndexedTxGraph`. The clone preserves descriptors, currently-revealed range, + /// and the configured lookahead, so the chain source can compute the SPK set to + /// scan and the resulting `last_used_indices` independently of the live wallet. + pub(crate) fn spk_index_clone(&self) -> KeychainTxOutIndex { + self.inner.lock().expect("lock").spk_index().clone() } pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint {