Skip to content

Commit e10124b

Browse files
authored
refactor: convert resend logic to free functions outside the session code (#322)
* Add prepare_message to SessionCtx * Convert resend_messages and send_sequence_reset to free functions * Add test case for resending garbled messages
1 parent d358f40 commit e10124b

4 files changed

Lines changed: 271 additions & 149 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 10 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod ctx;
33
pub mod error;
44
pub(crate) mod event;
55
mod info;
6+
mod outbound;
67
mod session_handle;
78
pub mod session_ref;
89
mod state;
@@ -15,14 +16,13 @@ use std::pin::Pin;
1516
use tokio::select;
1617
use tokio::sync::mpsc;
1718
use tokio::time::{Duration, Instant, Sleep, sleep, sleep_until};
18-
use tracing::{debug, enabled, error, info, warn};
19+
use tracing::{debug, error, info, warn};
1920

2021
use crate::Application;
2122
use crate::application::{InboundDecision, OutboundDecision};
2223
use crate::config::SessionConfig;
2324
use crate::message::OutboundMessage;
2425
use crate::message::business_reject::BusinessReject;
25-
use crate::message::generate_message;
2626
use crate::message::heartbeat::Heartbeat;
2727
use crate::message::logon::{Logon, ResetSeqNumConfig};
2828
use crate::message::logout::Logout;
@@ -33,7 +33,6 @@ use crate::message::sequence_reset::SequenceReset;
3333
use crate::message::test_request::TestRequest;
3434
use crate::message::verification::verify_message;
3535
use crate::message::verification_error::{CompIdType, MessageVerificationError};
36-
use crate::message::{is_admin, prepare_message_for_resend};
3736
use crate::session::admin_request::AdminRequest;
3837
use crate::session::ctx::SessionCtx;
3938
use crate::session::error::SessionCreationError;
@@ -526,8 +525,11 @@ where
526525
self.ctx.store.increment_target_seq_number().await?;
527526
}
528527

529-
self.resend_messages(begin_seq_number, end_seq_number, message)
530-
.await?;
528+
if let Some(writer) = self.state.get_writer() {
529+
outbound::resend_messages(&mut self.ctx, writer, begin_seq_number, end_seq_number)
530+
.await?;
531+
self.reset_heartbeat_timer();
532+
}
531533

532534
Ok(())
533535
}
@@ -779,90 +781,6 @@ where
779781
};
780782
}
781783

782-
async fn resend_messages(
783-
&mut self,
784-
begin: u64,
785-
end: u64,
786-
_message: &Message,
787-
) -> Result<(), SessionOperationError> {
788-
info!(begin, end, "resending messages as requested");
789-
let messages = self
790-
.ctx
791-
.store
792-
.get_slice(begin as usize, end as usize)
793-
.await?;
794-
795-
let no = messages.len();
796-
debug!(number_of_messages = no, "number of messages");
797-
798-
let mut reset_start: Option<u64> = None;
799-
let mut sequence_number = 0;
800-
801-
for msg in messages {
802-
let mut message = self
803-
.ctx
804-
.message_builder
805-
.build(msg.as_slice())
806-
.into_message()
807-
.ok_or_else(|| {
808-
SessionOperationError::StoredMessageParse(format!(
809-
"failed to build message for raw message: {msg:?}"
810-
))
811-
})?;
812-
sequence_number = get_msg_seq_num(&message);
813-
let message_type: String = message
814-
.header()
815-
.get::<&str>(MSG_TYPE)
816-
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?
817-
.to_string();
818-
819-
if is_admin(&message_type) {
820-
if reset_start.is_none() {
821-
reset_start = Some(sequence_number);
822-
}
823-
continue;
824-
}
825-
826-
if let Some(begin) = reset_start {
827-
let end = sequence_number;
828-
Self::log_skipped_admin_messages(begin, end);
829-
self.send_sequence_reset(begin, end).await?;
830-
reset_start = None;
831-
}
832-
833-
if let Err(e) = prepare_message_for_resend(&mut message) {
834-
error!(
835-
error = e,
836-
"failed to prepare message for resend, sending original"
837-
);
838-
}
839-
self.send_raw(&message_type, message.encode(&self.ctx.message_config)?)
840-
.await;
841-
842-
if enabled!(tracing::Level::DEBUG)
843-
&& let Ok(m) = String::from_utf8(msg.clone())
844-
{
845-
debug!(sequence_number, message = m, "resent message");
846-
}
847-
}
848-
849-
if let Some(begin) = reset_start {
850-
// the final reset if needed
851-
let end = sequence_number;
852-
Self::log_skipped_admin_messages(begin, end);
853-
self.send_sequence_reset(begin, end).await?;
854-
}
855-
856-
Ok(())
857-
}
858-
859-
fn log_skipped_admin_messages(begin: u64, end: u64) {
860-
info!(
861-
begin,
862-
end, "skipped admin message(s) during resend, requesting reset for these"
863-
);
864-
}
865-
866784
fn reset_heartbeat_timer(&mut self) {
867785
self.state
868786
.reset_heartbeat_timer(self.ctx.config.heartbeat_interval);
@@ -899,67 +817,11 @@ where
899817
&mut self,
900818
message: impl OutboundMessage,
901819
) -> Result<u64, InternalSendError> {
902-
let seq_num = self.ctx.store.next_sender_seq_number();
903820
let msg_type = message.message_type().to_string();
904-
let msg = generate_message(
905-
&self.ctx.config.begin_string,
906-
&self.ctx.config.sender_comp_id,
907-
&self.ctx.config.target_comp_id,
908-
seq_num,
909-
message,
910-
)
911-
.map_err(|e| {
912-
InternalSendError::Persist(crate::store::StoreError::PersistMessage {
913-
sequence_number: seq_num,
914-
source: e.into(),
915-
})
916-
})?;
917-
918-
self.ctx
919-
.store
920-
.increment_sender_seq_number()
921-
.await
922-
.map_err(InternalSendError::SequenceNumber)?;
923-
924-
self.ctx
925-
.store
926-
.add(seq_num, &msg)
927-
.await
928-
.map_err(InternalSendError::Persist)?;
929-
930-
self.send_raw(&msg_type, msg).await;
931-
932-
Ok(seq_num)
933-
}
934-
935-
async fn send_raw(&mut self, message_type: &str, data: Vec<u8>) {
936-
self.state
937-
.send_message(message_type, RawFixMessage::new(data))
938-
.await;
821+
let prepared = self.ctx.prepare_message(message).await?;
822+
self.state.send_message(&msg_type, prepared.raw).await;
939823
self.reset_heartbeat_timer();
940-
}
941-
942-
async fn send_sequence_reset(
943-
&mut self,
944-
begin: u64,
945-
end: u64,
946-
) -> Result<(), SessionOperationError> {
947-
let sequence_reset = SequenceReset {
948-
gap_fill: true,
949-
new_seq_no: end,
950-
};
951-
let raw_message = generate_message(
952-
&self.ctx.config.begin_string,
953-
&self.ctx.config.sender_comp_id,
954-
&self.ctx.config.target_comp_id,
955-
begin,
956-
sequence_reset,
957-
)?;
958-
959-
self.send_raw(SequenceReset::MSG_TYPE, raw_message).await;
960-
debug!(begin, end, "sent reset sequence");
961-
962-
Ok(())
824+
Ok(prepared.seq_num)
963825
}
964826

965827
async fn send_resend_request(

crates/hotfix/src/session/ctx.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
use hotfix_message::MessageBuilder;
22
use hotfix_message::message::Config as MessageConfig;
3+
use hotfix_store::MessageStore;
34

45
use crate::config::SessionConfig;
6+
use crate::message::OutboundMessage;
7+
use crate::message::generate_message;
8+
use crate::message::parser::RawFixMessage;
9+
use crate::session::error::InternalSendError;
10+
use crate::store::StoreError;
511

612
pub(crate) struct SessionCtx<A, S> {
713
pub config: SessionConfig,
@@ -10,3 +16,45 @@ pub(crate) struct SessionCtx<A, S> {
1016
pub message_builder: MessageBuilder,
1117
pub message_config: MessageConfig,
1218
}
19+
20+
pub(crate) struct PreparedMessage {
21+
pub seq_num: u64,
22+
pub raw: RawFixMessage,
23+
}
24+
25+
impl<A, S: MessageStore> SessionCtx<A, S> {
26+
pub async fn prepare_message(
27+
&mut self,
28+
message: impl OutboundMessage,
29+
) -> Result<PreparedMessage, InternalSendError> {
30+
let seq_num = self.store.next_sender_seq_number();
31+
let msg = generate_message(
32+
&self.config.begin_string,
33+
&self.config.sender_comp_id,
34+
&self.config.target_comp_id,
35+
seq_num,
36+
message,
37+
)
38+
.map_err(|e| {
39+
InternalSendError::Persist(StoreError::PersistMessage {
40+
sequence_number: seq_num,
41+
source: e.into(),
42+
})
43+
})?;
44+
45+
self.store
46+
.increment_sender_seq_number()
47+
.await
48+
.map_err(InternalSendError::SequenceNumber)?;
49+
50+
self.store
51+
.add(seq_num, &msg)
52+
.await
53+
.map_err(InternalSendError::Persist)?;
54+
55+
Ok(PreparedMessage {
56+
seq_num,
57+
raw: RawFixMessage::new(msg),
58+
})
59+
}
60+
}

0 commit comments

Comments
 (0)