@@ -106,7 +106,7 @@ use std::collections::HashMap;
106106use std:: default:: Default ;
107107use std:: net:: ToSocketAddrs ;
108108use std:: ops:: Deref ;
109- use std:: sync:: atomic:: AtomicU32 ;
109+ use std:: sync:: atomic:: { AtomicBool , AtomicU32 , Ordering } ;
110110use std:: sync:: { Arc , Mutex , RwLock } ;
111111use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
112112
@@ -143,15 +143,15 @@ use lightning::events::bump_transaction::{Input, Wallet as LdkWallet};
143143use lightning:: impl_writeable_tlv_based;
144144use lightning:: ln:: chan_utils:: { make_funding_redeemscript, FUNDING_TRANSACTION_WITNESS_WEIGHT } ;
145145use lightning:: ln:: channel_state:: { ChannelDetails as LdkChannelDetails , ChannelShutdownState } ;
146- use lightning:: ln:: channelmanager:: PaymentId ;
146+ use lightning:: ln:: channelmanager:: { PaymentId , RecipientOnionFields , Retry } ;
147147use lightning:: ln:: funding:: SpliceContribution ;
148148use lightning:: ln:: msgs:: SocketAddress ;
149149use lightning:: ln:: types:: ChannelId ;
150150use lightning:: routing:: gossip:: NodeAlias ;
151- use lightning:: routing:: router:: { Path , RouteHop } ;
151+ use lightning:: routing:: router:: { PaymentParameters , RouteParameters } ;
152+ use lightning:: sign:: EntropySource ;
152153use lightning:: util:: persist:: KVStoreSync ;
153154use lightning_background_processor:: process_events_async;
154- use lightning_types:: features:: { ChannelFeatures , NodeFeatures } ;
155155use liquidity:: { LSPS1Liquidity , LiquiditySource } ;
156156use logger:: { log_debug, log_error, log_info, log_trace, LdkLogger , Logger } ;
157157use payment:: asynchronous:: om_mailbox:: OnionMessageMailbox ;
@@ -219,7 +219,7 @@ pub struct Node {
219219 runtime_sync_intervals : Arc < RwLock < RuntimeSyncIntervals > > ,
220220 /// Shared RGS timestamp used by LocalGraphStore to persist the timestamp alongside the graph.
221221 local_rgs_timestamp : Arc < AtomicU32 > ,
222- accept_stale_channel_monitors : bool ,
222+ accept_stale_channel_monitors : AtomicBool ,
223223}
224224
225225impl Node {
@@ -261,7 +261,7 @@ impl Node {
261261 // When recovering stale monitors, defer chain sync until after the background
262262 // processor and peer connections have had time to heal the monitors via a
263263 // commitment round-trip (triggered by fee update after timer_tick).
264- let defer_chain_sync = self . accept_stale_channel_monitors ;
264+ let defer_chain_sync = self . accept_stale_channel_monitors . load ( Ordering :: Relaxed ) ;
265265 if !defer_chain_sync {
266266 self . spawn_chain_sync_task ( ) ;
267267 }
@@ -670,6 +670,7 @@ impl Node {
670670
671671 let channel_manager = Arc :: clone ( & self . channel_manager ) ;
672672 let chain_monitor = Arc :: clone ( & self . chain_monitor ) ;
673+ let keys_manager = Arc :: clone ( & self . keys_manager ) ;
673674 let heal_logger = Arc :: clone ( & self . logger ) ;
674675 let mut stop_healing = self . stop_sender . subscribe ( ) ;
675676 self . runtime . block_on ( async move {
@@ -679,10 +680,9 @@ impl Node {
679680 . iter ( )
680681 . filter ( |c| c. is_channel_ready )
681682 . filter_map ( |c| {
682- chain_monitor
683- . get_monitor ( c. channel_id )
684- . ok ( )
685- . map ( |m| ( c. channel_id , m. get_latest_update_id ( ) ) )
683+ chain_monitor. get_monitor ( c. channel_id ) . ok ( ) . map ( |m| {
684+ ( c. channel_id , c. counterparty . node_id , m. get_latest_update_id ( ) )
685+ } )
686686 } )
687687 . collect ( ) ;
688688
@@ -706,69 +706,59 @@ impl Node {
706706 _ = tokio:: time:: sleep( Duration :: from_secs( 5 ) ) => { }
707707 }
708708
709- // Build a single-hop probe path to the given counterparty.
710- let build_probe_path =
711- |counterparty_node_id : bitcoin:: secp256k1:: PublicKey , scid : u64 | -> Path {
712- Path {
713- hops : vec ! [ RouteHop {
714- pubkey: counterparty_node_id,
715- node_features: NodeFeatures :: empty( ) ,
716- short_channel_id: scid,
717- channel_features: ChannelFeatures :: empty( ) ,
718- fee_msat: 1000 ,
719- cltv_expiry_delta: 144 ,
720- maybe_announced_channel: true ,
721- } ] ,
722- blinded_tail : None ,
723- }
724- } ;
725-
726- // Send probes to force commitment round-trips on all channels.
727- // Probes work regardless of who the channel funder is.
728- for channel in channel_manager. list_channels ( ) {
729- if !channel. is_channel_ready {
730- continue ;
731- }
732-
733- let scid = match channel. short_channel_id {
734- Some ( scid) => scid,
735- None => continue ,
736- } ;
709+ // Send 1-sat keysend payments to trigger commitment round-trips.
710+ // We use real payments (not probes) because LDK rejects single-hop probes.
711+ // The HTLC add/fail cycle triggers commitment_signed exchanges that heal
712+ // the monitor. Cost: 1 sat per successful keysend (one is sent per ready channel, plus retries).
713+ let send_heal_payment = |node_id : bitcoin:: secp256k1:: PublicKey | {
714+ let payment_id = PaymentId ( keys_manager. get_secure_random_bytes ( ) ) ;
715+ let route_params = RouteParameters :: from_payment_params_and_value (
716+ PaymentParameters :: from_node_id ( node_id, 144 ) ,
717+ 1_000 , // 1 sat
718+ ) ;
719+ channel_manager. send_spontaneous_payment (
720+ None ,
721+ RecipientOnionFields :: spontaneous_empty ( ) ,
722+ payment_id,
723+ route_params,
724+ Retry :: Attempts ( 0 ) ,
725+ )
726+ } ;
737727
738- let path = build_probe_path ( channel . counterparty . node_id , scid ) ;
739- match channel_manager . send_probe ( path ) {
728+ for ( _ , counterparty_node_id , _ ) in & initial_update_ids {
729+ match send_heal_payment ( * counterparty_node_id ) {
740730 Ok ( _) => {
741731 log_info ! (
742732 heal_logger,
743- "Stale monitor recovery: sent probe on channel {}" ,
744- channel . channel_id
733+ "Stale monitor recovery: sent healing payment to {}" ,
734+ counterparty_node_id
745735 ) ;
746736 } ,
747737 Err ( e) => {
748738 log_error ! (
749739 heal_logger,
750- "Stale monitor recovery: failed to send probe on channel {}: {:?}" ,
751- channel . channel_id ,
740+ "Stale monitor recovery: failed to send healing payment to {}: {:?}" ,
741+ counterparty_node_id ,
752742 e
753743 ) ;
754744 } ,
755745 }
756746 }
757747
758748 // Poll monitor update_ids until all have advanced (healed) or timeout.
759- // Retry probes every 10s for channels that haven't healed yet (peer may
760- // have connected late).
749+ // Retry payments every 10s for channels that haven't healed yet (peer
750+ // may have connected late).
761751 let poll_interval = Duration :: from_secs ( 1 ) ;
762- let probe_retry_interval = Duration :: from_secs ( 10 ) ;
752+ let retry_interval = Duration :: from_secs ( 10 ) ;
763753 let max_wait = Duration :: from_secs ( 60 ) ;
764754 let start = tokio:: time:: Instant :: now ( ) ;
765- let mut last_probe_time = tokio:: time:: Instant :: now ( ) ;
755+ let mut last_retry_time = tokio:: time:: Instant :: now ( ) ;
766756
767757 loop {
768758 if start. elapsed ( ) >= max_wait {
769759 let unhealed_count = initial_update_ids
770760 . iter ( )
771- . filter ( |( ch_id, initial_id) | {
761+ . filter ( |( ch_id, _ , initial_id) | {
772762 chain_monitor
773763 . get_monitor ( * ch_id)
774764 . ok ( )
@@ -790,7 +780,7 @@ impl Node {
790780 break ;
791781 }
792782
793- let all_healed = initial_update_ids. iter ( ) . all ( |( ch_id, initial_id) | {
783+ let all_healed = initial_update_ids. iter ( ) . all ( |( ch_id, _ , initial_id) | {
794784 chain_monitor
795785 . get_monitor ( * ch_id)
796786 . ok ( )
@@ -807,10 +797,10 @@ impl Node {
807797 break ;
808798 }
809799
810- // Retry probes on unhealed channels (peer may have connected since last attempt) .
811- if last_probe_time . elapsed ( ) >= probe_retry_interval {
812- last_probe_time = tokio:: time:: Instant :: now ( ) ;
813- for ( ch_id, initial_id) in & initial_update_ids {
800+ // Retry healing payments for unhealed channels.
801+ if last_retry_time . elapsed ( ) >= retry_interval {
802+ last_retry_time = tokio:: time:: Instant :: now ( ) ;
803+ for ( ch_id, counterparty_node_id , initial_id) in & initial_update_ids {
814804 let healed = chain_monitor
815805 . get_monitor ( * ch_id)
816806 . ok ( )
@@ -820,23 +810,12 @@ impl Node {
820810 continue ;
821811 }
822812
823- // Find the channel details to construct and send a probe.
824- if let Some ( channel) = channel_manager
825- . list_channels ( )
826- . iter ( )
827- . find ( |c| c. channel_id == * ch_id && c. is_channel_ready )
828- . cloned ( )
829- {
830- if let Some ( scid) = channel. short_channel_id {
831- let path = build_probe_path ( channel. counterparty . node_id , scid) ;
832- if channel_manager. send_probe ( path) . is_ok ( ) {
833- log_info ! (
834- heal_logger,
835- "Stale monitor recovery: retried probe on channel {}" ,
836- ch_id
837- ) ;
838- }
839- }
813+ if send_heal_payment ( * counterparty_node_id) . is_ok ( ) {
814+ log_info ! (
815+ heal_logger,
816+ "Stale monitor recovery: retried healing payment for channel {}" ,
817+ ch_id
818+ ) ;
840819 }
841820 }
842821 }
@@ -851,8 +830,16 @@ impl Node {
851830 }
852831 } ) ;
853832
854- self . spawn_chain_sync_task ( ) ;
855- log_info ! ( self . logger, "Startup complete." ) ;
833+ // Clear the flag so subsequent start()/stop()/start() cycles don't re-trigger.
834+ self . accept_stale_channel_monitors . store ( false , Ordering :: Relaxed ) ;
835+
836+ // Only start chain sync if the node wasn't stopped during healing.
837+ if * self . is_running . read ( ) . unwrap ( ) {
838+ self . spawn_chain_sync_task ( ) ;
839+ log_info ! ( self . logger, "Startup complete." ) ;
840+ } else {
841+ log_info ! ( self . logger, "Node was stopped during stale monitor recovery." ) ;
842+ }
856843 return Ok ( ( ) ) ;
857844 }
858845
0 commit comments