Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 66 additions & 38 deletions src/chain/cbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use std::collections::{BTreeMap, HashMap};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate};
use bdk_wallet::Update;
use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, TxUpdate};
use bdk_wallet::{KeychainKind, Update};
use bip157::chain::{BlockHeaderChanges, ChainState};
use bip157::error::FetchBlockError;
use bip157::{
Expand All @@ -27,7 +28,7 @@ use lightning::util::ser::Writeable;
use tokio::sync::{mpsc, oneshot};

use super::{FeeSourceConfig, WalletSyncStatus};
use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP};
use crate::config::{CbfSyncConfig, Config};
use crate::error::Error;
use crate::fee_estimator::{
apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target,
Expand Down Expand Up @@ -92,7 +93,7 @@ pub(super) struct CbfChainSource {
/// Serializes concurrent filter scans (on-chain and lightning).
scan_lock: tokio::sync::Mutex<()>,
/// Scripts registered by LDK's Filter trait for lightning channel monitoring.
registered_scripts: Mutex<Vec<ScriptBuf>>,
registered_scripts: Mutex<HashSet<ScriptBuf>>,
/// Deduplicates concurrent on-chain wallet sync requests.
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
/// Deduplicates concurrent lightning wallet sync requests.
Expand Down Expand Up @@ -150,7 +151,7 @@ impl CbfChainSource {
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
let sync_completion_tx = Arc::new(Mutex::new(None));
let filter_skip_height = Arc::new(AtomicU32::new(0));
let registered_scripts = Mutex::new(Vec::new());
let registered_scripts = Mutex::new(HashSet::new());
let scan_lock = tokio::sync::Mutex::new(());
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
Expand Down Expand Up @@ -509,12 +510,12 @@ impl CbfChainSource {

/// Register a transaction script for Lightning channel monitoring.
pub(crate) fn register_tx(&self, _txid: &Txid, script_pubkey: &Script) {
self.registered_scripts.lock().expect("lock").push(script_pubkey.to_owned());
self.registered_scripts.lock().expect("lock").insert(script_pubkey.to_owned());
}

/// Register a watched output script for Lightning channel monitoring.
pub(crate) fn register_output(&self, output: WatchedOutput) {
self.registered_scripts.lock().expect("lock").push(output.script_pubkey.clone());
self.registered_scripts.lock().expect("lock").insert(output.script_pubkey.clone());
}

/// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for
Expand Down Expand Up @@ -580,17 +581,23 @@ impl CbfChainSource {
let requester = self.requester()?;
let now = Instant::now();

let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP);
if scripts.is_empty() {
log_debug!(self.logger, "No wallet scripts to sync via CBF.");
return Ok(());
}
// Seed a sync-local IndexedTxGraph from a clone of the wallet's spk_index.
// This carries descriptors, the configured lookahead, and the currently-revealed
// range so we can both derive the SPK set to scan and observe new "used"
// indices via `apply_block_relevant`. Mirrors bdk-kyoto's `UpdateBuilder`.
let mut graph: IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>> =
IndexedTxGraph::new(onchain_wallet.spk_index_clone());

// Skip height: walk back from the wallet's persisted checkpoint by
// REORG_SAFETY_BLOCKS. Survives restarts since BDK persists the chain.
let skip_height =
onchain_wallet.latest_checkpoint().height().checked_sub(REORG_SAFETY_BLOCKS);

let timeout_fut = tokio::time::timeout(
Duration::from_secs(
self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs,
),
self.sync_onchain_wallet_op(requester, &onchain_wallet, scripts),
self.sync_onchain_wallet_op(requester, &mut graph, skip_height),
);

let (tx_update, sync_update) = match timeout_fut.await {
Expand All @@ -601,6 +608,10 @@ impl CbfChainSource {
},
};

// Pull the high-water marks of observed derivation indices so BDK can
// advance its reveal cursor past addresses that received funds.
let last_active_indices = graph.index.last_used_indices();

// Build chain checkpoint extending from the wallet's current tip,
// using `insert` (not `push`) so that reorgs are handled correctly.
// `insert` detects conflicting hashes and purges stale blocks,
Expand All @@ -614,8 +625,7 @@ impl CbfChainSource {
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
cp = cp.insert(tip_block_id);

let update =
Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) };
let update = Update { last_active_indices, tx_update, chain: Some(cp) };

onchain_wallet.apply_update(update)?;

Expand Down Expand Up @@ -644,25 +654,22 @@ impl CbfChainSource {
}

async fn sync_onchain_wallet_op(
&self, requester: Requester, onchain_wallet: &Wallet, scripts: Vec<ScriptBuf>,
&self, requester: Requester,
graph: &mut IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>>,
skip_height: Option<u32>,
) -> Result<(TxUpdate<ConfirmationBlockTime>, SyncUpdate), Error> {
// Derive skip height from BDK's persisted checkpoint, walked back by
// REORG_SAFETY_BLOCKS for reorg safety (same approach as bdk-kyoto).
// This survives restarts since BDK persists its checkpoint chain.
//
// We include LDK-registered scripts (e.g., channel funding output
// scripts) alongside the wallet scripts. This ensures the on-chain
// wallet scan also fetches blocks containing channel funding
// transactions, whose outputs are needed by BDK's TxGraph to
// calculate fees for subsequent spends such as splice transactions.
// Without these, BDK's `calculate_fee` would fail with
// `MissingTxOut` because the parent transaction's outputs are
// unknown. This mirrors what the Bitcoind chain source does in
// `Wallet::block_connected` by inserting registered tx outputs.
let mut all_scripts = scripts;
// Derive the SPK set from the indexer: every revealed key plus the configured
// lookahead window per keychain. Mirrors bdk-kyoto's `peek_scripts` and ensures
// we don't miss deposits one stop-gap past the current reveal frontier.
let mut all_scripts: Vec<ScriptBuf> = peek_keychain_scripts(&graph.index);

// Include LDK-registered scripts (e.g. channel funding output scripts) so the
// scan also fetches blocks containing those transactions; BDK needs their
// outputs in its TxGraph to compute fees for subsequent spends (splices).
// Without this, `calculate_fee` would fail with `MissingTxOut`. Mirrors what
// the Bitcoind chain source does in `Wallet::block_connected`.
all_scripts.extend(self.registered_scripts.lock().expect("lock").iter().cloned());
let skip_height =
onchain_wallet.latest_checkpoint().height().checked_sub(REORG_SAFETY_BLOCKS);

let (sync_update, matched) = self.run_filter_scan(all_scripts, skip_height).await?;

log_debug!(
Expand All @@ -671,10 +678,12 @@ impl CbfChainSource {
matched.len()
);

// Fetch matching blocks and include all their transactions.
// The compact block filter already matched our scripts (covering both
// created outputs and spent inputs), so we include every transaction
// from matched blocks and let BDK determine relevance.
// Fetch matched blocks. Feed each one to the IndexedTxGraph: this records
// observed derivation indices in the keychain index (so the wallet can
// advance its reveal cursor on apply_update) and also captures wallet-relevant
// txs. We additionally collect *every* tx from matched blocks into `tx_update`
// so LDK-registered txs (channel funding etc.) are known to BDK's graph for
// later fee computation, even though they don't match the wallet keychain.
let mut tx_update = TxUpdate::default();
let per_request_timeout =
Duration::from_secs(self.sync_config.timeouts_config.per_request_timeout_secs.into());
Expand All @@ -694,6 +703,9 @@ impl CbfChainSource {
let block_id = BlockId { height: *height, hash: block.header.block_hash() };
let conf_time =
ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 };

let _ = graph.apply_block_relevant(&block, *height);

for tx in &block.txdata {
let txid = tx.compute_txid();
tx_update.txs.push(Arc::new(tx.clone()));
Expand Down Expand Up @@ -726,7 +738,8 @@ impl CbfChainSource {
let requester = self.requester()?;
let now = Instant::now();

let scripts: Vec<ScriptBuf> = self.registered_scripts.lock().expect("lock").clone();
let scripts: Vec<ScriptBuf> =
self.registered_scripts.lock().expect("lock").iter().cloned().collect();
if scripts.is_empty() {
log_debug!(self.logger, "No registered scripts for CBF lightning sync.");
} else {
Expand Down Expand Up @@ -1182,6 +1195,21 @@ impl CbfChainSource {
}
}

/// SPKs to scan for on-chain wallet sync: every revealed key plus the configured
/// lookahead window per keychain. Mirrors bdk-kyoto's `UpdateBuilder::peek_scripts`.
fn peek_keychain_scripts(index: &KeychainTxOutIndex<KeychainKind>) -> Vec<ScriptBuf> {
let mut scripts = Vec::new();
let last_revealed = index.last_revealed_indices();
let lookahead = index.lookahead();
for keychain in [KeychainKind::External, KeychainKind::Internal] {
let Some(spk_iter) = index.unbounded_spk_iter(keychain) else { continue };
let frontier = last_revealed.get(&keychain).copied().unwrap_or(0);
let bound = (frontier + lookahead) as usize;
scripts.extend(spk_iter.take(bound).map(|(_, spk)| spk));
}
scripts
}

/// Record the current timestamp in a `NodeMetrics` field and persist the metrics.
fn update_node_metrics_timestamp(
node_metrics: &RwLock<NodeMetrics>, kv_store: &DynStore, logger: &Logger,
Expand Down
23 changes: 7 additions & 16 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
use bdk_wallet::descriptor::ExtendedDescriptor;
use bdk_wallet::error::{BuildFeeBumpError, CreateTxError};
Expand Down Expand Up @@ -122,22 +123,12 @@ impl Wallet {
self.inner.lock().expect("lock").start_sync_with_revealed_spks().build()
}

pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec<ScriptBuf> {
let wallet = self.inner.lock().expect("lock");
let mut scripts: Vec<ScriptBuf> =
wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect();

// For first sync when no scripts have been revealed yet, generate
// lookahead scripts up to the stop gap for both keychains.
if scripts.is_empty() {
for keychain in [KeychainKind::External, KeychainKind::Internal] {
for idx in 0..stop_gap as u32 {
scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey());
}
}
}

scripts
/// Clone the wallet's keychain SPK index for use as the indexer of a sync-local
/// `IndexedTxGraph`. The clone preserves descriptors, currently-revealed range,
/// and the configured lookahead, so the chain source can compute the SPK set to
/// scan and the resulting `last_used_indices` independently of the live wallet.
pub(crate) fn spk_index_clone(&self) -> KeychainTxOutIndex<KeychainKind> {
self.inner.lock().expect("lock").spk_index().clone()
}

pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint {
Expand Down
Loading