Skip to content

Commit b2cdde7

Browse files
committed
Merge remote-tracking branch 'upstream/main' into refactor/temp-safety-refactor-2
2 parents c1adc5a + 26be457 commit b2cdde7

5 files changed

Lines changed: 185 additions & 213 deletions

File tree

src/event.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub trait StateMachineExecutor<
2828
let event = Event::with(endpoints, runtime.identity(), source, request);
2929
let sm_display = sm.to_string();
3030
let sm_name = sm.name();
31+
let sm_log_level = sm.log_level();
3132
if let Some(new_sm) = sm.next(event, runtime)? {
3233
let new_sm_display = new_sm.to_string();
3334
// relegate state transitions staying the same to debug
@@ -38,7 +39,8 @@ pub trait StateMachineExecutor<
3839
new_sm.bright_green_bold()
3940
);
4041
} else {
41-
info!(
42+
log!(
43+
new_sm.log_level(),
4244
"{} state transition {} -> {}",
4345
new_sm.name(),
4446
sm_display.red_bold(),
@@ -47,7 +49,8 @@ pub trait StateMachineExecutor<
4749
}
4850
Ok(Some(new_sm))
4951
} else {
50-
info!(
52+
log!(
53+
sm_log_level,
5154
"{} state machine ended {} -> {}",
5255
sm_name,
5356
sm_display.red_bold(),
@@ -70,7 +73,13 @@ where
7073
where
7174
Self: Sized;
7275

76+
/// Return the display name of the state machine
7377
fn name(&self) -> String;
78+
79+
/// Return the log level to use for state transitions. Info by default
80+
fn log_level(&self) -> log::Level {
81+
log::Level::Info
82+
}
7483
}
7584

7685
/// Event changing state machine state, consisting of a certain P2P or RPC `request` sent from some

src/farcasterd/syncer_state_machine.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ impl StateMachine<Runtime, Error> for SyncerStateMachine {
9090
fn name(&self) -> String {
9191
"Syncer".to_string()
9292
}
93+
94+
fn log_level(&self) -> log::Level {
95+
log::Level::Debug
96+
}
9397
}
9498

9599
pub struct SyncerStateMachineExecutor {}

src/swapd/runtime.rs

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use super::{
1010
temporal_safety::TemporalSafety,
1111
StateReport,
1212
};
13-
use crate::service::{Endpoints, Reporter};
1413
use crate::swapd::temporal_safety::SWEEP_MONERO_THRESHOLD;
1514
use crate::swapd::Opts;
1615
use crate::syncerd::bitcoin_syncer::p2wpkh_signed_tx_fee;
@@ -23,6 +22,10 @@ use crate::{
2322
bus::{BusMsg, Outcome, ServiceBus},
2423
syncerd::{HeightChanged, TransactionRetrieved, XmrAddressAddendum},
2524
};
25+
use crate::{
26+
service::{Endpoints, Reporter},
27+
syncerd::AddressTransaction,
28+
};
2629
use crate::{CtlServer, Error, LogStyle, Service, ServiceConfig, ServiceId};
2730

2831
use std::time::{Duration, SystemTime};
@@ -107,9 +110,7 @@ pub fn run(config: ServiceConfig, opts: Opts) -> Result<(), Error> {
107110
monero_height: 0,
108111
bitcoin_height: 0,
109112
confirmation_bound: 50000,
110-
lock_tx_confs: None,
111-
cancel_tx_confs: None,
112-
buy_tx_confs: None,
113+
last_tx_event: none!(),
113114
network,
114115
bitcoin_syncer: ServiceId::Syncer(Blockchain::Bitcoin, network),
115116
monero_syncer: ServiceId::Syncer(Blockchain::Monero, network),
@@ -542,9 +543,23 @@ impl Runtime {
542543
self.temporal_safety.acc_finality,
543544
endpoints,
544545
);
546+
547+
// saving requests of interest for later replaying latest event
548+
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
549+
self.syncer_state
550+
.last_tx_event
551+
.insert(*txlabel, request.clone());
552+
}
545553
}
546554

547-
Event::AddressTransaction(_) => {}
555+
Event::AddressTransaction(AddressTransaction { id, .. }) => {
556+
// saving requests of interest for later replaying latest event
557+
if let Some(txlabel) = self.syncer_state.tasks.watched_addrs.get(id) {
558+
self.syncer_state
559+
.last_tx_event
560+
.insert(*txlabel, request.clone());
561+
}
562+
}
548563

549564
Event::SweepSuccess(_) => {}
550565

@@ -596,19 +611,11 @@ impl Runtime {
596611
self.temporal_safety.arb_finality,
597612
endpoints,
598613
);
599-
let txlabel = self.syncer_state.tasks.watched_txs.get(id);
600614
// saving requests of interest for later replaying latest event
601-
match txlabel {
602-
Some(&TxLabel::Lock) => {
603-
self.syncer_state.lock_tx_confs = Some(request.clone());
604-
}
605-
Some(&TxLabel::Cancel) => {
606-
self.syncer_state.cancel_tx_confs = Some(request.clone());
607-
}
608-
Some(&TxLabel::Buy) => {
609-
self.syncer_state.buy_tx_confs = Some(request.clone())
610-
}
611-
_ => {}
615+
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
616+
self.syncer_state
617+
.last_tx_event
618+
.insert(*txlabel, request.clone());
612619
}
613620
}
614621

@@ -624,13 +631,27 @@ impl Runtime {
624631
self.temporal_safety.arb_finality,
625632
endpoints,
626633
);
634+
// saving requests of interest for later replaying latest event
635+
if let Some(txlabel) = self.syncer_state.tasks.watched_txs.get(id) {
636+
self.syncer_state
637+
.last_tx_event
638+
.insert(*txlabel, request.clone());
639+
}
627640
}
628641

629642
Event::TransactionBroadcasted(event) => {
630643
self.syncer_state.transaction_broadcasted(event);
631644
}
632645

633-
Event::AddressTransaction(_) => {}
646+
Event::AddressTransaction(AddressTransaction { id, .. }) => {
647+
// saving requests of interest for later replaying latest event
648+
if let Some(txlabel) = self.syncer_state.tasks.watched_addrs.get(id) {
649+
self.syncer_state
650+
.last_tx_event
651+
.insert(*txlabel, request.clone());
652+
}
653+
self.log_debug(event);
654+
}
634655

635656
Event::TaskAborted(event) => {
636657
self.log_debug(event);
@@ -705,15 +726,9 @@ impl Runtime {
705726
if let Some(peer_msg) = self.unhandled_peer_message.clone() {
706727
self.handle_msg(endpoints, source.clone(), peer_msg)?;
707728
}
708-
// Replay confirmation events to ensure we immediately advance through states that can be skipped
709-
if let Some(buy_tx_confs_req) = self.syncer_state.buy_tx_confs.clone() {
710-
self.handle_sync(endpoints, source.clone(), buy_tx_confs_req)?;
711-
}
712-
if let Some(lock_tx_confs_req) = self.syncer_state.lock_tx_confs.clone() {
713-
self.handle_sync(endpoints, source.clone(), lock_tx_confs_req)?;
714-
}
715-
if let Some(cancel_tx_confs_req) = self.syncer_state.cancel_tx_confs.clone() {
716-
self.handle_sync(endpoints, source, cancel_tx_confs_req)?;
729+
// Replay syncer events to ensure we immediately advance through states that can be skipped
730+
for event in self.syncer_state.last_tx_event.clone().values() {
731+
self.handle_sync(endpoints, source.clone(), event.clone())?;
717732
}
718733
} else if let BusMsg::P2p(peer_msg) = msg {
719734
self.unhandled_peer_message = Some(peer_msg);

0 commit comments

Comments
 (0)