Skip to content

Commit 81bccf4

Browse files
committed
Implement CbfChainSource lifecycle and P2P broadcast
1 parent 4cb6618 commit 81bccf4

4 files changed

Lines changed: 248 additions & 11 deletions

File tree

src/chain/cbf.rs

Lines changed: 210 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use std::sync::{Arc, RwLock};
8+
use std::net::SocketAddr;
9+
use std::sync::{Arc, Mutex, RwLock};
10+
use std::time::Duration;
911

12+
use bip157::{BlockHash, Builder, Client, Event, Info, Requester, TrustedPeer, Warning};
1013
use bitcoin::{Script, Transaction, Txid};
1114
use lightning::chain::WatchedOutput;
15+
use lightning::util::ser::Writeable;
16+
use tokio::sync::mpsc;
1217

1318
use crate::config::{CbfSyncConfig, Config};
1419
use crate::fee_estimator::OnchainFeeEstimator;
15-
use crate::logger::{log_error, LdkLogger, Logger};
20+
use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
1621
use crate::runtime::Runtime;
1722
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
1823
use crate::{Error, NodeMetrics};
@@ -22,6 +27,10 @@ pub(super) struct CbfChainSource {
2227
peers: Vec<String>,
2328
/// User-provided sync configuration (timeouts, background sync intervals).
2429
pub(super) sync_config: CbfSyncConfig,
30+
/// Tracks whether the bip157 node is running and holds the command handle.
31+
cbf_runtime_status: Mutex<CbfRuntimeStatus>,
32+
/// Latest chain tip hash, updated by the background event processing task.
33+
latest_tip: Arc<Mutex<Option<BlockHash>>>,
2534
/// Shared fee rate estimator, updated by this chain source.
2635
fee_estimator: Arc<OnchainFeeEstimator>,
2736
/// Persistent key-value store for node metrics.
@@ -34,23 +43,169 @@ pub(super) struct CbfChainSource {
3443
node_metrics: Arc<RwLock<NodeMetrics>>,
3544
}
3645

46+
enum CbfRuntimeStatus {
47+
Started { requester: Requester },
48+
Stopped,
49+
}
50+
3751
impl CbfChainSource {
3852
pub(crate) fn new(
3953
peers: Vec<String>, sync_config: CbfSyncConfig, fee_estimator: Arc<OnchainFeeEstimator>,
4054
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
4155
node_metrics: Arc<RwLock<NodeMetrics>>,
4256
) -> Self {
43-
Self { peers, sync_config, fee_estimator, kv_store, config, logger, node_metrics }
57+
let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped);
58+
let latest_tip = Arc::new(Mutex::new(None));
59+
Self {
60+
peers,
61+
sync_config,
62+
cbf_runtime_status,
63+
latest_tip,
64+
fee_estimator,
65+
kv_store,
66+
config,
67+
logger,
68+
node_metrics,
69+
}
4470
}
4571

4672
/// Start the bip157 node and spawn background tasks for event processing.
47-
pub(crate) fn start(&self, _runtime: Arc<Runtime>) {
48-
log_error!(self.logger, "CBF chain source start is not yet implemented.");
73+
pub(crate) fn start(&self, runtime: Arc<Runtime>) {
74+
let mut status = self.cbf_runtime_status.lock().unwrap();
75+
if matches!(*status, CbfRuntimeStatus::Started { .. }) {
76+
debug_assert!(false, "We shouldn't call start if we're already started");
77+
return;
78+
}
79+
80+
let network = self.config.network;
81+
82+
let mut builder = Builder::new(network);
83+
84+
// Configure data directory under the node's storage path.
85+
let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data");
86+
builder = builder.data_dir(data_dir);
87+
88+
// Add configured peers.
89+
let peers: Vec<TrustedPeer> = self
90+
.peers
91+
.iter()
92+
.filter_map(|peer_str| {
93+
peer_str.parse::<SocketAddr>().ok().map(TrustedPeer::from_socket_addr)
94+
})
95+
.collect();
96+
if !peers.is_empty() {
97+
builder = builder.add_peers(peers);
98+
}
99+
100+
// Request witness data so segwit transactions include full witnesses,
101+
// required for Lightning channel operations.
102+
builder = builder.fetch_witness_data();
103+
104+
// Increase peer response timeout from the default 5 seconds to avoid
105+
// disconnecting slow peers during block downloads.
106+
builder = builder.response_timeout(Duration::from_secs(30));
107+
108+
let (node, client) = builder.build();
109+
110+
let Client { requester, info_rx, warn_rx, event_rx } = client;
111+
112+
// Spawn the bip157 node in the background.
113+
runtime.spawn_background_task(async move {
114+
let _ = node.run().await;
115+
});
116+
117+
// Spawn a task to log info messages.
118+
let info_logger = Arc::clone(&self.logger);
119+
runtime
120+
.spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger));
121+
122+
// Spawn a task to log warning messages.
123+
let warn_logger = Arc::clone(&self.logger);
124+
runtime
125+
.spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger));
126+
127+
// Spawn a task to process events.
128+
let event_logger = Arc::clone(&self.logger);
129+
let event_tip = Arc::clone(&self.latest_tip);
130+
runtime.spawn_cancellable_background_task(Self::process_events(
131+
event_rx,
132+
event_tip,
133+
event_logger,
134+
));
135+
136+
log_info!(self.logger, "CBF chain source started.");
137+
138+
*status = CbfRuntimeStatus::Started { requester };
49139
}
50140

51141
/// Shut down the bip157 node and stop all background tasks.
52142
pub(crate) fn stop(&self) {
53-
log_error!(self.logger, "CBF chain source stop is not yet implemented.");
143+
let mut status = self.cbf_runtime_status.lock().unwrap();
144+
match &*status {
145+
CbfRuntimeStatus::Started { requester } => {
146+
let _ = requester.shutdown();
147+
log_info!(self.logger, "CBF chain source stopped.");
148+
},
149+
CbfRuntimeStatus::Stopped => {},
150+
}
151+
*status = CbfRuntimeStatus::Stopped;
152+
}
153+
154+
async fn process_info_messages(mut info_rx: mpsc::Receiver<Info>, logger: Arc<Logger>) {
155+
while let Some(info) = info_rx.recv().await {
156+
log_debug!(logger, "CBF node info: {}", info);
157+
}
158+
}
159+
160+
async fn process_warn_messages(
161+
mut warn_rx: mpsc::UnboundedReceiver<Warning>, logger: Arc<Logger>,
162+
) {
163+
while let Some(warning) = warn_rx.recv().await {
164+
log_debug!(logger, "CBF node warning: {}", warning);
165+
}
166+
}
167+
168+
async fn process_events(
169+
mut event_rx: mpsc::UnboundedReceiver<Event>, latest_tip: Arc<Mutex<Option<BlockHash>>>,
170+
logger: Arc<Logger>,
171+
) {
172+
while let Some(event) = event_rx.recv().await {
173+
match event {
174+
Event::FiltersSynced(sync_update) => {
175+
let tip = sync_update.tip();
176+
*latest_tip.lock().unwrap() = Some(tip.hash);
177+
log_info!(
178+
logger,
179+
"CBF filters synced to tip: height={}, hash={}",
180+
tip.height,
181+
tip.hash,
182+
);
183+
},
184+
Event::Block(indexed_block) => {
185+
log_trace!(logger, "CBF received block at height {}", indexed_block.height,);
186+
},
187+
Event::ChainUpdate(header_changes) => {
188+
log_debug!(logger, "CBF chain update: {:?}", header_changes);
189+
},
190+
Event::IndexedFilter(indexed_filter) => {
191+
log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),);
192+
},
193+
}
194+
}
195+
}
196+
197+
fn requester(&self) -> Result<Requester, Error> {
198+
let status = self.cbf_runtime_status.lock().unwrap();
199+
match &*status {
200+
CbfRuntimeStatus::Started { requester } => Ok(requester.clone()),
201+
CbfRuntimeStatus::Stopped => {
202+
debug_assert!(
203+
false,
204+
"We should have started the chain source before using the requester"
205+
);
206+
Err(Error::ConnectionFailed)
207+
},
208+
}
54209
}
55210

56211
/// Sync the on-chain wallet by scanning compact block filters for relevant transactions.
@@ -77,8 +232,55 @@ impl CbfChainSource {
77232
}
78233

79234
/// Broadcast a package of transactions via the P2P network.
80-
pub(crate) async fn process_broadcast_package(&self, _package: Vec<Transaction>) {
81-
log_error!(self.logger, "Transaction broadcasting via CBF is not yet implemented.");
235+
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
236+
let Ok(requester) = self.requester() else { return };
237+
238+
for tx in package {
239+
let txid = tx.compute_txid();
240+
let tx_bytes = tx.encode();
241+
let timeout_fut = tokio::time::timeout(
242+
Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs),
243+
requester.broadcast_tx(tx),
244+
);
245+
match timeout_fut.await {
246+
Ok(res) => match res {
247+
Ok(wtxid) => {
248+
log_trace!(
249+
self.logger,
250+
"Successfully broadcast transaction {} (wtxid: {})",
251+
txid,
252+
wtxid
253+
);
254+
},
255+
Err(e) => {
256+
log_error!(
257+
self.logger,
258+
"Failed to broadcast transaction {}: {:?}",
259+
txid,
260+
e
261+
);
262+
log_trace!(
263+
self.logger,
264+
"Failed broadcast transaction bytes: {}",
265+
log_bytes!(tx_bytes)
266+
);
267+
},
268+
},
269+
Err(e) => {
270+
log_error!(
271+
self.logger,
272+
"Failed to broadcast transaction due to timeout {}: {}",
273+
txid,
274+
e
275+
);
276+
log_trace!(
277+
self.logger,
278+
"Failed broadcast transaction bytes: {}",
279+
log_bytes!(tx_bytes)
280+
);
281+
},
282+
}
283+
}
82284
}
83285

84286
/// Register a transaction script for Lightning channel monitoring.

src/chain/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl ChainSource {
252252
ChainSourceKind::Esplora(_) => true,
253253
ChainSourceKind::Electrum { .. } => true,
254254
ChainSourceKind::Bitcoind { .. } => false,
255-
ChainSourceKind::Cbf { .. } => false,
255+
ChainSourceKind::Cbf { .. } => true,
256256
}
257257
}
258258

tests/common/mod.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use bitcoin::{
2626
use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD};
2727
use electrsd::{corepc_node, ElectrsD};
2828
use electrum_client::ElectrumApi;
29-
use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyncConfig};
29+
use ldk_node::config::{
30+
AsyncPaymentsRole, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
31+
};
3032
use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy};
3133
use ldk_node::io::sqlite_store::SqliteStore;
3234
use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
@@ -222,6 +224,11 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) {
222224
let mut bitcoind_conf = corepc_node::Conf::default();
223225
bitcoind_conf.network = "regtest";
224226
bitcoind_conf.args.push("-rest");
227+
228+
bitcoind_conf.p2p = corepc_node::P2P::Yes;
229+
bitcoind_conf.args.push("-blockfilterindex=1");
230+
bitcoind_conf.args.push("-peerblockfilters=1");
231+
225232
let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap();
226233

227234
let electrs_exe = env::var("ELECTRS_EXE")
@@ -326,6 +333,7 @@ pub(crate) enum TestChainSource<'a> {
326333
Electrum(&'a ElectrsD),
327334
BitcoindRpcSync(&'a BitcoinD),
328335
BitcoindRestSync(&'a BitcoinD),
336+
Cbf(&'a BitcoinD),
329337
}
330338

331339
#[derive(Clone, Copy)]
@@ -463,6 +471,13 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) ->
463471
rpc_password,
464472
);
465473
},
474+
TestChainSource::Cbf(bitcoind) => {
475+
let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF");
476+
let peer_addr = format!("{}", p2p_socket);
477+
let sync_config =
478+
CbfSyncConfig { background_sync_config: None, timeouts_config: Default::default() };
479+
builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config));
480+
},
466481
}
467482

468483
match &config.log_writer {
@@ -497,7 +512,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) ->
497512

498513
node.start().unwrap();
499514
assert!(node.status().is_running);
500-
assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some());
515+
516+
if !matches!(chain_source, TestChainSource::Cbf(_)) {
517+
assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some());
518+
}
501519
node
502520
}
503521

tests/integration_tests_rust.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2805,3 +2805,20 @@ async fn splice_in_with_all_balance() {
28052805
node_a.stop().unwrap();
28062806
node_b.stop().unwrap();
28072807
}
2808+
2809+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2810+
async fn start_stop_cbf() {
2811+
let (bitcoind, _electrsd) = setup_bitcoind_and_electrsd();
2812+
let chain_source = TestChainSource::Cbf(&bitcoind);
2813+
let node = setup_node(&chain_source, random_config(true));
2814+
2815+
assert!(node.status().is_running);
2816+
node.stop().unwrap();
2817+
assert_eq!(node.stop(), Err(NodeError::NotRunning));
2818+
2819+
node.start().unwrap();
2820+
assert_eq!(node.start(), Err(NodeError::AlreadyRunning));
2821+
assert!(node.status().is_running);
2822+
2823+
node.stop().unwrap();
2824+
}

0 commit comments

Comments
 (0)