Skip to content

Commit 10f9f32

Browse files
authored
refactor: centralise message verification and extract shared inbound handlers (#325)
* Centralise verification call for inbound messages * Make session message handlers thin * Move verification flag logic to verification module
1 parent 3f97c0b commit 10f9f32

8 files changed

Lines changed: 325 additions & 324 deletions

File tree

crates/hotfix/src/message/verification.rs

Lines changed: 120 additions & 40 deletions
Large diffs are not rendered by default.

crates/hotfix/src/session.rs

Lines changed: 56 additions & 210 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::message::reject::Reject;
3434
use crate::message::resend_request::ResendRequest;
3535
use crate::message::sequence_reset::SequenceReset;
3636
use crate::message::test_request::TestRequest;
37+
use crate::message::verification::VerificationFlags;
3738
use crate::session::admin_request::AdminRequest;
3839
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
3940
use crate::session::error::SessionCreationError;
@@ -53,10 +54,7 @@ use crate::store::MessageStore;
5354
use crate::transport::writer::WriterRef;
5455
use event::SessionEvent;
5556
use hotfix_message::parsed_message::{InvalidReason, ParsedMessage};
56-
use hotfix_message::session_fields::{
57-
BEGIN_SEQ_NO, END_SEQ_NO, GAP_FILL_FLAG, MSG_SEQ_NUM, MSG_TYPE, NEW_SEQ_NO,
58-
SessionRejectReason, TEST_REQ_ID,
59-
};
57+
use hotfix_message::session_fields::{MSG_SEQ_NUM, MSG_TYPE, SessionRejectReason, TEST_REQ_ID};
6058

6159
const SCHEDULE_CHECK_INTERVAL: u64 = 1;
6260

@@ -209,12 +207,24 @@ where
209207
}
210208
}
211209

212-
if let SessionState::AwaitingLogon(_) = &mut self.state {
213-
// TODO: should this (and all inbound message processing) logic be pushed into the state?
214-
if message_type != Logon::MSG_TYPE {
215-
self.state.disconnect_writer().await;
216-
return Ok(());
217-
}
210+
// TODO: add state-level pre-process check that validates whether the message type
211+
// is acceptable in the current state (e.g. AwaitingLogon rejects non-Logon,
212+
// unexpected Logon in Active should be rejected per FIX spec).
213+
if let SessionState::AwaitingLogon(_) = &mut self.state
214+
&& message_type != Logon::MSG_TYPE
215+
{
216+
self.state.disconnect_writer().await;
217+
return Ok(());
218+
}
219+
220+
let flags = VerificationFlags::for_message(&message)?;
221+
if let VerificationResult::Issue(result) = self
222+
.state
223+
.handle_verification_issue(&mut self.ctx, &message, flags)
224+
.await?
225+
{
226+
self.apply_transition(result);
227+
return Ok(());
218228
}
219229

220230
match message_type {
@@ -228,16 +238,16 @@ where
228238
self.on_resend_request(&message).await?;
229239
}
230240
Reject::MSG_TYPE => {
231-
self.on_reject(&message).await?;
241+
self.on_reject().await?;
232242
}
233243
SequenceReset::MSG_TYPE => {
234244
self.on_sequence_reset(&message).await?;
235245
}
236246
Logout::MSG_TYPE => {
237-
self.on_logout(&message).await?;
247+
self.on_logout().await?;
238248
}
239249
Logon::MSG_TYPE => {
240-
self.on_logon(&message).await?;
250+
self.on_logon().await?;
241251
}
242252
_ => self.process_app_message(&message).await?,
243253
}
@@ -249,39 +259,28 @@ where
249259
&mut self,
250260
message: &Message,
251261
) -> Result<(), SessionOperationError> {
252-
match self
253-
.state
254-
.handle_verification_issue(&mut self.ctx, message, true, true)
255-
.await?
256-
{
257-
VerificationResult::Issue(result) => {
258-
self.apply_transition(result);
259-
}
260-
VerificationResult::Passed => {
261-
match self.ctx.application.on_inbound_message(message).await {
262-
InboundDecision::Accept => {}
263-
InboundDecision::Reject { reason, text } => {
264-
let msg_type: &str = message
265-
.header()
266-
.get(MSG_TYPE)
267-
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
268-
let mut reject = BusinessReject::new(msg_type, reason)
269-
.ref_seq_num(get_msg_seq_num(message));
270-
if let Some(text) = text {
271-
reject = reject.text(&text);
272-
}
273-
self.send_message(reject)
274-
.await
275-
.with_send_context("business message reject")?;
276-
}
277-
InboundDecision::TerminateSession => {
278-
error!("failed to send inbound message to application");
279-
self.state.disconnect_writer().await;
280-
}
262+
match self.ctx.application.on_inbound_message(message).await {
263+
InboundDecision::Accept => {}
264+
InboundDecision::Reject { reason, text } => {
265+
let msg_type: &str = message
266+
.header()
267+
.get(MSG_TYPE)
268+
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
269+
let mut reject =
270+
BusinessReject::new(msg_type, reason).ref_seq_num(get_msg_seq_num(message));
271+
if let Some(text) = text {
272+
reject = reject.text(&text);
281273
}
282-
self.ctx.store.increment_target_seq_number().await?;
274+
self.send_message(reject)
275+
.await
276+
.with_send_context("business message reject")?;
277+
}
278+
InboundDecision::TerminateSession => {
279+
error!("failed to send inbound message to application");
280+
self.state.disconnect_writer().await;
283281
}
284282
}
283+
self.ctx.store.increment_target_seq_number().await?;
285284

286285
Ok(())
287286
}
@@ -357,42 +356,21 @@ where
357356
}
358357
}
359358

360-
async fn on_logon(&mut self, message: &Message) -> Result<(), SessionOperationError> {
359+
async fn on_logon(&mut self) -> Result<(), SessionOperationError> {
361360
if let SessionState::AwaitingLogon(AwaitingLogonState { writer, .. }) = &self.state {
362361
let writer = writer.clone();
363-
match self
364-
.state
365-
.handle_verification_issue(&mut self.ctx, message, true, true)
366-
.await?
367-
{
368-
VerificationResult::Issue(result) => {
369-
self.apply_transition(result);
370-
}
371-
VerificationResult::Passed => {
372-
// happy logon flow, the session is now active
373-
self.state =
374-
SessionState::new_active(writer, self.ctx.config.heartbeat_interval);
375-
self.ctx.application.on_logon().await;
376-
self.ctx.store.increment_target_seq_number().await?;
377-
}
378-
}
362+
// happy logon flow, the session is now active
363+
self.state = SessionState::new_active(writer, self.ctx.config.heartbeat_interval);
364+
self.ctx.application.on_logon().await;
365+
self.ctx.store.increment_target_seq_number().await?;
379366
} else {
380367
error!("received unexpected logon message");
381368
}
382369

383370
Ok(())
384371
}
385372

386-
async fn on_logout(&mut self, message: &Message) -> Result<(), SessionOperationError> {
387-
if let VerificationResult::Issue(result) = self
388-
.state
389-
.handle_verification_issue(&mut self.ctx, message, false, false)
390-
.await?
391-
{
392-
self.apply_transition(result);
393-
return Ok(());
394-
}
395-
373+
async fn on_logout(&mut self) -> Result<(), SessionOperationError> {
396374
if self.state.is_logged_on() {
397375
self.state
398376
.send_logout(&mut self.ctx, "Logout acknowledged")
@@ -424,15 +402,6 @@ where
424402
}
425403

426404
async fn on_heartbeat(&mut self, message: &Message) -> Result<(), SessionOperationError> {
427-
if let VerificationResult::Issue(result) = self
428-
.state
429-
.handle_verification_issue(&mut self.ctx, message, true, true)
430-
.await?
431-
{
432-
self.apply_transition(result);
433-
return Ok(());
434-
}
435-
436405
if let (Some(expected_req_id), Ok(message_req_id)) = (
437406
&self.state.expected_test_response_id(),
438407
message.get::<&str>(TEST_REQ_ID),
@@ -447,26 +416,10 @@ where
447416
}
448417

449418
async fn on_test_request(&mut self, message: &Message) -> Result<(), SessionOperationError> {
450-
if let VerificationResult::Issue(result) = self
451-
.state
452-
.handle_verification_issue(&mut self.ctx, message, true, true)
453-
.await?
454-
{
455-
self.apply_transition(result);
456-
return Ok(());
419+
if let Some(writer) = self.state.get_writer() {
420+
inbound::on_test_request(&mut self.ctx, writer, message).await?;
421+
self.reset_heartbeat_timer();
457422
}
458-
459-
let req_id: &str = message.get(TEST_REQ_ID).unwrap_or_else(|_| {
460-
// TODO: send reject?
461-
todo!()
462-
});
463-
464-
self.ctx.store.increment_target_seq_number().await?;
465-
466-
self.send_message(Heartbeat::for_request(req_id.to_string()))
467-
.await
468-
.with_send_context("heartbeat response")?;
469-
470423
Ok(())
471424
}
472425

@@ -476,19 +429,6 @@ where
476429
return Ok(());
477430
}
478431

479-
// Verify with check_too_high=false so ResendRequest is never blocked by seq-too-high.
480-
// This is the key part of the QFJ-673 deadlock fix: when both sides send ResendRequest
481-
// simultaneously, each side's ResendRequest will have a seq number higher than expected.
482-
// By not treating that as an error, we allow the ResendRequest to be processed.
483-
if let VerificationResult::Issue(result) = self
484-
.state
485-
.handle_verification_issue(&mut self.ctx, message, false, true)
486-
.await?
487-
{
488-
self.apply_transition(result);
489-
return Ok(());
490-
}
491-
492432
let msg_seq_num = get_msg_seq_num(message);
493433
let expected = self.ctx.store.next_target_seq_number();
494434

@@ -500,119 +440,25 @@ where
500440
state.inbound_queue.push_back(message.clone());
501441
}
502442

503-
let begin_seq_number: u64 = match message.get(BEGIN_SEQ_NO) {
504-
Ok(seq_number) => seq_number,
505-
Err(_) => {
506-
let reject = Reject::new(msg_seq_num)
507-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
508-
.text("missing begin sequence number for resend request");
509-
self.send_message(reject)
510-
.await
511-
.with_send_context("reject for missing BEGIN_SEQ_NO")?;
512-
return Ok(());
513-
}
514-
};
515-
516-
let end_seq_number: u64 = match message.get(END_SEQ_NO) {
517-
Ok(seq_number) => {
518-
let last_seq_number = self.ctx.store.next_sender_seq_number() - 1;
519-
if seq_number == 0 {
520-
last_seq_number
521-
} else {
522-
std::cmp::min(seq_number, last_seq_number)
523-
}
524-
}
525-
Err(_) => {
526-
let reject = Reject::new(msg_seq_num)
527-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
528-
.text("missing end sequence number for resend request");
529-
self.send_message(reject)
530-
.await
531-
.with_send_context("reject for missing END_SEQ_NO")?;
532-
return Ok(());
533-
}
534-
};
535-
536-
// Only increment target seq if seq matches expected
537-
if msg_seq_num == expected {
538-
self.ctx.store.increment_target_seq_number().await?;
539-
}
540-
541443
if let Some(writer) = self.state.get_writer() {
542-
outbound::resend_messages(&mut self.ctx, writer, begin_seq_number, end_seq_number)
543-
.await?;
444+
inbound::on_resend_request(&mut self.ctx, writer, message).await?;
544445
self.reset_heartbeat_timer();
545446
}
546447

547448
Ok(())
548449
}
549450

550451
/// Handle Reject messages.
551-
async fn on_reject(&mut self, message: &Message) -> Result<(), SessionOperationError> {
552-
if let VerificationResult::Issue(result) = self
553-
.state
554-
.handle_verification_issue(&mut self.ctx, message, false, true)
555-
.await?
556-
{
557-
self.apply_transition(result);
558-
return Ok(());
559-
}
560-
452+
async fn on_reject(&mut self) -> Result<(), SessionOperationError> {
561453
self.ctx.store.increment_target_seq_number().await?;
562454
Ok(())
563455
}
564456

565457
async fn on_sequence_reset(&mut self, message: &Message) -> Result<(), SessionOperationError> {
566-
let msg_seq_num = get_msg_seq_num(message);
567-
let is_gap_fill: bool = message.get(GAP_FILL_FLAG).unwrap_or(false);
568-
if let VerificationResult::Issue(result) = self
569-
.state
570-
.handle_verification_issue(&mut self.ctx, message, is_gap_fill, is_gap_fill)
571-
.await?
572-
{
573-
self.apply_transition(result);
574-
return Ok(());
575-
}
576-
577-
let end: u64 = match message.get(NEW_SEQ_NO) {
578-
Ok(new_seq_no) => new_seq_no,
579-
Err(err) => {
580-
error!(
581-
"received sequence reset message without new sequence number: {:?}",
582-
err
583-
);
584-
let reject = Reject::new(msg_seq_num)
585-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
586-
.text("missing NewSeqNo tag in sequence reset message");
587-
self.send_message(reject)
588-
.await
589-
.with_send_context("reject for missing NEW_SEQ_NO")?;
590-
591-
// note: we don't increment the target seq number here
592-
// this is an ambiguous case in the specification, but leaving the
593-
// sequence number as is feels the safest
594-
return Ok(());
595-
}
596-
};
597-
598-
// sequence resets cannot move the target seq number backwards
599-
// regardless of whether the message is a gap fill or not
600-
if end <= self.ctx.store.next_target_seq_number() {
601-
error!(
602-
"received sequence reset message which would move target seq number backwards: {end}",
603-
);
604-
let text =
605-
format!("attempt to lower sequence number, invalid value NewSeqNo(36)={end}");
606-
let reject = Reject::new(msg_seq_num)
607-
.session_reject_reason(SessionRejectReason::ValueIsIncorrect)
608-
.text(&text);
609-
self.send_message(reject)
610-
.await
611-
.with_send_context("reject for invalid sequence reset")?;
612-
return Ok(());
458+
if let Some(writer) = self.state.get_writer() {
459+
inbound::on_sequence_reset(&mut self.ctx, writer, message).await?;
460+
self.reset_heartbeat_timer();
613461
}
614-
615-
self.ctx.store.set_target_seq_number(end - 1).await?;
616462
Ok(())
617463
}
618464

0 commit comments

Comments
 (0)