Skip to content

Commit e0fd9a7

Browse files
committed
Move logout functions into session state
1 parent e10124b commit e0fd9a7

2 files changed

Lines changed: 120 additions & 69 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,9 @@ where
392392
}
393393

394394
if self.state.is_logged_on() {
395-
self.send_logout("Logout acknowledged").await?;
395+
self.state
396+
.send_logout(&mut self.ctx, "Logout acknowledged")
397+
.await?;
396398
}
397399

398400
self.ctx
@@ -649,10 +651,12 @@ where
649651
}
650652

651653
async fn handle_incorrect_begin_string(&mut self, received_begin_string: String) {
652-
self.logout_and_terminate(&format!(
653-
"beginString={received_begin_string} is not supported"
654-
))
655-
.await;
654+
self.state
655+
.logout_and_terminate(
656+
&mut self.ctx,
657+
&format!("beginString={received_begin_string} is not supported"),
658+
)
659+
.await;
656660
}
657661

658662
async fn handle_incorrect_comp_id(
@@ -671,7 +675,8 @@ where
671675
error!("failed to send reject message with invalid comp ID: {err}");
672676
};
673677

674-
self.logout_and_terminate("incorrect comp ID received")
678+
self.state
679+
.logout_and_terminate(&mut self.ctx, "incorrect comp ID received")
675680
.await;
676681
}
677682

@@ -691,7 +696,9 @@ where
691696
"we expected {expected} sequence number, but target sent lower ({actual}), terminating..."
692697
);
693698
let reason = format!("sequence number too low (actual {actual}, expected {expected})");
694-
self.logout_and_terminate(&reason).await;
699+
self.state
700+
.logout_and_terminate(&mut self.ctx, &reason)
701+
.await;
695702
self.state = SessionState::new_disconnected(false, &reason);
696703
}
697704

@@ -817,11 +824,7 @@ where
817824
&mut self,
818825
message: impl OutboundMessage,
819826
) -> Result<u64, InternalSendError> {
820-
let msg_type = message.message_type().to_string();
821-
let prepared = self.ctx.prepare_message(message).await?;
822-
self.state.send_message(&msg_type, prepared.raw).await;
823-
self.reset_heartbeat_timer();
824-
Ok(prepared.seq_num)
827+
self.state.send_message(&mut self.ctx, message).await
825828
}
826829

827830
async fn send_resend_request(
@@ -850,48 +853,6 @@ where
850853
Ok(())
851854
}
852855

853-
async fn send_logout(&mut self, reason: &str) -> Result<(), SessionOperationError> {
854-
let logout = Logout::with_reason(reason.to_string());
855-
self.send_message(logout)
856-
.await
857-
.with_send_context("logout")?;
858-
Ok(())
859-
}
860-
861-
/// Sends a logout message and immediately disconnects the counterparty.
862-
///
863-
/// This should be used sparingly in scenarios where there is a major issue
864-
/// requiring operational intervention, such as the sequence number being lower
865-
/// than expected, or some other key header field containing an invalid value.
866-
///
867-
/// In other scenarios, [`initiate_graceful_logout`] should be preferred.
868-
async fn logout_and_terminate(&mut self, reason: &str) {
869-
if let Err(err) = self.send_logout(reason).await {
870-
warn!("failed to send logout during session termination: {}", err);
871-
}
872-
self.state.disconnect_writer().await;
873-
}
874-
875-
/// Sends a logout message and puts the session state into an [`AwaitingLogout`] state.
876-
///
877-
/// The session waits for a configurable timeout period for the counterparty to
878-
/// respond with a `Logout` message. If no response is received within the timeout
879-
/// period, it disconnects the counterparty.
880-
async fn initiate_graceful_logout(
881-
&mut self,
882-
reason: &str,
883-
reconnect: bool,
884-
) -> Result<(), SessionOperationError> {
885-
if self.state.try_transition_to_awaiting_logout(
886-
Duration::from_secs(self.ctx.config.logout_timeout),
887-
reconnect,
888-
) {
889-
self.send_logout(reason).await?;
890-
}
891-
892-
Ok(())
893-
}
894-
895856
async fn handle_session_event(&mut self, event: SessionEvent) {
896857
self.handle_schedule_check().await;
897858

@@ -900,7 +861,9 @@ where
900861
if let Err(err) = self.on_incoming(fix_message).await {
901862
let reason = err.to_string();
902863
error!(reason, "fatal error in message processing");
903-
self.logout_and_terminate("internal error").await;
864+
self.state
865+
.logout_and_terminate(&mut self.ctx, "internal error")
866+
.await;
904867
self.state = SessionState::new_disconnected(true, &reason);
905868
}
906869
}
@@ -945,7 +908,8 @@ where
945908
AdminRequest::InitiateGracefulShutdown { reconnect } => {
946909
warn!("initiating shutdown on request from admin..");
947910
if let Err(err) = self
948-
.initiate_graceful_logout("explicitly requested", reconnect)
911+
.state
912+
.initiate_graceful_logout(&mut self.ctx, "explicitly requested", reconnect)
949913
.await
950914
{
951915
error!(err = ?err, "initiating graceful shutdown");
@@ -973,7 +937,9 @@ where
973937
async fn handle_peer_timeout(&mut self) {
974938
if self.state.is_expecting_test_response() {
975939
warn!("peer didn't respond, terminating..");
976-
self.logout_and_terminate("peer timeout").await;
940+
self.state
941+
.logout_and_terminate(&mut self.ctx, "peer timeout")
942+
.await;
977943
} else if self.state.is_awaiting_logon() {
978944
warn!("peer didn't respond to our Logon, disconnecting..");
979945
self.state.disconnect_writer().await;
@@ -1007,7 +973,9 @@ where
1007973
Ok(SessionPeriodComparison::DifferentPeriod) => {
1008974
// the message store is for a previous session,
1009975
// we need to terminate this session, reset the store, and reestablish the session
1010-
self.logout_and_terminate("session period changed").await;
976+
self.state
977+
.logout_and_terminate(&mut self.ctx, "session period changed")
978+
.await;
1011979
if let Err(err) = self.ctx.store.reset().await {
1012980
error!("error resetting session store: {err:}");
1013981
self.state =
@@ -1018,7 +986,8 @@ where
1018986
// the creation_time was recorded outside the session schedule,
1019987
// treat this similarly to a different period - reset the store
1020988
warn!("store creation time is outside session schedule, resetting store");
1021-
self.logout_and_terminate("creation time outside schedule")
989+
self.state
990+
.logout_and_terminate(&mut self.ctx, "creation time outside schedule")
1022991
.await;
1023992
if let Err(err) = self.ctx.store.reset().await {
1024993
error!("error resetting session store: {err:}");
@@ -1029,13 +998,16 @@ where
1029998
Err(err) => {
1030999
// actual schedule calculation error (e.g., DST transition, date overflow)
10311000
error!("error checking session period: {err:?}");
1032-
self.logout_and_terminate("internal error").await;
1001+
self.state
1002+
.logout_and_terminate(&mut self.ctx, "internal error")
1003+
.await;
10331004
}
10341005
}
10351006
} else if self.state.is_connected() {
10361007
// we are currently outside scheduled session time
10371008
if let Err(err) = self
1038-
.initiate_graceful_logout("End of session time", true)
1009+
.state
1010+
.initiate_graceful_logout(&mut self.ctx, "End of session time", true)
10391011
.await
10401012
{
10411013
error!(err = ?err, "failed to initiate graceful logout");

crates/hotfix/src/session/state.rs

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,20 @@ pub(crate) use awaiting_logout::AwaitingLogoutState;
1010
pub(crate) use awaiting_resend::{AwaitingResendState, AwaitingResendTransitionOutcome};
1111
pub(crate) use disconnected::DisconnectedState;
1212

13+
use crate::Application;
14+
use crate::message::OutboundMessage;
1315
use crate::message::logon::Logon;
1416
use crate::message::logout::Logout;
15-
use crate::message::parser::RawFixMessage;
17+
use crate::session::ctx::SessionCtx;
18+
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
1619
use crate::session::event::ScheduleResponse;
1720
use crate::session::info::Status as SessionInfoStatus;
1821
use crate::transport::writer::WriterRef;
22+
use hotfix_store::MessageStore;
1923
use std::time::Duration;
2024
use tokio::sync::oneshot;
2125
use tokio::time::Instant;
22-
use tracing::{debug, error};
26+
use tracing::{debug, error, warn};
2327

2428
const TEST_REQUEST_THRESHOLD: f64 = 1.2;
2529

@@ -64,41 +68,57 @@ impl SessionState {
6468
}
6569
}
6670

67-
pub async fn send_message(&mut self, message_type: &str, message: RawFixMessage) {
71+
pub async fn send_message<A, S>(
72+
&mut self,
73+
ctx: &mut SessionCtx<A, S>,
74+
message: impl OutboundMessage,
75+
) -> Result<u64, InternalSendError>
76+
where
77+
A: Application,
78+
S: MessageStore,
79+
{
80+
let message_type = message.message_type().to_string();
81+
let prepared = ctx.prepare_message(message).await?;
82+
let raw = prepared.raw;
83+
6884
match self {
6985
Self::Active(ActiveState { writer, .. })
7086
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => {
7187
if message_type == Logon::MSG_TYPE {
7288
error!("logon message is invalid for active sessions")
7389
} else {
74-
writer.send_raw_message(message).await
90+
writer.send_raw_message(raw).await
7591
}
7692
}
7793
Self::AwaitingLogon(AwaitingLogonState {
7894
writer, logon_sent, ..
79-
}) => match message_type {
95+
}) => match message_type.as_str() {
8096
Logon::MSG_TYPE => {
8197
if *logon_sent {
8298
error!("trying to send logon twice");
8399
} else {
84-
writer.send_raw_message(message).await;
100+
writer.send_raw_message(raw).await;
85101
*logon_sent = true;
86102
}
87103
}
88104
Logout::MSG_TYPE => {
89-
writer.send_raw_message(message).await;
105+
writer.send_raw_message(raw).await;
90106
}
91107
_ => error!("invalid outgoing message for AwaitingLogon state"),
92108
},
93109
Self::AwaitingLogout(AwaitingLogoutState { writer, .. }) => {
94110
// Logout messages are allowed because we first transition into AwaitingLogout
95111
// and only then send the logout message
96112
if message_type == Logout::MSG_TYPE {
97-
writer.send_raw_message(message).await
113+
writer.send_raw_message(raw).await
98114
}
99115
}
100116
_ => error!("trying to write without an established connection"),
101117
}
118+
119+
self.reset_heartbeat_timer(ctx.config.heartbeat_interval);
120+
121+
Ok(prepared.seq_num)
102122
}
103123

104124
pub async fn disconnect_writer(&self) {
@@ -167,6 +187,65 @@ impl SessionState {
167187
}
168188
}
169189

190+
/// Sends a logout message and puts the session state into an [`AwaitingLogout`] state.
191+
///
192+
/// The session waits for a configurable timeout period for the counterparty to
193+
/// respond with a `Logout` message. If no response is received within the timeout
194+
/// period, it disconnects the counterparty.
195+
pub async fn initiate_graceful_logout<A, S>(
196+
&mut self,
197+
ctx: &mut SessionCtx<A, S>,
198+
reason: &str,
199+
reconnect: bool,
200+
) -> Result<(), SessionOperationError>
201+
where
202+
A: Application,
203+
S: MessageStore,
204+
{
205+
if self.try_transition_to_awaiting_logout(
206+
Duration::from_secs(ctx.config.logout_timeout),
207+
reconnect,
208+
) {
209+
self.send_logout(ctx, reason).await?;
210+
}
211+
212+
Ok(())
213+
}
214+
215+
/// Sends a logout message and immediately disconnects the counterparty.
216+
///
217+
/// This should be used sparingly in scenarios where there is a major issue
218+
/// requiring operational intervention, such as the sequence number being lower
219+
/// than expected, or some other key header field containing an invalid value.
220+
///
221+
/// In other scenarios, [`initiate_graceful_logout`] should be preferred.
222+
pub async fn logout_and_terminate<A, S>(&mut self, ctx: &mut SessionCtx<A, S>, reason: &str)
223+
where
224+
A: Application,
225+
S: MessageStore,
226+
{
227+
if let Err(err) = self.send_logout(ctx, reason).await {
228+
warn!("failed to send logout during session termination: {}", err);
229+
}
230+
self.disconnect_writer().await;
231+
}
232+
233+
pub async fn send_logout<A, S>(
234+
&mut self,
235+
ctx: &mut SessionCtx<A, S>,
236+
reason: &str,
237+
) -> Result<(), SessionOperationError>
238+
where
239+
A: Application,
240+
S: MessageStore,
241+
{
242+
let logout = Logout::with_reason(reason.to_string());
243+
self.send_message(ctx, logout)
244+
.await
245+
.with_send_context("logout")?;
246+
Ok(())
247+
}
248+
170249
pub fn register_schedule_awaiter(&mut self, responder: oneshot::Sender<ScheduleResponse>) {
171250
match self {
172251
SessionState::Disconnected(state) => {

0 commit comments

Comments
 (0)