Skip to content

Commit 939f798

Browse files
joostjagerclaude
andcommitted
Extract watch_channel_internal/update_channel_internal from Watch impl
Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e03d010 commit 939f798

1 file changed

Lines changed: 156 additions & 144 deletions

File tree

lightning/src/chain/chainmonitor.rs

Lines changed: 156 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,6 +1060,160 @@ where
10601060

10611061
Ok(ChannelMonitorUpdateStatus::Completed)
10621062
}
1063+
1064+
fn watch_channel_internal(
1065+
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1066+
) -> Result<ChannelMonitorUpdateStatus, ()> {
1067+
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1068+
let mut monitors = self.monitors.write().unwrap();
1069+
let entry = match monitors.entry(channel_id) {
1070+
hash_map::Entry::Occupied(_) => {
1071+
log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1072+
return Err(());
1073+
},
1074+
hash_map::Entry::Vacant(e) => e,
1075+
};
1076+
log_trace!(logger, "Got new ChannelMonitor");
1077+
let update_id = monitor.get_latest_update_id();
1078+
let mut pending_monitor_updates = Vec::new();
1079+
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1080+
match persist_res {
1081+
ChannelMonitorUpdateStatus::InProgress => {
1082+
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
1083+
pending_monitor_updates.push(update_id);
1084+
},
1085+
ChannelMonitorUpdateStatus::Completed => {
1086+
log_info!(logger, "Persistence of new ChannelMonitor completed",);
1087+
},
1088+
ChannelMonitorUpdateStatus::UnrecoverableError => {
1089+
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1090+
log_error!(logger, "{}", err_str);
1091+
panic!("{}", err_str);
1092+
},
1093+
}
1094+
if let Some(ref chain_source) = self.chain_source {
1095+
monitor.load_outputs_to_watch(chain_source, &self.logger);
1096+
}
1097+
entry.insert(MonitorHolder {
1098+
monitor,
1099+
pending_monitor_updates: Mutex::new(pending_monitor_updates),
1100+
});
1101+
Ok(persist_res)
1102+
}
1103+
1104+
fn update_channel_internal(
1105+
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
1106+
) -> ChannelMonitorUpdateStatus {
1107+
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
1108+
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
1109+
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
1110+
// Update the monitor that watches the channel referred to by the given outpoint.
1111+
let monitors = self.monitors.read().unwrap();
1112+
match monitors.get(&channel_id) {
1113+
None => {
1114+
let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
1115+
log_error!(logger, "Failed to update channel monitor: no such monitor registered");
1116+
1117+
// We should never ever trigger this from within ChannelManager. Technically a
1118+
// user could use this object with some proxying in between which makes this
1119+
// possible, but in tests and fuzzing, this should be a panic.
1120+
#[cfg(debug_assertions)]
1121+
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
1122+
#[cfg(not(debug_assertions))]
1123+
ChannelMonitorUpdateStatus::InProgress
1124+
},
1125+
Some(monitor_state) => {
1126+
let monitor = &monitor_state.monitor;
1127+
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1128+
log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,);
1129+
1130+
// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
1131+
// have well-ordered updates from the users' point of view. See the
1132+
// `pending_monitor_updates` docs for more.
1133+
let mut pending_monitor_updates =
1134+
monitor_state.pending_monitor_updates.lock().unwrap();
1135+
let update_res = monitor.update_monitor(
1136+
update,
1137+
&self.broadcaster,
1138+
&self.fee_estimator,
1139+
&self.logger,
1140+
);
1141+
1142+
let update_id = update.update_id;
1143+
let persist_res = if update_res.is_err() {
1144+
// Even if updating the monitor returns an error, the monitor's state will
1145+
// still be changed. Therefore, we should persist the updated monitor despite the error.
1146+
// We don't want to persist a `monitor_update` which results in a failure to apply later
1147+
// while reading `channel_monitor` with updates from storage. Instead, we should persist
1148+
// the entire `channel_monitor` here.
1149+
log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor");
1150+
self.persister.update_persisted_channel(
1151+
monitor.persistence_key(),
1152+
None,
1153+
monitor,
1154+
)
1155+
} else {
1156+
self.persister.update_persisted_channel(
1157+
monitor.persistence_key(),
1158+
Some(update),
1159+
monitor,
1160+
)
1161+
};
1162+
match persist_res {
1163+
ChannelMonitorUpdateStatus::InProgress => {
1164+
pending_monitor_updates.push(update_id);
1165+
log_debug!(
1166+
logger,
1167+
"Persistence of ChannelMonitorUpdate id {:?} in progress",
1168+
update_id,
1169+
);
1170+
},
1171+
ChannelMonitorUpdateStatus::Completed => {
1172+
log_debug!(
1173+
logger,
1174+
"Persistence of ChannelMonitorUpdate id {:?} completed",
1175+
update_id,
1176+
);
1177+
},
1178+
ChannelMonitorUpdateStatus::UnrecoverableError => {
1179+
// Take the monitors lock for writing so that we poison it and any future
1180+
// operations going forward fail immediately.
1181+
core::mem::drop(pending_monitor_updates);
1182+
core::mem::drop(monitors);
1183+
let _poison = self.monitors.write().unwrap();
1184+
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1185+
log_error!(logger, "{}", err_str);
1186+
panic!("{}", err_str);
1187+
},
1188+
}
1189+
1190+
// We may need to start monitoring for any alternative funding transactions.
1191+
if let Some(ref chain_source) = self.chain_source {
1192+
for (funding_outpoint, funding_script) in
1193+
update.internal_renegotiated_funding_data()
1194+
{
1195+
log_trace!(
1196+
logger,
1197+
"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
1198+
funding_outpoint
1199+
);
1200+
chain_source.register_tx(&funding_outpoint.txid, &funding_script);
1201+
chain_source.register_output(WatchedOutput {
1202+
block_hash: None,
1203+
outpoint: funding_outpoint,
1204+
script_pubkey: funding_script,
1205+
});
1206+
}
1207+
}
1208+
1209+
if update_res.is_err() {
1210+
ChannelMonitorUpdateStatus::InProgress
1211+
} else {
1212+
persist_res
1213+
}
1214+
},
1215+
}
1216+
}
10631217
}
10641218

10651219
impl<
@@ -1274,155 +1428,13 @@ where
12741428
fn watch_channel(
12751429
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
12761430
) -> Result<ChannelMonitorUpdateStatus, ()> {
1277-
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1278-
let mut monitors = self.monitors.write().unwrap();
1279-
let entry = match monitors.entry(channel_id) {
1280-
hash_map::Entry::Occupied(_) => {
1281-
log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present");
1282-
return Err(());
1283-
},
1284-
hash_map::Entry::Vacant(e) => e,
1285-
};
1286-
log_trace!(logger, "Got new ChannelMonitor");
1287-
let update_id = monitor.get_latest_update_id();
1288-
let mut pending_monitor_updates = Vec::new();
1289-
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1290-
match persist_res {
1291-
ChannelMonitorUpdateStatus::InProgress => {
1292-
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
1293-
pending_monitor_updates.push(update_id);
1294-
},
1295-
ChannelMonitorUpdateStatus::Completed => {
1296-
log_info!(logger, "Persistence of new ChannelMonitor completed",);
1297-
},
1298-
ChannelMonitorUpdateStatus::UnrecoverableError => {
1299-
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1300-
log_error!(logger, "{}", err_str);
1301-
panic!("{}", err_str);
1302-
},
1303-
}
1304-
if let Some(ref chain_source) = self.chain_source {
1305-
monitor.load_outputs_to_watch(chain_source, &self.logger);
1306-
}
1307-
entry.insert(MonitorHolder {
1308-
monitor,
1309-
pending_monitor_updates: Mutex::new(pending_monitor_updates),
1310-
});
1311-
Ok(persist_res)
1431+
self.watch_channel_internal(channel_id, monitor)
13121432
}
13131433

13141434
fn update_channel(
13151435
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
13161436
) -> ChannelMonitorUpdateStatus {
1317-
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
1318-
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
1319-
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
1320-
// Update the monitor that watches the channel referred to by the given outpoint.
1321-
let monitors = self.monitors.read().unwrap();
1322-
match monitors.get(&channel_id) {
1323-
None => {
1324-
let logger = WithContext::from(&self.logger, None, Some(channel_id), None);
1325-
log_error!(logger, "Failed to update channel monitor: no such monitor registered");
1326-
1327-
// We should never ever trigger this from within ChannelManager. Technically a
1328-
// user could use this object with some proxying in between which makes this
1329-
// possible, but in tests and fuzzing, this should be a panic.
1330-
#[cfg(debug_assertions)]
1331-
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
1332-
#[cfg(not(debug_assertions))]
1333-
ChannelMonitorUpdateStatus::InProgress
1334-
},
1335-
Some(monitor_state) => {
1336-
let monitor = &monitor_state.monitor;
1337-
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
1338-
log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,);
1339-
1340-
// We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we
1341-
// have well-ordered updates from the users' point of view. See the
1342-
// `pending_monitor_updates` docs for more.
1343-
let mut pending_monitor_updates =
1344-
monitor_state.pending_monitor_updates.lock().unwrap();
1345-
let update_res = monitor.update_monitor(
1346-
update,
1347-
&self.broadcaster,
1348-
&self.fee_estimator,
1349-
&self.logger,
1350-
);
1351-
1352-
let update_id = update.update_id;
1353-
let persist_res = if update_res.is_err() {
1354-
// Even if updating the monitor returns an error, the monitor's state will
1355-
// still be changed. Therefore, we should persist the updated monitor despite the error.
1356-
// We don't want to persist a `monitor_update` which results in a failure to apply later
1357-
// while reading `channel_monitor` with updates from storage. Instead, we should persist
1358-
// the entire `channel_monitor` here.
1359-
log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor");
1360-
self.persister.update_persisted_channel(
1361-
monitor.persistence_key(),
1362-
None,
1363-
monitor,
1364-
)
1365-
} else {
1366-
self.persister.update_persisted_channel(
1367-
monitor.persistence_key(),
1368-
Some(update),
1369-
monitor,
1370-
)
1371-
};
1372-
match persist_res {
1373-
ChannelMonitorUpdateStatus::InProgress => {
1374-
pending_monitor_updates.push(update_id);
1375-
log_debug!(
1376-
logger,
1377-
"Persistence of ChannelMonitorUpdate id {:?} in progress",
1378-
update_id,
1379-
);
1380-
},
1381-
ChannelMonitorUpdateStatus::Completed => {
1382-
log_debug!(
1383-
logger,
1384-
"Persistence of ChannelMonitorUpdate id {:?} completed",
1385-
update_id,
1386-
);
1387-
},
1388-
ChannelMonitorUpdateStatus::UnrecoverableError => {
1389-
// Take the monitors lock for writing so that we poison it and any future
1390-
// operations going forward fail immediately.
1391-
core::mem::drop(pending_monitor_updates);
1392-
core::mem::drop(monitors);
1393-
let _poison = self.monitors.write().unwrap();
1394-
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
1395-
log_error!(logger, "{}", err_str);
1396-
panic!("{}", err_str);
1397-
},
1398-
}
1399-
1400-
// We may need to start monitoring for any alternative funding transactions.
1401-
if let Some(ref chain_source) = self.chain_source {
1402-
for (funding_outpoint, funding_script) in
1403-
update.internal_renegotiated_funding_data()
1404-
{
1405-
log_trace!(
1406-
logger,
1407-
"Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends",
1408-
funding_outpoint
1409-
);
1410-
chain_source.register_tx(&funding_outpoint.txid, &funding_script);
1411-
chain_source.register_output(WatchedOutput {
1412-
block_hash: None,
1413-
outpoint: funding_outpoint,
1414-
script_pubkey: funding_script,
1415-
});
1416-
}
1417-
}
1418-
1419-
if update_res.is_err() {
1420-
ChannelMonitorUpdateStatus::InProgress
1421-
} else {
1422-
persist_res
1423-
}
1424-
},
1425-
}
1437+
self.update_channel_internal(channel_id, update)
14261438
}
14271439

14281440
fn release_pending_monitor_events(

0 commit comments

Comments
 (0)