Skip to content

Commit c493e5a

Browse files
committed
Move handle_invalid_msg_type to inbound module
1 parent 930ca0b commit c493e5a

2 files changed

Lines changed: 63 additions & 37 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,15 @@ where
161161
warn!("received invalid component");
162162
}
163163
InvalidReason::InvalidMsgType(msg_type) => {
164-
self.handle_invalid_msg_type(message, &msg_type).await;
164+
if let Some(writer) = self.state.get_writer() {
165+
inbound::handle_invalid_msg_type(
166+
&mut self.ctx,
167+
writer,
168+
&message,
169+
&msg_type,
170+
)
171+
.await;
172+
}
165173
}
166174
InvalidReason::InvalidOrderInGroup { tag, .. } => {
167175
match message.header().get(MSG_SEQ_NUM) {
@@ -609,32 +617,46 @@ where
609617
MessageVerificationError::SendingTimeAccuracyIssue { msg_seq_num } => {
610618
if let Some(writer) = self.state.get_writer() {
611619
inbound::handle_sending_time_accuracy_problem(
612-
&mut self.ctx, writer, msg_seq_num, "unexpected sending time",
613-
).await;
620+
&mut self.ctx,
621+
writer,
622+
msg_seq_num,
623+
"unexpected sending time",
624+
)
625+
.await;
614626
}
615627
}
616628
MessageVerificationError::SendingTimeMissing { msg_seq_num } => {
617629
if let Some(writer) = self.state.get_writer() {
618630
inbound::handle_sending_time_accuracy_problem(
619-
&mut self.ctx, writer, msg_seq_num, "sending time missing",
620-
).await;
631+
&mut self.ctx,
632+
writer,
633+
msg_seq_num,
634+
"sending time missing",
635+
)
636+
.await;
621637
}
622638
}
623639
MessageVerificationError::OriginalSendingTimeMissing { msg_seq_num } => {
624640
if let Some(writer) = self.state.get_writer() {
625641
inbound::handle_original_sending_time_missing(
626-
&mut self.ctx, writer, msg_seq_num,
627-
).await;
642+
&mut self.ctx,
643+
writer,
644+
msg_seq_num,
645+
)
646+
.await;
628647
}
629648
}
630649
MessageVerificationError::OriginalSendingTimeAfterSendingTime {
631650
msg_seq_num, ..
632651
} => {
633652
if let Some(writer) = self.state.get_writer() {
634653
inbound::handle_sending_time_accuracy_problem(
635-
&mut self.ctx, writer, msg_seq_num,
654+
&mut self.ctx,
655+
writer,
656+
msg_seq_num,
636657
"original sending time is after sending time",
637-
).await;
658+
)
659+
.await;
638660
}
639661
}
640662
}
@@ -731,33 +753,6 @@ where
731753
Ok(())
732754
}
733755

734-
async fn handle_invalid_msg_type(&mut self, message: Message, msg_type: &str) {
735-
match message.header().get(MSG_SEQ_NUM) {
736-
Ok(msg_seq_num) => {
737-
let reject = Reject::new(msg_seq_num)
738-
.session_reject_reason(SessionRejectReason::InvalidMsgtype)
739-
.text(&format!("invalid message type {msg_type}"));
740-
if let Err(err) = self.send_message(reject).await {
741-
error!("failed to send reject message for invalid msgtype: {err}");
742-
};
743-
744-
#[allow(clippy::collapsible_if)]
745-
if let Ok(seq_num) = message.header().get::<u64>(MSG_SEQ_NUM)
746-
&& self.ctx.store.next_target_seq_number() == seq_num
747-
{
748-
if let Err(err) = self.ctx.store.increment_target_seq_number().await {
749-
error!("failed to increment target seq number: {:?}", err);
750-
};
751-
}
752-
}
753-
Err(err) => {
754-
error!("failed to get message seq num: {:?}", err);
755-
}
756-
}
757-
}
758-
759-
760-
761756
fn reset_heartbeat_timer(&mut self) {
762757
self.state
763758
.reset_heartbeat_timer(self.ctx.config.heartbeat_interval);

crates/hotfix/src/session/inbound.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use crate::message::verification_error::MessageVerificationError;
44
use crate::session::ctx::SessionCtx;
55
use crate::session::outbound;
66
use crate::transport::writer::WriterRef;
7+
use hotfix_message::Part;
78
use hotfix_message::message::Message;
8-
use hotfix_message::session_fields::SessionRejectReason;
9+
use hotfix_message::session_fields::{MSG_SEQ_NUM, SessionRejectReason};
910
use hotfix_store::MessageStore;
1011
use tracing::error;
1112

@@ -46,6 +47,36 @@ pub(crate) async fn handle_sending_time_accuracy_problem<A, S: MessageStore>(
4647
}
4748
}
4849

50+
pub(crate) async fn handle_invalid_msg_type<A, S: MessageStore>(
51+
ctx: &mut SessionCtx<A, S>,
52+
writer: &WriterRef,
53+
message: &Message,
54+
msg_type: &str,
55+
) {
56+
match message.header().get(MSG_SEQ_NUM) {
57+
Ok(msg_seq_num) => {
58+
let reject = Reject::new(msg_seq_num)
59+
.session_reject_reason(SessionRejectReason::InvalidMsgtype)
60+
.text(&format!("invalid message type {msg_type}"));
61+
if let Err(err) = outbound::send_message(ctx, writer, reject).await {
62+
error!("failed to send reject message for invalid msgtype: {err}");
63+
}
64+
65+
#[allow(clippy::collapsible_if)]
66+
if let Ok(seq_num) = message.header().get::<u64>(MSG_SEQ_NUM)
67+
&& ctx.store.next_target_seq_number() == seq_num
68+
{
69+
if let Err(err) = ctx.store.increment_target_seq_number().await {
70+
error!("failed to increment target seq number: {:?}", err);
71+
}
72+
}
73+
}
74+
Err(err) => {
75+
error!("failed to get message seq num: {:?}", err);
76+
}
77+
}
78+
}
79+
4980
pub(crate) async fn handle_original_sending_time_missing<A, S: MessageStore>(
5081
ctx: &mut SessionCtx<A, S>,
5182
writer: &WriterRef,

0 commit comments

Comments
 (0)