Skip to content

Commit 4fd0ad7

Browse files
authored
chore: break out session state variants into their own modules (#319)
* Refactor each session state into its own module * Rename active session awaitor to schedule awaiter to avoid confusion with the Active session state
1 parent b984ecd commit 4fd0ad7

10 files changed

Lines changed: 278 additions & 236 deletions

File tree

crates/hotfix/src/initiator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ async fn establish_connection<Outbound: OutboundMessage>(
107107
completion_tx: watch::Sender<bool>,
108108
) {
109109
loop {
110-
if session_ref.await_active_session_time().await.is_err() {
111-
warn!("session task terminated when checking active session time");
110+
if session_ref.await_in_schedule().await.is_err() {
111+
warn!("session task terminated when checking schedule");
112112
break;
113113
}
114114

crates/hotfix/src/session.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ pub(crate) use crate::session::session_ref::InternalSessionRef;
4545
pub use crate::session::session_ref::InternalSessionRef;
4646
use crate::session::session_ref::OutboundRequest;
4747
use crate::session::state::SessionState;
48-
use crate::session::state::{AwaitingResendTransitionOutcome, TestRequestId};
48+
use crate::session::state::{
49+
AwaitingLogonState, AwaitingLogoutState, AwaitingResendTransitionOutcome, TestRequestId,
50+
};
4951
use crate::session_schedule::{SessionPeriodComparison, SessionSchedule};
5052
use crate::store::MessageStore;
5153
use crate::transport::writer::WriterRef;
@@ -200,7 +202,7 @@ where
200202
}
201203
}
202204

203-
if let SessionState::AwaitingLogon { .. } = &mut self.state {
205+
if let SessionState::AwaitingLogon(_) = &mut self.state {
204206
// TODO: should this (and all inbound message processing) logic be pushed into the state?
205207
if message_type != Logon::MSG_TYPE {
206208
self.state.disconnect_writer().await;
@@ -332,11 +334,11 @@ where
332334
}
333335

334336
async fn on_connect(&mut self, writer: WriterRef) -> Result<(), SessionOperationError> {
335-
self.state = SessionState::AwaitingLogon {
337+
self.state = SessionState::AwaitingLogon(AwaitingLogonState {
336338
writer,
337339
logon_sent: false,
338340
logon_timeout: Instant::now() + Duration::from_secs(self.config.logon_timeout),
339-
};
341+
});
340342
self.reset_peer_timer(None);
341343
self.send_logon().await?;
342344

@@ -345,23 +347,23 @@ where
345347

346348
async fn on_disconnect(&mut self, reason: String) {
347349
match self.state {
348-
SessionState::Active { .. }
349-
| SessionState::AwaitingLogon { .. }
350+
SessionState::Active(_)
351+
| SessionState::AwaitingLogon(_)
350352
| SessionState::AwaitingResend(_) => {
351353
self.state.disconnect_writer().await;
352354
self.state = SessionState::new_disconnected(true, &reason);
353355
}
354-
SessionState::Disconnected { .. } => {
356+
SessionState::Disconnected(_) => {
355357
warn!("disconnect message was received, but the session is already disconnected")
356358
}
357-
SessionState::AwaitingLogout { reconnect, .. } => {
359+
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
358360
self.state = SessionState::new_disconnected(reconnect, &reason);
359361
}
360362
}
361363
}
362364

363365
async fn on_logon(&mut self, message: &Message) -> Result<(), SessionOperationError> {
364-
if let SessionState::AwaitingLogon { writer, .. } = &self.state {
366+
if let SessionState::AwaitingLogon(AwaitingLogonState { writer, .. }) = &self.state {
365367
match self.verify_message(message, true, true) {
366368
Ok(_) => {
367369
// happy logon flow, the session is now active
@@ -395,7 +397,7 @@ where
395397
// if the session is already disconnected, we have nothing else to do
396398
SessionState::Disconnected(..) => {}
397399
// if we initiated the logout, preserve the reconnect flag
398-
SessionState::AwaitingLogout { reconnect, .. } => {
400+
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
399401
self.state.disconnect_writer().await;
400402
self.state = SessionState::new_disconnected(reconnect, "logout completed");
401403
}
@@ -1039,8 +1041,8 @@ where
10391041
warn!("tried to respond to ShouldReconnect query but the receiver is gone");
10401042
}
10411043
}
1042-
SessionEvent::AwaitingActiveSession(responder) => {
1043-
self.state.register_session_awaiter(responder);
1044+
SessionEvent::AwaitSchedule(responder) => {
1045+
self.state.register_schedule_awaiter(responder);
10441046
}
10451047
}
10461048
}
@@ -1117,7 +1119,7 @@ where
11171119
let is_active = self.schedule.is_active_at(&now);
11181120

11191121
if is_active {
1120-
self.state.notify_session_awaiter();
1122+
self.state.notify_schedule_awaiter();
11211123
match self
11221124
.schedule
11231125
.is_same_session_period(&self.store.creation_time(), &now)

crates/hotfix/src/session/event.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ pub enum SessionEvent {
1313
Connected(WriterRef),
1414
/// Ask the session whether we should attempt to reconnect.
1515
ShouldReconnect(oneshot::Sender<bool>),
16-
/// Ask the session to notify us when the session is active.
17-
AwaitingActiveSession(oneshot::Sender<AwaitingActiveSessionResponse>),
16+
/// Ask the session to notify us when the schedule indicates we should connect.
17+
AwaitSchedule(oneshot::Sender<ScheduleResponse>),
1818
}
1919

20-
/// The response sent by the session to AwaitingActiveSession messages.
20+
/// The response sent by the session to AwaitSchedule messages.
2121
///
22-
/// This doesn't include an Inactive variant, as the session won't respond until
23-
/// it's active or in a state that indicates it should just be shut down due to an
24-
/// unrecoverable error.
22+
/// This doesn't include an out-of-schedule variant, as the session won't respond
23+
/// until the schedule indicates we should connect or the session is in a state that
24+
/// indicates it should just be shut down due to an unrecoverable error.
2525
#[derive(Debug, Clone, Copy)]
26-
pub enum AwaitingActiveSessionResponse {
27-
/// The session is now active and ready to connect.
28-
Active,
26+
pub enum ScheduleResponse {
27+
/// The schedule indicates we should connect.
28+
InSchedule,
2929
/// The session should be shut down due to an unrecoverable error.
3030
Shutdown,
3131
}

crates/hotfix/src/session/session_ref.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::message::{OutboundMessage, RawFixMessage};
77
use crate::session::Session;
88
use crate::session::admin_request::AdminRequest;
99
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
10-
use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent};
10+
use crate::session::event::{ScheduleResponse, SessionEvent};
1111
use crate::store::MessageStore;
1212
use crate::transport::writer::WriterRef;
1313
use crate::{Application, session};
@@ -82,15 +82,15 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
8282
Ok(receiver.await?)
8383
}
8484

85-
pub async fn await_active_session_time(&self) -> Result<(), SessionGone> {
86-
debug!("awaiting active session time");
87-
let (sender, receiver) = oneshot::channel::<AwaitingActiveSessionResponse>();
85+
pub async fn await_in_schedule(&self) -> Result<(), SessionGone> {
86+
debug!("awaiting in-schedule time");
87+
let (sender, receiver) = oneshot::channel::<ScheduleResponse>();
8888
self.event_sender
89-
.send(SessionEvent::AwaitingActiveSession(sender))
89+
.send(SessionEvent::AwaitSchedule(sender))
9090
.await?;
9191
receiver.await?;
9292

93-
debug!("resuming connection as session is active");
93+
debug!("resuming connection as schedule is active");
9494
Ok(())
9595
}
9696
}

0 commit comments

Comments
 (0)