Skip to content

Commit 8154494

Browse files
committed
Introduce pre_process_inbound function to delegate pre-processing decisions to state variants
1 parent 4caa702 commit 8154494

5 files changed

Lines changed: 70 additions & 22 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::message::sequence_reset::SequenceReset;
3636
use crate::message::test_request::TestRequest;
3737
use crate::message::verification::VerificationFlags;
3838
use crate::session::admin_request::AdminRequest;
39-
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
39+
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
4040
use crate::session::error::SessionCreationError;
4141
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
4242
pub use crate::session::error::{SendError, SendOutcome};
@@ -194,29 +194,20 @@ where
194194
}
195195

196196
async fn process_message(&mut self, message: Message) -> Result<(), SessionOperationError> {
197+
let message = match self.state.pre_process_inbound(message) {
198+
PreProcessDecision::Accept(msg) => msg,
199+
PreProcessDecision::Queued => return Ok(()),
200+
PreProcessDecision::Disconnect => {
201+
self.state.disconnect_writer().await;
202+
return Ok(());
203+
}
204+
};
205+
197206
let message_type: &str = message
198207
.header()
199208
.get(MSG_TYPE)
200209
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
201210

202-
if let SessionState::AwaitingResend(state) = &mut self.state {
203-
let seq_number = get_msg_seq_num(&message);
204-
if seq_number > state.end_seq_number && message_type != ResendRequest::MSG_TYPE {
205-
state.inbound_queue.push_back(message);
206-
return Ok(());
207-
}
208-
}
209-
210-
// TODO: add state-level pre-process check that validates whether the message type
211-
// is acceptable in the current state (e.g. AwaitingLogon rejects non-Logon,
212-
// unexpected Logon in Active should be rejected per FIX spec).
213-
if let SessionState::AwaitingLogon(_) = &mut self.state
214-
&& message_type != Logon::MSG_TYPE
215-
{
216-
self.state.disconnect_writer().await;
217-
return Ok(());
218-
}
219-
220211
let flags = VerificationFlags::for_message(&message)?;
221212
if let VerificationResult::Issue(result) = self
222213
.state

crates/hotfix/src/session/ctx.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,26 @@ use crate::message::parser::RawFixMessage;
99
use crate::session::error::InternalSendError;
1010
use crate::session::state::SessionState;
1111
use crate::store::StoreError;
12+
use hotfix_message::message::Message;
1213

1314
pub(crate) enum TransitionResult {
1415
Stay,
1516
TransitionTo(SessionState),
1617
}
1718

19+
/// The result of the pre-processing step for an inbound message.
20+
///
21+
/// Before verification and dispatch, the current state gets a chance to
22+
/// decide whether a message should be processed, queued, or rejected.
23+
pub(crate) enum PreProcessDecision {
24+
/// Continue processing this message through verification and dispatch.
25+
Accept(Message),
26+
/// The message has been queued for later processing (e.g. AwaitingResend backlog).
27+
Queued,
28+
/// The message is not acceptable in this state — disconnect the writer.
29+
Disconnect,
30+
}
31+
1832
/// The result of verifying an inbound message via a state variant's
1933
/// `handle_verification_issue` method.
2034
pub(crate) enum VerificationResult {

crates/hotfix/src/session/state.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::message::OutboundMessage;
1515
use crate::message::logon::Logon;
1616
use crate::message::logout::Logout;
1717
use crate::message::verification::VerificationFlags;
18-
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
18+
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
1919
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
2020
use crate::session::event::ScheduleResponse;
2121
use crate::session::info::Status as SessionInfoStatus;
@@ -63,6 +63,16 @@ impl SessionState {
6363
})
6464
}
6565

66+
/// Let the current state decide whether an inbound message should be processed,
67+
/// queued for later, or rejected before verification and dispatch.
68+
pub(crate) fn pre_process_inbound(&mut self, message: Message) -> PreProcessDecision {
69+
match self {
70+
Self::AwaitingResend(state) => state.pre_process_inbound(message),
71+
Self::AwaitingLogon(state) => state.pre_process_inbound(message),
72+
_ => PreProcessDecision::Accept(message),
73+
}
74+
}
75+
6676
pub fn should_reconnect(&self) -> bool {
6777
match self {
6878
SessionState::Disconnected(DisconnectedState { reconnect, .. }) => *reconnect,

crates/hotfix/src/session/state/awaiting_logon.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::Application;
2+
use crate::message::logon::Logon;
23
use crate::message::resend_request::ResendRequest;
34
use crate::message::verification::VerificationFlags;
4-
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
5+
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
56
use crate::session::error::{InternalSendResultExt, SessionOperationError};
67
use crate::session::inbound::{self, VerificationOutcome};
78
use crate::session::outbound;
89
use crate::session::state::{AwaitingResendState, SessionState};
910
use crate::transport::writer::WriterRef;
11+
use hotfix_message::Part;
1012
use hotfix_message::message::Message;
13+
use hotfix_message::session_fields::MSG_TYPE;
1114
use hotfix_store::MessageStore;
1215
use tokio::time::Instant;
1316
use tracing::debug;
@@ -22,6 +25,19 @@ pub(crate) struct AwaitingLogonState {
2225
}
2326

2427
impl AwaitingLogonState {
28+
pub(crate) fn pre_process_inbound(&self, message: Message) -> PreProcessDecision {
29+
let is_logon = message
30+
.header()
31+
.get::<&str>(MSG_TYPE)
32+
.is_ok_and(|t| t == Logon::MSG_TYPE);
33+
34+
if is_logon {
35+
PreProcessDecision::Accept(message)
36+
} else {
37+
PreProcessDecision::Disconnect
38+
}
39+
}
40+
2541
pub(crate) async fn handle_verification_issue<A: Application, S: MessageStore>(
2642
&self,
2743
ctx: &mut SessionCtx<A, S>,

crates/hotfix/src/session/state/awaiting_resend.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::Application;
22
use crate::message::resend_request::ResendRequest;
33
use crate::message::verification::VerificationFlags;
4-
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
4+
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
55
use crate::session::error::{InternalSendResultExt, SessionOperationError};
6+
use crate::session::get_msg_seq_num;
67
use crate::session::inbound::{self, VerificationOutcome};
78
use crate::session::outbound;
89
use crate::session::state::SessionState;
910
use crate::transport::writer::WriterRef;
11+
use hotfix_message::Part;
1012
use hotfix_message::message::Message;
13+
use hotfix_message::session_fields::MSG_TYPE;
1114
use hotfix_store::MessageStore;
1215
use std::collections::VecDeque;
1316
use tracing::debug;
@@ -39,6 +42,20 @@ impl AwaitingResendState {
3942
}
4043
}
4144

45+
pub(crate) fn pre_process_inbound(&mut self, message: Message) -> PreProcessDecision {
46+
let dominated_by_resend = message
47+
.header()
48+
.get::<&str>(MSG_TYPE)
49+
.is_ok_and(|t| t != ResendRequest::MSG_TYPE);
50+
51+
if get_msg_seq_num(&message) > self.end_seq_number && dominated_by_resend {
52+
self.inbound_queue.push_back(message);
53+
PreProcessDecision::Queued
54+
} else {
55+
PreProcessDecision::Accept(message)
56+
}
57+
}
58+
4259
pub(crate) async fn handle_verification_issue<A: Application, S: MessageStore>(
4360
&mut self,
4461
ctx: &mut SessionCtx<A, S>,

0 commit comments

Comments
 (0)