Skip to content

Commit 63d7452

Browse files
committed
Make session message handlers thin
1 parent 06fec66 commit 63d7452

2 files changed

Lines changed: 133 additions & 94 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 9 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ use crate::transport::writer::WriterRef;
5454
use event::SessionEvent;
5555
use hotfix_message::parsed_message::{InvalidReason, ParsedMessage};
5656
use hotfix_message::session_fields::{
57-
BEGIN_SEQ_NO, END_SEQ_NO, GAP_FILL_FLAG, MSG_SEQ_NUM, MSG_TYPE, NEW_SEQ_NO,
58-
SessionRejectReason, TEST_REQ_ID,
57+
GAP_FILL_FLAG, MSG_SEQ_NUM, MSG_TYPE, SessionRejectReason, TEST_REQ_ID,
5958
};
6059

6160
const SCHEDULE_CHECK_INTERVAL: u64 = 1;
@@ -444,17 +443,10 @@ where
444443
}
445444

446445
async fn on_test_request(&mut self, message: &Message) -> Result<(), SessionOperationError> {
447-
let req_id: &str = message.get(TEST_REQ_ID).unwrap_or_else(|_| {
448-
// TODO: send reject?
449-
todo!()
450-
});
451-
452-
self.ctx.store.increment_target_seq_number().await?;
453-
454-
self.send_message(Heartbeat::for_request(req_id.to_string()))
455-
.await
456-
.with_send_context("heartbeat response")?;
457-
446+
if let Some(writer) = self.state.get_writer() {
447+
inbound::on_test_request(&mut self.ctx, writer, message).await?;
448+
self.reset_heartbeat_timer();
449+
}
458450
Ok(())
459451
}
460452

@@ -475,47 +467,8 @@ where
475467
state.inbound_queue.push_back(message.clone());
476468
}
477469

478-
let begin_seq_number: u64 = match message.get(BEGIN_SEQ_NO) {
479-
Ok(seq_number) => seq_number,
480-
Err(_) => {
481-
let reject = Reject::new(msg_seq_num)
482-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
483-
.text("missing begin sequence number for resend request");
484-
self.send_message(reject)
485-
.await
486-
.with_send_context("reject for missing BEGIN_SEQ_NO")?;
487-
return Ok(());
488-
}
489-
};
490-
491-
let end_seq_number: u64 = match message.get(END_SEQ_NO) {
492-
Ok(seq_number) => {
493-
let last_seq_number = self.ctx.store.next_sender_seq_number() - 1;
494-
if seq_number == 0 {
495-
last_seq_number
496-
} else {
497-
std::cmp::min(seq_number, last_seq_number)
498-
}
499-
}
500-
Err(_) => {
501-
let reject = Reject::new(msg_seq_num)
502-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
503-
.text("missing end sequence number for resend request");
504-
self.send_message(reject)
505-
.await
506-
.with_send_context("reject for missing END_SEQ_NO")?;
507-
return Ok(());
508-
}
509-
};
510-
511-
// Only increment target seq if seq matches expected
512-
if msg_seq_num == expected {
513-
self.ctx.store.increment_target_seq_number().await?;
514-
}
515-
516470
if let Some(writer) = self.state.get_writer() {
517-
outbound::resend_messages(&mut self.ctx, writer, begin_seq_number, end_seq_number)
518-
.await?;
471+
inbound::on_resend_request(&mut self.ctx, writer, message).await?;
519472
self.reset_heartbeat_timer();
520473
}
521474

@@ -529,47 +482,10 @@ where
529482
}
530483

531484
async fn on_sequence_reset(&mut self, message: &Message) -> Result<(), SessionOperationError> {
532-
let msg_seq_num = get_msg_seq_num(message);
533-
534-
let end: u64 = match message.get(NEW_SEQ_NO) {
535-
Ok(new_seq_no) => new_seq_no,
536-
Err(err) => {
537-
error!(
538-
"received sequence reset message without new sequence number: {:?}",
539-
err
540-
);
541-
let reject = Reject::new(msg_seq_num)
542-
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
543-
.text("missing NewSeqNo tag in sequence reset message");
544-
self.send_message(reject)
545-
.await
546-
.with_send_context("reject for missing NEW_SEQ_NO")?;
547-
548-
// note: we don't increment the target seq number here
549-
// this is an ambiguous case in the specification, but leaving the
550-
// sequence number as is feels the safest
551-
return Ok(());
552-
}
553-
};
554-
555-
// sequence resets cannot move the target seq number backwards
556-
// regardless of whether the message is a gap fill or not
557-
if end <= self.ctx.store.next_target_seq_number() {
558-
error!(
559-
"received sequence reset message which would move target seq number backwards: {end}",
560-
);
561-
let text =
562-
format!("attempt to lower sequence number, invalid value NewSeqNo(36)={end}");
563-
let reject = Reject::new(msg_seq_num)
564-
.session_reject_reason(SessionRejectReason::ValueIsIncorrect)
565-
.text(&text);
566-
self.send_message(reject)
567-
.await
568-
.with_send_context("reject for invalid sequence reset")?;
569-
return Ok(());
485+
if let Some(writer) = self.state.get_writer() {
486+
inbound::on_sequence_reset(&mut self.ctx, writer, message).await?;
487+
self.reset_heartbeat_timer();
570488
}
571-
572-
self.ctx.store.set_target_seq_number(end - 1).await?;
573489
Ok(())
574490
}
575491

crates/hotfix/src/session/inbound.rs

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
use crate::message::heartbeat::Heartbeat;
12
use crate::message::logout::Logout;
23
use crate::message::reject::Reject;
34
use crate::message::verification::verify_message;
45
use crate::message::verification_issue::{CompIdType, MessageError, VerificationIssue};
56
use crate::session::ctx::{SessionCtx, TransitionResult};
7+
use crate::session::error::{InternalSendResultExt, SessionOperationError};
8+
use crate::session::get_msg_seq_num;
69
use crate::session::outbound;
710
use crate::session::state::SessionState;
811
use crate::transport::writer::WriterRef;
912
use hotfix_message::Part;
1013
use hotfix_message::message::Message;
11-
use hotfix_message::session_fields::{MSG_SEQ_NUM, SessionRejectReason};
14+
use hotfix_message::session_fields::{
15+
BEGIN_SEQ_NO, END_SEQ_NO, MSG_SEQ_NUM, NEW_SEQ_NO, SessionRejectReason, TEST_REQ_ID,
16+
};
1217
use hotfix_store::MessageStore;
1318
use tracing::error;
1419
use tracing::warn;
@@ -256,6 +261,124 @@ async fn handle_verification_error<A, S: MessageStore>(
256261
}
257262
}
258263

264+
pub(crate) async fn on_test_request<A, S: MessageStore>(
265+
ctx: &mut SessionCtx<A, S>,
266+
writer: &WriterRef,
267+
message: &Message,
268+
) -> Result<(), SessionOperationError> {
269+
let req_id: &str = message.get(TEST_REQ_ID).unwrap_or_else(|_| {
270+
// TODO: send reject?
271+
todo!()
272+
});
273+
274+
ctx.store.increment_target_seq_number().await?;
275+
276+
outbound::send_message(ctx, writer, Heartbeat::for_request(req_id.to_string()))
277+
.await
278+
.with_send_context("heartbeat response")?;
279+
280+
Ok(())
281+
}
282+
283+
pub(crate) async fn on_sequence_reset<A, S: MessageStore>(
284+
ctx: &mut SessionCtx<A, S>,
285+
writer: &WriterRef,
286+
message: &Message,
287+
) -> Result<(), SessionOperationError> {
288+
let msg_seq_num = get_msg_seq_num(message);
289+
290+
let end: u64 = match message.get(NEW_SEQ_NO) {
291+
Ok(new_seq_no) => new_seq_no,
292+
Err(err) => {
293+
error!(
294+
"received sequence reset message without new sequence number: {:?}",
295+
err
296+
);
297+
let reject = Reject::new(msg_seq_num)
298+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
299+
.text("missing NewSeqNo tag in sequence reset message");
300+
outbound::send_message(ctx, writer, reject)
301+
.await
302+
.with_send_context("reject for missing NEW_SEQ_NO")?;
303+
304+
// note: we don't increment the target seq number here
305+
// this is an ambiguous case in the specification, but leaving the
306+
// sequence number as is feels the safest
307+
return Ok(());
308+
}
309+
};
310+
311+
// sequence resets cannot move the target seq number backwards
312+
// regardless of whether the message is a gap fill or not
313+
if end <= ctx.store.next_target_seq_number() {
314+
error!(
315+
"received sequence reset message which would move target seq number backwards: {end}",
316+
);
317+
let text = format!("attempt to lower sequence number, invalid value NewSeqNo(36)={end}");
318+
let reject = Reject::new(msg_seq_num)
319+
.session_reject_reason(SessionRejectReason::ValueIsIncorrect)
320+
.text(&text);
321+
outbound::send_message(ctx, writer, reject)
322+
.await
323+
.with_send_context("reject for invalid sequence reset")?;
324+
return Ok(());
325+
}
326+
327+
ctx.store.set_target_seq_number(end - 1).await?;
328+
Ok(())
329+
}
330+
331+
pub(crate) async fn on_resend_request<A, S: MessageStore>(
332+
ctx: &mut SessionCtx<A, S>,
333+
writer: &WriterRef,
334+
message: &Message,
335+
) -> Result<(), SessionOperationError> {
336+
let msg_seq_num = get_msg_seq_num(message);
337+
let expected = ctx.store.next_target_seq_number();
338+
339+
let begin_seq_number: u64 = match message.get(BEGIN_SEQ_NO) {
340+
Ok(seq_number) => seq_number,
341+
Err(_) => {
342+
let reject = Reject::new(msg_seq_num)
343+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
344+
.text("missing begin sequence number for resend request");
345+
outbound::send_message(ctx, writer, reject)
346+
.await
347+
.with_send_context("reject for missing BEGIN_SEQ_NO")?;
348+
return Ok(());
349+
}
350+
};
351+
352+
let end_seq_number: u64 = match message.get(END_SEQ_NO) {
353+
Ok(seq_number) => {
354+
let last_seq_number = ctx.store.next_sender_seq_number() - 1;
355+
if seq_number == 0 {
356+
last_seq_number
357+
} else {
358+
std::cmp::min(seq_number, last_seq_number)
359+
}
360+
}
361+
Err(_) => {
362+
let reject = Reject::new(msg_seq_num)
363+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
364+
.text("missing end sequence number for resend request");
365+
outbound::send_message(ctx, writer, reject)
366+
.await
367+
.with_send_context("reject for missing END_SEQ_NO")?;
368+
return Ok(());
369+
}
370+
};
371+
372+
// Only increment target seq if seq matches expected
373+
if msg_seq_num == expected {
374+
ctx.store.increment_target_seq_number().await?;
375+
}
376+
377+
outbound::resend_messages(ctx, writer, begin_seq_number, end_seq_number).await?;
378+
379+
Ok(())
380+
}
381+
259382
#[cfg(test)]
260383
mod tests {
261384
use super::*;

0 commit comments

Comments
 (0)