Skip to content

Commit 0a113d6

Browse files
authored
feat: support logout timeouts (#252)
* Add logout timeout to awaiting logout state * Add test case for logout initiated by counterparty * Support reconnect flag in logout state so it can be carried forward to disconnected state * Add test case for counterparty exceeding logout timeout * Implement new cleanup logic required to terminate sessions cleanly in session tests
1 parent 2364df9 commit 0a113d6

16 files changed

Lines changed: 202 additions & 56 deletions

File tree

crates/hotfix/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ fn default_logon_timeout() -> u64 {
5252
10
5353
}
5454

55+
fn default_logout_timeout() -> u64 {
56+
2
57+
}
58+
5559
/// The configuration of a single FIX session.
5660
#[derive(Clone, Debug, Deserialize)]
5761
pub struct SessionConfig {
@@ -87,6 +91,10 @@ pub struct SessionConfig {
8791
#[serde(default = "default_logon_timeout")]
8892
pub logon_timeout: u64,
8993

94+
/// The time we wait in seconds for Logon responses before timing out.
95+
#[serde(default = "default_logout_timeout")]
96+
pub logout_timeout: u64,
97+
9098
/// The interval we should attempt to reconnect at in seconds.
9199
#[serde(default = "default_reconnect_interval")]
92100
pub reconnect_interval: u64,

crates/hotfix/src/message/verification.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ mod tests {
191191
tls_config: None,
192192
heartbeat_interval: 0,
193193
logon_timeout: 0,
194+
logout_timeout: 0,
194195
reconnect_interval: 0,
195196
reset_on_logon: false,
196197
schedule: None,

crates/hotfix/src/session.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
293293
SessionState::Disconnected { .. } => {
294294
warn!("disconnect message was received, but the session is already disconnected")
295295
}
296-
SessionState::AwaitingLogout { .. } => {
297-
// this is unexpected because the other side should send a logout before disconnecting,
298-
// which would move this session out of the ShuttingDown state
299-
// TODO: is this actually true? need to review the spec carefully
300-
warn!("disconnect message was received, but the session is still shutting down")
296+
SessionState::AwaitingLogout { reconnect, .. } => {
297+
self.state = SessionState::new_disconnected(reconnect, &reason);
301298
}
302299
}
303300
}
@@ -782,8 +779,11 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
782779
self.state.disconnect_writer().await;
783780
}
784781

785-
async fn initiate_graceful_logout(&mut self, reason: &str) {
786-
if self.state.try_transition_to_awaiting_logout() {
782+
async fn initiate_graceful_logout(&mut self, reason: &str, reconnect: bool) {
783+
if self.state.try_transition_to_awaiting_logout(
784+
Duration::from_secs(self.config.logout_timeout),
785+
reconnect,
786+
) {
787787
self.send_logout(reason).await;
788788
}
789789
}
@@ -826,8 +826,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
826826
match request {
827827
AdminRequest::InitiateGracefulShutdown { reconnect } => {
828828
warn!("initiating shutdown on request from admin..");
829-
self.logout_and_terminate("shutdown requested").await;
830-
self.state = SessionState::new_disconnected(reconnect, "shutdown requested");
829+
self.initiate_graceful_logout("explicitly requested", reconnect)
830+
.await;
831831
}
832832
AdminRequest::RequestSessionInfo(responder) => {
833833
info!("session info requested");
@@ -853,6 +853,9 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
853853
} else if self.state.is_awaiting_logon() {
854854
warn!("peer didn't respond to our Logon, disconnecting..");
855855
self.state.disconnect_writer().await;
856+
} else if self.state.is_awaiting_logout() {
857+
warn!("peer didn't respond to our Logout, disconnecting..");
858+
self.state.disconnect_writer().await;
856859
} else {
857860
let req_id = format!("TEST_{}", self.store.next_target_seq_number());
858861
info!("sending TestRequest due to peer timer expiring");
@@ -892,7 +895,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
892895
}
893896
} else if self.state.is_connected() {
894897
// we are currently outside scheduled session time
895-
self.initiate_graceful_logout("End of session time").await;
898+
self.initiate_graceful_logout("End of session time", true)
899+
.await;
896900
}
897901

898902
// we always need to reschedule the check, otherwise we won't be able to resume an inactive session

crates/hotfix/src/session/state.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ pub enum SessionState {
2424
/// We are awaiting the target to resend the gap we have.
2525
AwaitingResend(AwaitingResendState),
2626
/// We are in the process of gracefully logging out
27-
AwaitingLogout { writer: WriterRef }, // we need the writer so we can disconnect it on successful logout
27+
AwaitingLogout {
28+
writer: WriterRef, // we need the writer so we can disconnect it on successful logout
29+
logout_timeout: Instant,
30+
reconnect: bool, // we carry this forward for the subsequent disconnected state
31+
},
2832
/// The session is active, we have connected and mutually logged on.
2933
Active(ActiveState),
3034
/// The TCP connection has been dropped.
@@ -87,7 +91,7 @@ impl SessionState {
8791
_ => error!("invalid outgoing message for AwaitingLogon state"),
8892
}
8993
}
90-
Self::AwaitingLogout { writer } => {
94+
Self::AwaitingLogout { writer, .. } => {
9195
// Logout messages are allowed because we first transition into AwaitingLogout
9296
// and only then send the logout message
9397
if message_type == b"5" {
@@ -102,7 +106,7 @@ impl SessionState {
102106
match self {
103107
Self::Active(ActiveState { writer, .. })
104108
| Self::AwaitingLogon { writer, .. }
105-
| Self::AwaitingLogout { writer }
109+
| Self::AwaitingLogout { writer, .. }
106110
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => writer.disconnect().await,
107111
_ => debug!("disconnecting an already disconnected session"),
108112
}
@@ -112,13 +116,17 @@ impl SessionState {
112116
match self {
113117
Self::Active(ActiveState { writer, .. })
114118
| Self::AwaitingLogon { writer, .. }
115-
| Self::AwaitingLogout { writer }
119+
| Self::AwaitingLogout { writer, .. }
116120
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => Some(writer),
117121
_ => None,
118122
}
119123
}
120124

121-
pub fn try_transition_to_awaiting_logout(&mut self) -> bool {
125+
pub fn try_transition_to_awaiting_logout(
126+
&mut self,
127+
logout_timeout: Duration,
128+
reconnect: bool,
129+
) -> bool {
122130
if matches!(self, SessionState::AwaitingLogout { .. }) {
123131
debug!("already in awaiting logout state");
124132
return false;
@@ -127,6 +135,8 @@ impl SessionState {
127135
if let Some(writer) = self.get_writer() {
128136
*self = SessionState::AwaitingLogout {
129137
writer: writer.clone(),
138+
logout_timeout: Instant::now() + logout_timeout,
139+
reconnect,
130140
};
131141
true
132142
} else {
@@ -220,6 +230,7 @@ impl SessionState {
220230
match self {
221231
Self::Active(ActiveState { peer_deadline, .. }) => Some(peer_deadline),
222232
Self::AwaitingLogon { logon_timeout, .. } => Some(logon_timeout),
233+
Self::AwaitingLogout { logout_timeout, .. } => Some(logout_timeout),
223234
_ => None,
224235
}
225236
}
@@ -268,6 +279,10 @@ impl SessionState {
268279
matches!(self, SessionState::AwaitingLogon { .. })
269280
}
270281

282+
pub fn is_awaiting_logout(&self) -> bool {
283+
matches!(self, SessionState::AwaitingLogout { .. })
284+
}
285+
271286
pub fn as_status(&self) -> SessionInfoStatus {
272287
match self {
273288
SessionState::AwaitingLogon { .. } => SessionInfoStatus::AwaitingLogon,
@@ -427,6 +442,8 @@ mod tests {
427442
fn test_awaiting_resend_transition_when_awaiting_logout_is_prevented() {
428443
let mut state = SessionState::AwaitingLogout {
429444
writer: create_writer_ref(),
445+
logout_timeout: Instant::now(),
446+
reconnect: false,
430447
};
431448

432449
let result = state.try_transition_to_awaiting_resend(1, 5);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use crate::common::assertions::{DEFAULT_TIMEOUT, assert_msg_type};
2+
use crate::common::fakes::{FakeCounterparty, SessionSpy};
3+
use crate::common::test_messages::TestMessage;
4+
use hotfix::message::logout::Logout;
5+
use hotfix_message::fix44::MsgType;
6+
7+
pub struct Finally<'a> {
8+
session: &'a SessionSpy,
9+
counterparty: &'a mut FakeCounterparty<TestMessage>,
10+
}
11+
12+
pub fn finally<'a>(
13+
session: &'a SessionSpy,
14+
counterparty: &'a mut FakeCounterparty<TestMessage>,
15+
) -> Finally<'a> {
16+
Finally {
17+
session,
18+
counterparty,
19+
}
20+
}
21+
22+
impl<'a> Finally<'a> {
23+
pub async fn disconnect(self) {
24+
// initiate disconnect from our side
25+
self.session.session_handle().shutdown(false).await.unwrap();
26+
27+
// counterparty receives our logout message
28+
self.counterparty
29+
.assert_next_with_timeout(|msg| assert_msg_type(msg, MsgType::Logout), DEFAULT_TIMEOUT)
30+
.await;
31+
32+
// counterparty responds with logout acknowledgement
33+
self.counterparty.send_message(Logout::default()).await;
34+
35+
// verify disconnection occurs
36+
self.counterparty
37+
.assert_disconnected_with_timeout(DEFAULT_TIMEOUT)
38+
.await;
39+
}
40+
}

crates/hotfix/tests/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod actions;
22
pub mod assertions;
3+
pub mod cleanup;
34
pub mod fakes;
45
pub mod setup;
56
pub mod test_messages;

crates/hotfix/tests/common/setup.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use hotfix_message::fix44::MSG_TYPE;
1111

1212
pub const HEARTBEAT_INTERVAL: u64 = 30;
1313
pub const LOGON_TIMEOUT: u64 = 10;
14+
pub const LOGOUT_TIMEOUT: u64 = 2;
1415

1516
pub const COUNTERPARTY_COMP_ID: &str = "dummy-acceptor";
1617
pub const OUR_COMP_ID: &str = "dummy-initiator";
@@ -58,6 +59,7 @@ pub fn create_session_config() -> SessionConfig {
5859
tls_config: None,
5960
heartbeat_interval: HEARTBEAT_INTERVAL,
6061
logon_timeout: LOGON_TIMEOUT,
62+
logout_timeout: LOGOUT_TIMEOUT,
6163
reconnect_interval: 30,
6264
reset_on_logon: false,
6365
schedule: None,

crates/hotfix/tests/session_test_cases/admin_request_tests.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::{assert_msg_type, then};
3+
use crate::common::cleanup::finally;
34
use crate::common::setup::given_an_active_session;
45
use hotfix::session::Status;
56
use hotfix_message::Part;
@@ -40,8 +41,7 @@ async fn test_reset_sequence_numbers_once() {
4041
.expect("reset request to succeed");
4142

4243
// the counterparty is disconnected
43-
when(&session).requests_disconnect().await;
44-
then(&mut counterparty).gets_disconnected().await;
44+
finally(&session, &mut counterparty).disconnect().await;
4545

4646
// a new connection is established to the counterparty
4747
when(&mut counterparty).gets_reconnected(true).await;
@@ -70,6 +70,5 @@ async fn test_reset_sequence_numbers_once() {
7070
"target sequence number should be 2 (after receiving logon)"
7171
);
7272

73-
when(&session).requests_disconnect().await;
74-
then(&mut counterparty).gets_disconnected().await;
73+
finally(&session, &mut counterparty).disconnect().await;
7574
}

crates/hotfix/tests/session_test_cases/business_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::then;
3+
use crate::common::cleanup::finally;
34
use crate::common::setup::given_an_active_session;
45
use crate::common::test_messages::TestMessage;
56
use hotfix::message::FixMessage;
@@ -27,6 +28,5 @@ async fn test_new_order_single() {
2728
.receives(|msg| assert_eq!(msg.message_type(), MsgType::ExecutionReport.to_string()))
2829
.await;
2930

30-
when(&session).requests_disconnect().await;
31-
then(&mut counterparty).gets_disconnected().await;
31+
finally(&session, &mut counterparty).disconnect().await;
3232
}

crates/hotfix/tests/session_test_cases/heartbeat_tests.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::common::actions::when;
22
use crate::common::assertions::{assert_msg_type, then};
3+
use crate::common::cleanup::finally;
34
use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session};
45
use hotfix::message::test_request::TestRequest;
56
use hotfix_message::Part;
@@ -27,8 +28,7 @@ async fn test_heartbeats() {
2728
.receives(|msg| assert_msg_type(msg, MsgType::Heartbeat))
2829
.await;
2930

30-
when(&session).requests_disconnect().await;
31-
then(&mut counterparty).gets_disconnected().await;
31+
finally(&session, &mut counterparty).disconnect().await;
3232
}
3333

3434
/// Tests the peer timeout and disconnection mechanism:
@@ -83,6 +83,5 @@ async fn test_heartbeat_in_response_to_test_request() {
8383
})
8484
.await;
8585

86-
when(&session).requests_disconnect().await;
87-
then(&mut counterparty).gets_disconnected().await;
86+
finally(&session, &mut counterparty).disconnect().await;
8887
}

0 commit comments

Comments
 (0)