Skip to content

Commit e1777c7

Browse files
committed
Add test case for resending garbled messages
1 parent 9b24905 commit e1777c7

File tree

1 file changed

+95
-0
lines changed

1 file changed

+95
-0
lines changed

crates/hotfix/src/session/outbound.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,13 @@ pub async fn resend_messages<A, S: MessageStore>(
9494
.send_raw_message(RawFixMessage::new(message.encode(&ctx.message_config)?))
9595
.await;
9696

97+
// LCOV_EXCL_START
9798
if enabled!(tracing::Level::DEBUG)
9899
&& let Ok(m) = String::from_utf8(msg.clone())
99100
{
100101
debug!(sequence_number, message = m, "resent message");
101102
}
103+
// LCOV_EXCL_STOP
102104
}
103105

104106
if let Some(begin) = reset_start {
@@ -117,3 +119,96 @@ fn log_skipped_admin_messages(begin: u64, end: u64) {
117119
end, "skipped admin message(s) during resend, requesting reset for these"
118120
);
119121
}
122+
123+
#[cfg(test)]
124+
mod tests {
125+
use super::*;
126+
use crate::config::SessionConfig;
127+
use crate::session::ctx::SessionCtx;
128+
use crate::store::Result as StoreResult;
129+
use chrono::{DateTime, Utc};
130+
use hotfix_message::MessageBuilder;
131+
use hotfix_message::dict::Dictionary;
132+
use hotfix_message::message::Config as MessageConfig;
133+
use tokio::sync::mpsc;
134+
135+
#[derive(Clone)]
136+
struct GarbledMessageStore {
137+
messages: Vec<Vec<u8>>,
138+
}
139+
140+
#[async_trait::async_trait]
141+
impl MessageStore for GarbledMessageStore {
142+
async fn add(&mut self, _: u64, _: &[u8]) -> StoreResult<()> {
143+
Ok(())
144+
}
145+
async fn get_slice(&self, _: usize, _: usize) -> StoreResult<Vec<Vec<u8>>> {
146+
Ok(self.messages.clone())
147+
}
148+
fn next_sender_seq_number(&self) -> u64 {
149+
1
150+
}
151+
fn next_target_seq_number(&self) -> u64 {
152+
1
153+
}
154+
async fn increment_sender_seq_number(&mut self) -> StoreResult<()> {
155+
Ok(())
156+
}
157+
async fn increment_target_seq_number(&mut self) -> StoreResult<()> {
158+
Ok(())
159+
}
160+
async fn set_target_seq_number(&mut self, _: u64) -> StoreResult<()> {
161+
Ok(())
162+
}
163+
async fn reset(&mut self) -> StoreResult<()> {
164+
Ok(())
165+
}
166+
fn creation_time(&self) -> DateTime<Utc> {
167+
Utc::now()
168+
}
169+
}
170+
171+
fn create_test_ctx(store: GarbledMessageStore) -> SessionCtx<(), GarbledMessageStore> {
172+
let message_config = MessageConfig::default();
173+
let dictionary = Dictionary::fix44();
174+
let message_builder = MessageBuilder::new(dictionary, message_config).unwrap();
175+
SessionCtx {
176+
config: SessionConfig {
177+
begin_string: "FIX.4.4".to_string(),
178+
sender_comp_id: "SENDER".to_string(),
179+
target_comp_id: "TARGET".to_string(),
180+
data_dictionary_path: None,
181+
connection_host: "localhost".to_string(),
182+
connection_port: 9876,
183+
tls_config: None,
184+
heartbeat_interval: 30,
185+
logon_timeout: 10,
186+
logout_timeout: 2,
187+
reconnect_interval: 30,
188+
reset_on_logon: false,
189+
schedule: None,
190+
},
191+
store,
192+
application: (),
193+
message_builder,
194+
message_config,
195+
}
196+
}
197+
198+
#[tokio::test]
199+
async fn resend_messages_returns_error_for_garbled_stored_message() {
200+
let store = GarbledMessageStore {
201+
messages: vec![b"not a valid FIX message".to_vec()],
202+
};
203+
let mut ctx = create_test_ctx(store);
204+
let (sender, _receiver) = mpsc::channel(10);
205+
let writer = WriterRef::new(sender);
206+
207+
let result = resend_messages(&mut ctx, &writer, 1, 1).await;
208+
209+
assert!(
210+
matches!(result, Err(SessionOperationError::StoredMessageParse(_))),
211+
"expected StoredMessageParse error, got: {result:?}"
212+
);
213+
}
214+
}

0 commit comments

Comments
 (0)