Skip to content

Commit a12dfb0

Browse files
committed
Add prepare_message to SessionCtx
1 parent d358f40 commit a12dfb0

2 files changed

Lines changed: 52 additions & 30 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -899,37 +899,11 @@ where
899899
&mut self,
900900
message: impl OutboundMessage,
901901
) -> Result<u64, InternalSendError> {
902-
let seq_num = self.ctx.store.next_sender_seq_number();
903902
let msg_type = message.message_type().to_string();
904-
let msg = generate_message(
905-
&self.ctx.config.begin_string,
906-
&self.ctx.config.sender_comp_id,
907-
&self.ctx.config.target_comp_id,
908-
seq_num,
909-
message,
910-
)
911-
.map_err(|e| {
912-
InternalSendError::Persist(crate::store::StoreError::PersistMessage {
913-
sequence_number: seq_num,
914-
source: e.into(),
915-
})
916-
})?;
917-
918-
self.ctx
919-
.store
920-
.increment_sender_seq_number()
921-
.await
922-
.map_err(InternalSendError::SequenceNumber)?;
923-
924-
self.ctx
925-
.store
926-
.add(seq_num, &msg)
927-
.await
928-
.map_err(InternalSendError::Persist)?;
929-
930-
self.send_raw(&msg_type, msg).await;
931-
932-
Ok(seq_num)
903+
let prepared = self.ctx.prepare_message(message).await?;
904+
self.state.send_message(&msg_type, prepared.raw).await;
905+
self.reset_heartbeat_timer();
906+
Ok(prepared.seq_num)
933907
}
934908

935909
async fn send_raw(&mut self, message_type: &str, data: Vec<u8>) {

crates/hotfix/src/session/ctx.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
use hotfix_message::MessageBuilder;
22
use hotfix_message::message::Config as MessageConfig;
3+
use hotfix_store::MessageStore;
34

45
use crate::config::SessionConfig;
6+
use crate::message::OutboundMessage;
7+
use crate::message::generate_message;
8+
use crate::message::parser::RawFixMessage;
9+
use crate::session::error::InternalSendError;
10+
use crate::store::StoreError;
511

612
pub(crate) struct SessionCtx<A, S> {
713
pub config: SessionConfig,
@@ -10,3 +16,45 @@ pub(crate) struct SessionCtx<A, S> {
1016
pub message_builder: MessageBuilder,
1117
pub message_config: MessageConfig,
1218
}
19+
20+
pub(crate) struct PreparedMessage {
21+
pub seq_num: u64,
22+
pub raw: RawFixMessage,
23+
}
24+
25+
impl<A, S: MessageStore> SessionCtx<A, S> {
26+
pub async fn prepare_message(
27+
&mut self,
28+
message: impl OutboundMessage,
29+
) -> Result<PreparedMessage, InternalSendError> {
30+
let seq_num = self.store.next_sender_seq_number();
31+
let msg = generate_message(
32+
&self.config.begin_string,
33+
&self.config.sender_comp_id,
34+
&self.config.target_comp_id,
35+
seq_num,
36+
message,
37+
)
38+
.map_err(|e| {
39+
InternalSendError::Persist(StoreError::PersistMessage {
40+
sequence_number: seq_num,
41+
source: e.into(),
42+
})
43+
})?;
44+
45+
self.store
46+
.increment_sender_seq_number()
47+
.await
48+
.map_err(InternalSendError::SequenceNumber)?;
49+
50+
self.store
51+
.add(seq_num, &msg)
52+
.await
53+
.map_err(InternalSendError::Persist)?;
54+
55+
Ok(PreparedMessage {
56+
seq_num,
57+
raw: RawFixMessage::new(msg),
58+
})
59+
}
60+
}

0 commit comments

Comments
 (0)