Skip to content

Commit 9b24905

Browse files
committed
Convert resend_messages and send_sequence_reset to free functions
1 parent a12dfb0 commit 9b24905

3 files changed

Lines changed: 127 additions & 120 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 7 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod ctx;
33
pub mod error;
44
pub(crate) mod event;
55
mod info;
6+
mod outbound;
67
mod session_handle;
78
pub mod session_ref;
89
mod state;
@@ -15,14 +16,13 @@ use std::pin::Pin;
1516
use tokio::select;
1617
use tokio::sync::mpsc;
1718
use tokio::time::{Duration, Instant, Sleep, sleep, sleep_until};
18-
use tracing::{debug, enabled, error, info, warn};
19+
use tracing::{debug, error, info, warn};
1920

2021
use crate::Application;
2122
use crate::application::{InboundDecision, OutboundDecision};
2223
use crate::config::SessionConfig;
2324
use crate::message::OutboundMessage;
2425
use crate::message::business_reject::BusinessReject;
25-
use crate::message::generate_message;
2626
use crate::message::heartbeat::Heartbeat;
2727
use crate::message::logon::{Logon, ResetSeqNumConfig};
2828
use crate::message::logout::Logout;
@@ -33,7 +33,6 @@ use crate::message::sequence_reset::SequenceReset;
3333
use crate::message::test_request::TestRequest;
3434
use crate::message::verification::verify_message;
3535
use crate::message::verification_error::{CompIdType, MessageVerificationError};
36-
use crate::message::{is_admin, prepare_message_for_resend};
3736
use crate::session::admin_request::AdminRequest;
3837
use crate::session::ctx::SessionCtx;
3938
use crate::session::error::SessionCreationError;
@@ -526,8 +525,11 @@ where
526525
self.ctx.store.increment_target_seq_number().await?;
527526
}
528527

529-
self.resend_messages(begin_seq_number, end_seq_number, message)
530-
.await?;
528+
if let Some(writer) = self.state.get_writer() {
529+
outbound::resend_messages(&mut self.ctx, writer, begin_seq_number, end_seq_number)
530+
.await?;
531+
self.reset_heartbeat_timer();
532+
}
531533

532534
Ok(())
533535
}
@@ -779,90 +781,6 @@ where
779781
};
780782
}
781783

782-
async fn resend_messages(
783-
&mut self,
784-
begin: u64,
785-
end: u64,
786-
_message: &Message,
787-
) -> Result<(), SessionOperationError> {
788-
info!(begin, end, "resending messages as requested");
789-
let messages = self
790-
.ctx
791-
.store
792-
.get_slice(begin as usize, end as usize)
793-
.await?;
794-
795-
let no = messages.len();
796-
debug!(number_of_messages = no, "number of messages");
797-
798-
let mut reset_start: Option<u64> = None;
799-
let mut sequence_number = 0;
800-
801-
for msg in messages {
802-
let mut message = self
803-
.ctx
804-
.message_builder
805-
.build(msg.as_slice())
806-
.into_message()
807-
.ok_or_else(|| {
808-
SessionOperationError::StoredMessageParse(format!(
809-
"failed to build message for raw message: {msg:?}"
810-
))
811-
})?;
812-
sequence_number = get_msg_seq_num(&message);
813-
let message_type: String = message
814-
.header()
815-
.get::<&str>(MSG_TYPE)
816-
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?
817-
.to_string();
818-
819-
if is_admin(&message_type) {
820-
if reset_start.is_none() {
821-
reset_start = Some(sequence_number);
822-
}
823-
continue;
824-
}
825-
826-
if let Some(begin) = reset_start {
827-
let end = sequence_number;
828-
Self::log_skipped_admin_messages(begin, end);
829-
self.send_sequence_reset(begin, end).await?;
830-
reset_start = None;
831-
}
832-
833-
if let Err(e) = prepare_message_for_resend(&mut message) {
834-
error!(
835-
error = e,
836-
"failed to prepare message for resend, sending original"
837-
);
838-
}
839-
self.send_raw(&message_type, message.encode(&self.ctx.message_config)?)
840-
.await;
841-
842-
if enabled!(tracing::Level::DEBUG)
843-
&& let Ok(m) = String::from_utf8(msg.clone())
844-
{
845-
debug!(sequence_number, message = m, "resent message");
846-
}
847-
}
848-
849-
if let Some(begin) = reset_start {
850-
// the final reset if needed
851-
let end = sequence_number;
852-
Self::log_skipped_admin_messages(begin, end);
853-
self.send_sequence_reset(begin, end).await?;
854-
}
855-
856-
Ok(())
857-
}
858-
859-
fn log_skipped_admin_messages(begin: u64, end: u64) {
860-
info!(
861-
begin,
862-
end, "skipped admin message(s) during resend, requesting reset for these"
863-
);
864-
}
865-
866784
fn reset_heartbeat_timer(&mut self) {
867785
self.state
868786
.reset_heartbeat_timer(self.ctx.config.heartbeat_interval);
@@ -906,36 +824,6 @@ where
906824
Ok(prepared.seq_num)
907825
}
908826

909-
async fn send_raw(&mut self, message_type: &str, data: Vec<u8>) {
910-
self.state
911-
.send_message(message_type, RawFixMessage::new(data))
912-
.await;
913-
self.reset_heartbeat_timer();
914-
}
915-
916-
async fn send_sequence_reset(
917-
&mut self,
918-
begin: u64,
919-
end: u64,
920-
) -> Result<(), SessionOperationError> {
921-
let sequence_reset = SequenceReset {
922-
gap_fill: true,
923-
new_seq_no: end,
924-
};
925-
let raw_message = generate_message(
926-
&self.ctx.config.begin_string,
927-
&self.ctx.config.sender_comp_id,
928-
&self.ctx.config.target_comp_id,
929-
begin,
930-
sequence_reset,
931-
)?;
932-
933-
self.send_raw(SequenceReset::MSG_TYPE, raw_message).await;
934-
debug!(begin, end, "sent reset sequence");
935-
936-
Ok(())
937-
}
938-
939827
async fn send_resend_request(
940828
&mut self,
941829
begin: u64,
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use hotfix_message::Part;
2+
use hotfix_store::MessageStore;
3+
use tracing::{debug, enabled, error, info};
4+
5+
use crate::message::generate_message;
6+
use crate::message::parser::RawFixMessage;
7+
use crate::message::sequence_reset::SequenceReset;
8+
use crate::message::{is_admin, prepare_message_for_resend};
9+
use crate::session::ctx::SessionCtx;
10+
use crate::session::error::SessionOperationError;
11+
use crate::session::get_msg_seq_num;
12+
use crate::transport::writer::WriterRef;
13+
use hotfix_message::session_fields::MSG_TYPE;
14+
15+
pub async fn send_sequence_reset<A, S: MessageStore>(
16+
ctx: &mut SessionCtx<A, S>,
17+
writer: &WriterRef,
18+
begin: u64,
19+
end: u64,
20+
) -> Result<(), SessionOperationError> {
21+
let sequence_reset = SequenceReset {
22+
gap_fill: true,
23+
new_seq_no: end,
24+
};
25+
let raw_message = generate_message(
26+
&ctx.config.begin_string,
27+
&ctx.config.sender_comp_id,
28+
&ctx.config.target_comp_id,
29+
begin,
30+
sequence_reset,
31+
)?;
32+
33+
writer
34+
.send_raw_message(RawFixMessage::new(raw_message))
35+
.await;
36+
debug!(begin, end, "sent reset sequence");
37+
38+
Ok(())
39+
}
40+
41+
pub async fn resend_messages<A, S: MessageStore>(
42+
ctx: &mut SessionCtx<A, S>,
43+
writer: &WriterRef,
44+
begin: u64,
45+
end: u64,
46+
) -> Result<(), SessionOperationError> {
47+
info!(begin, end, "resending messages as requested");
48+
let messages = ctx.store.get_slice(begin as usize, end as usize).await?;
49+
50+
let no = messages.len();
51+
debug!(number_of_messages = no, "number of messages");
52+
53+
let mut reset_start: Option<u64> = None;
54+
let mut sequence_number = 0;
55+
56+
for msg in messages {
57+
let mut message = ctx
58+
.message_builder
59+
.build(msg.as_slice())
60+
.into_message()
61+
.ok_or_else(|| {
62+
SessionOperationError::StoredMessageParse(format!(
63+
"failed to build message for raw message: {msg:?}"
64+
))
65+
})?;
66+
sequence_number = get_msg_seq_num(&message);
67+
let message_type: String = message
68+
.header()
69+
.get::<&str>(MSG_TYPE)
70+
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?
71+
.to_string();
72+
73+
if is_admin(&message_type) {
74+
if reset_start.is_none() {
75+
reset_start = Some(sequence_number);
76+
}
77+
continue;
78+
}
79+
80+
if let Some(begin) = reset_start {
81+
let end = sequence_number;
82+
log_skipped_admin_messages(begin, end);
83+
send_sequence_reset(ctx, writer, begin, end).await?;
84+
reset_start = None;
85+
}
86+
87+
if let Err(e) = prepare_message_for_resend(&mut message) {
88+
error!(
89+
error = e,
90+
"failed to prepare message for resend, sending original"
91+
);
92+
}
93+
writer
94+
.send_raw_message(RawFixMessage::new(message.encode(&ctx.message_config)?))
95+
.await;
96+
97+
if enabled!(tracing::Level::DEBUG)
98+
&& let Ok(m) = String::from_utf8(msg.clone())
99+
{
100+
debug!(sequence_number, message = m, "resent message");
101+
}
102+
}
103+
104+
if let Some(begin) = reset_start {
105+
// the final reset if needed
106+
let end = sequence_number;
107+
log_skipped_admin_messages(begin, end);
108+
send_sequence_reset(ctx, writer, begin, end).await?;
109+
}
110+
111+
Ok(())
112+
}
113+
114+
fn log_skipped_admin_messages(begin: u64, end: u64) {
115+
info!(
116+
begin,
117+
end, "skipped admin message(s) during resend, requesting reset for these"
118+
);
119+
}

crates/hotfix/src/session/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl SessionState {
111111
}
112112
}
113113

114-
fn get_writer(&self) -> Option<&WriterRef> {
114+
pub(crate) fn get_writer(&self) -> Option<&WriterRef> {
115115
match self {
116116
Self::Active(ActiveState { writer, .. })
117117
| Self::AwaitingLogon(AwaitingLogonState { writer, .. })

0 commit comments

Comments
 (0)