Skip to content

Commit 131f296

Browse files
committed
Add test case for resending garbled messages
1 parent 9b24905 commit 131f296

1 file changed

Lines changed: 97 additions & 0 deletions

File tree

crates/hotfix/src/session/outbound.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,13 @@ pub async fn resend_messages<A, S: MessageStore>(
9494
.send_raw_message(RawFixMessage::new(message.encode(&ctx.message_config)?))
9595
.await;
9696

97+
// LCOV_EXCL_START
9798
if enabled!(tracing::Level::DEBUG)
9899
&& let Ok(m) = String::from_utf8(msg.clone())
99100
{
100101
debug!(sequence_number, message = m, "resent message");
101102
}
103+
// LCOV_EXCL_STOP
102104
}
103105

104106
if let Some(begin) = reset_start {
@@ -117,3 +119,98 @@ fn log_skipped_admin_messages(begin: u64, end: u64) {
117119
end, "skipped admin message(s) during resend, requesting reset for these"
118120
);
119121
}
122+
123+
#[cfg(test)]
124+
mod tests {
125+
use super::*;
126+
use crate::config::SessionConfig;
127+
use crate::session::ctx::SessionCtx;
128+
use crate::store::Result as StoreResult;
129+
use chrono::{DateTime, Utc};
130+
use hotfix_message::MessageBuilder;
131+
use hotfix_message::dict::Dictionary;
132+
use hotfix_message::message::Config as MessageConfig;
133+
use tokio::sync::mpsc;
134+
135+
#[derive(Clone)]
136+
struct GarbledMessageStore {
137+
messages: Vec<Vec<u8>>,
138+
}
139+
140+
// LCOV_EXCL_START
141+
#[async_trait::async_trait]
142+
impl MessageStore for GarbledMessageStore {
143+
async fn add(&mut self, _: u64, _: &[u8]) -> StoreResult<()> {
144+
Ok(())
145+
}
146+
async fn get_slice(&self, _: usize, _: usize) -> StoreResult<Vec<Vec<u8>>> {
147+
Ok(self.messages.clone())
148+
}
149+
fn next_sender_seq_number(&self) -> u64 {
150+
1
151+
}
152+
fn next_target_seq_number(&self) -> u64 {
153+
1
154+
}
155+
async fn increment_sender_seq_number(&mut self) -> StoreResult<()> {
156+
Ok(())
157+
}
158+
async fn increment_target_seq_number(&mut self) -> StoreResult<()> {
159+
Ok(())
160+
}
161+
async fn set_target_seq_number(&mut self, _: u64) -> StoreResult<()> {
162+
Ok(())
163+
}
164+
async fn reset(&mut self) -> StoreResult<()> {
165+
Ok(())
166+
}
167+
fn creation_time(&self) -> DateTime<Utc> {
168+
Utc::now()
169+
}
170+
}
171+
// LCOV_EXCL_STOP
172+
173+
fn create_test_ctx(store: GarbledMessageStore) -> SessionCtx<(), GarbledMessageStore> {
174+
let message_config = MessageConfig::default();
175+
let dictionary = Dictionary::fix44();
176+
let message_builder = MessageBuilder::new(dictionary, message_config).unwrap();
177+
SessionCtx {
178+
config: SessionConfig {
179+
begin_string: "FIX.4.4".to_string(),
180+
sender_comp_id: "SENDER".to_string(),
181+
target_comp_id: "TARGET".to_string(),
182+
data_dictionary_path: None,
183+
connection_host: "localhost".to_string(),
184+
connection_port: 9876,
185+
tls_config: None,
186+
heartbeat_interval: 30,
187+
logon_timeout: 10,
188+
logout_timeout: 2,
189+
reconnect_interval: 30,
190+
reset_on_logon: false,
191+
schedule: None,
192+
},
193+
store,
194+
application: (),
195+
message_builder,
196+
message_config,
197+
}
198+
}
199+
200+
#[tokio::test]
201+
async fn resend_messages_returns_error_for_garbled_stored_message() {
202+
let store = GarbledMessageStore {
203+
messages: vec![b"not a valid FIX message".to_vec()],
204+
};
205+
let mut ctx = create_test_ctx(store);
206+
let (sender, _receiver) = mpsc::channel(10);
207+
let writer = WriterRef::new(sender);
208+
209+
let result = resend_messages(&mut ctx, &writer, 1, 1).await;
210+
211+
assert!(
212+
matches!(result, Err(SessionOperationError::StoredMessageParse(_))),
213+
"expected StoredMessageParse error, got: {result:?}"
214+
);
215+
}
216+
}

0 commit comments

Comments
 (0)