Skip to content

Commit 710c9fe

Browse files
joostjager and claude
committed
Implement deferred monitor write queueing and flushing
Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent dcfb304 commit 710c9fe

4 files changed

Lines changed: 508 additions & 16 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 92 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -775,6 +775,17 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
775775
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
776776
/// no time is available, some features may be disabled, however the node will still operate fine.
777777
///
778+
/// Note that when deferred monitor writes are enabled on [`ChainMonitor`], this function flushes
779+
/// pending writes after persisting the [`ChannelManager`]. If the [`Persist`] implementation
780+
/// performs blocking I/O and returns [`Completed`] synchronously rather than returning
781+
/// [`InProgress`], this will block the async executor.
782+
///
783+
/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
784+
/// [`Persist`]: lightning::chain::chainmonitor::Persist
785+
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
786+
/// [`Completed`]: lightning::chain::ChannelMonitorUpdateStatus::Completed
787+
/// [`InProgress`]: lightning::chain::ChannelMonitorUpdateStatus::InProgress
788+
///
778789
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
779790
/// could setup `process_events_async` like this:
780791
/// ```
@@ -1118,9 +1129,18 @@ where
11181129
None => {},
11191130
}
11201131

1132+
// We capture pending_operation_count inside the persistence branch to
1133+
// avoid a race: ChannelManager handlers queue deferred monitor ops
1134+
// before the persistence flag is set. Capturing outside would let us
1135+
// observe pending ops while the flag is still unset, causing us to
1136+
// flush monitor writes without persisting the ChannelManager.
1137+
// Declared before futures so it outlives the Joiner (drop order).
1138+
let pending_monitor_writes;
1139+
11211140
let mut futures = Joiner::new();
11221141

11231142
if channel_manager.get_cm().get_and_clear_needs_persistence() {
1143+
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
11241144
log_trace!(logger, "Persisting ChannelManager...");
11251145

11261146
let fut = async {
@@ -1131,7 +1151,12 @@ where
11311151
CHANNEL_MANAGER_PERSISTENCE_KEY,
11321152
channel_manager.get_cm().encode(),
11331153
)
1134-
.await
1154+
.await?;
1155+
1156+
// Flush monitor operations that were pending before we persisted. New updates
1157+
// that arrived after are left for the next iteration.
1158+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1159+
Ok(())
11351160
};
11361161
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
11371162
let mut fut = Box::pin(fut);
@@ -1373,6 +1398,7 @@ where
13731398
// After we exit, ensure we persist the ChannelManager one final time - this avoids
13741399
// some races where users quit while channel updates were in-flight, with
13751400
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1401+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
13761402
kv_store
13771403
.write(
13781404
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1407,10 @@ where
13811407
channel_manager.get_cm().encode(),
13821408
)
13831409
.await?;
1410+
1411+
// Flush monitor operations that were pending before final persistence.
1412+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1413+
13841414
if let Some(ref scorer) = scorer {
13851415
kv_store
13861416
.write(
@@ -1684,7 +1714,15 @@ impl BackgroundProcessor {
16841714
channel_manager.get_cm().timer_tick_occurred();
16851715
last_freshness_call = Instant::now();
16861716
}
1717+
16871718
if channel_manager.get_cm().get_and_clear_needs_persistence() {
1719+
// We capture pending_operation_count inside the persistence
1720+
// branch to avoid a race: ChannelManager handlers queue
1721+
// deferred monitor ops before the persistence flag is set.
1722+
// Capturing outside would let us observe pending ops while
1723+
// the flag is still unset, causing us to flush monitor
1724+
// writes without persisting the ChannelManager.
1725+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
16881726
log_trace!(logger, "Persisting ChannelManager...");
16891727
(kv_store.write(
16901728
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1693,6 +1731,10 @@ impl BackgroundProcessor {
16931731
channel_manager.get_cm().encode(),
16941732
))?;
16951733
log_trace!(logger, "Done persisting ChannelManager.");
1734+
1735+
// Flush monitor operations that were pending before we persisted.
1736+
// New updates that arrived after are left for the next iteration.
1737+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
16961738
}
16971739

16981740
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
@@ -1809,12 +1851,17 @@ impl BackgroundProcessor {
18091851
// After we exit, ensure we persist the ChannelManager one final time - this avoids
18101852
// some races where users quit while channel updates were in-flight, with
18111853
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1854+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
18121855
kv_store.write(
18131856
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
18141857
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
18151858
CHANNEL_MANAGER_PERSISTENCE_KEY,
18161859
channel_manager.get_cm().encode(),
18171860
)?;
1861+
1862+
// Flush monitor operations that were pending before final persistence.
1863+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1864+
18181865
if let Some(ref scorer) = scorer {
18191866
kv_store.write(
18201867
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1943,10 @@ mod tests {
18961943
use bitcoin::transaction::{Transaction, TxOut};
18971944
use bitcoin::{Amount, ScriptBuf, Txid};
18981945
use core::sync::atomic::{AtomicBool, Ordering};
1946+
use lightning::chain::chainmonitor;
18991947
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19001948
use lightning::chain::transaction::OutPoint;
1901-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1949+
use lightning::chain::{BestBlock, Confirm, Filter};
19021950
use lightning::events::{Event, PathFailure, ReplayEvent};
19031951
use lightning::ln::channelmanager;
19041952
use lightning::ln::channelmanager::{
@@ -2444,6 +2492,7 @@ mod tests {
24442492
Arc::clone(&kv_store),
24452493
Arc::clone(&keys_manager),
24462494
keys_manager.get_peer_storage_key(),
2495+
true,
24472496
));
24482497
let best_block = BestBlock::from_network(network);
24492498
let params = ChainParameters { network, best_block };
@@ -2567,6 +2616,8 @@ mod tests {
25672616
(persist_dir, nodes)
25682617
}
25692618

2619+
/// Opens a channel between two nodes without a running `BackgroundProcessor`,
2620+
/// so deferred monitor operations are flushed manually at each step.
25702621
macro_rules! open_channel {
25712622
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
25722623
begin_open_channel!($node_a, $node_b, $channel_value);
@@ -2582,19 +2633,31 @@ mod tests {
25822633
tx.clone(),
25832634
)
25842635
.unwrap();
2636+
// funding_transaction_generated does not call watch_channel, so no
2637+
// deferred op is queued and FundingCreated is available immediately.
25852638
let msg_a = get_event_msg!(
25862639
$node_a,
25872640
MessageSendEvent::SendFundingCreated,
25882641
$node_b.node.get_our_node_id()
25892642
);
25902643
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2644+
// Flush node_b's new monitor (watch_channel) so it releases the
2645+
// FundingSigned message.
2646+
$node_b
2647+
.chain_monitor
2648+
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
25912649
get_event!($node_b, Event::ChannelPending);
25922650
let msg_b = get_event_msg!(
25932651
$node_b,
25942652
MessageSendEvent::SendFundingSigned,
25952653
$node_a.node.get_our_node_id()
25962654
);
25972655
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2656+
// Flush node_a's new monitor (watch_channel) queued by
2657+
// handle_funding_signed.
2658+
$node_a
2659+
.chain_monitor
2660+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25982661
get_event!($node_a, Event::ChannelPending);
25992662
tx
26002663
}};
@@ -2720,6 +2783,20 @@ mod tests {
27202783
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
27212784
}
27222785

2786+
/// Waits until the background processor has flushed all pending deferred monitor
2787+
/// operations for the given node. Panics if the pending count does not reach zero
2788+
/// within `EVENT_DEADLINE`.
2789+
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
2790+
let start = std::time::Instant::now();
2791+
while chain_monitor.pending_operation_count() > 0 {
2792+
assert!(
2793+
start.elapsed() < EVENT_DEADLINE,
2794+
"Pending monitor operations were not flushed within deadline"
2795+
);
2796+
std::thread::sleep(Duration::from_millis(10));
2797+
}
2798+
}
2799+
27232800
#[test]
27242801
fn test_background_processor() {
27252802
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3060,11 +3137,21 @@ mod tests {
30603137
.node
30613138
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30623139
.unwrap();
3140+
// funding_transaction_generated does not call watch_channel, so no deferred op is
3141+
// queued and the FundingCreated message is available immediately.
30633142
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30643143
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3144+
// Node 1 has no bg processor, flush its new monitor (watch_channel) manually so
3145+
// events and FundingSigned are released.
3146+
nodes[1]
3147+
.chain_monitor
3148+
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
30653149
get_event!(nodes[1], Event::ChannelPending);
30663150
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30673151
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3152+
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
3153+
// handle_funding_signed.
3154+
wait_for_flushed(&nodes[0].chain_monitor);
30683155
channel_pending_recv
30693156
.recv_timeout(EVENT_DEADLINE)
30703157
.expect("ChannelPending not handled within deadline");
@@ -3125,6 +3212,9 @@ mod tests {
31253212
error_message.to_string(),
31263213
)
31273214
.unwrap();
3215+
// Wait for the bg processor to flush the monitor update triggered by force close
3216+
// so the commitment tx is broadcast.
3217+
wait_for_flushed(&nodes[0].chain_monitor);
31283218
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31293219
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31303220

0 commit comments

Comments
 (0)