Skip to content

Commit 391c96f

Browse files
authored
feat: support non-gap-fill sequence resets (#255)
* Fix handling of sequence resets when they are not gap fills * Add test case for happy gap fill flow during resend * Better test cases for sequence number mismatches during sequence resets * Add test case for happy reset flow * Add test case for processing sequence reset with sequence number lower than expected * Add test case for new seq number being lower than expected in resets * Add test case for resets without the NewSeqNo tag
1 parent c88494d commit 391c96f

9 files changed

Lines changed: 437 additions & 41 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@
55
*.db
66
/data/*
77
flamegraph.svg
8-
tracing*
8+
tracing*
9+
lcov.info

crates/hotfix/src/message/verification.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const SENDING_TIME_THRESHOLD: u64 = 120;
1313
pub(crate) fn verify_message(
1414
message: &Message,
1515
config: &SessionConfig,
16-
expected_seq_number: u64,
16+
expected_seq_number: Option<u64>,
1717
) -> Result<(), MessageVerificationError> {
1818
check_begin_string(message, config.begin_string.as_str())?;
1919
let actual_seq_number: u64 = message.header().get(fix44::MSG_SEQ_NUM).unwrap_or_default();
@@ -33,7 +33,9 @@ pub(crate) fn verify_message(
3333
check_original_sending_time(message, actual_seq_number, sending_time)?;
3434
}
3535

36-
check_sequence_number(actual_seq_number, expected_seq_number, possible_duplicate)?;
36+
if let Some(expected_seq_number) = expected_seq_number {
37+
check_sequence_number(actual_seq_number, expected_seq_number, possible_duplicate)?;
38+
}
3739

3840
Ok(())
3941
}
@@ -217,7 +219,7 @@ mod tests {
217219
let config = build_test_config();
218220
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 42);
219221

220-
let result = verify_message(&msg, &config, 42);
222+
let result = verify_message(&msg, &config, Some(42));
221223

222224
assert!(result.is_ok());
223225
}
@@ -227,7 +229,7 @@ mod tests {
227229
let config = build_test_config();
228230
let msg = build_test_message("FIX.4.2", "TARGET", "SENDER", 42);
229231

230-
let result = verify_message(&msg, &config, 42);
232+
let result = verify_message(&msg, &config, Some(42));
231233

232234
assert!(matches!(
233235
result,
@@ -243,7 +245,7 @@ mod tests {
243245
let config = build_test_config();
244246
let msg = build_test_message("FIX.4.4", "WRONG_SENDER", "SENDER", 42);
245247

246-
let result = verify_message(&msg, &config, 42);
248+
let result = verify_message(&msg, &config, Some(42));
247249

248250
assert!(matches!(
249251
result,
@@ -269,7 +271,7 @@ mod tests {
269271
let config = build_test_config();
270272
let msg = build_test_message("FIX.4.4", "TARGET", "WRONG_TARGET", 42);
271273

272-
let result = verify_message(&msg, &config, 42);
274+
let result = verify_message(&msg, &config, Some(42));
273275

274276
assert!(matches!(
275277
result,
@@ -295,7 +297,7 @@ mod tests {
295297
let config = build_test_config();
296298
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 40);
297299

298-
let result = verify_message(&msg, &config, 42);
300+
let result = verify_message(&msg, &config, Some(42));
299301

300302
assert!(matches!(
301303
result,
@@ -321,7 +323,7 @@ mod tests {
321323
msg.header_mut()
322324
.set(fix44::ORIG_SENDING_TIME, Timestamp::utc_now());
323325

324-
let result = verify_message(&msg, &config, 42);
326+
let result = verify_message(&msg, &config, Some(42));
325327

326328
assert!(matches!(
327329
result,
@@ -344,7 +346,7 @@ mod tests {
344346
let config = build_test_config();
345347
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 50);
346348

347-
let result = verify_message(&msg, &config, 42);
349+
let result = verify_message(&msg, &config, Some(42));
348350

349351
assert!(matches!(
350352
result,
@@ -363,7 +365,7 @@ mod tests {
363365
msg.header_mut().set(fix44::POSS_DUP_FLAG, true);
364366
// Don't set OrigSendingTime
365367

366-
let result = verify_message(&msg, &config, 42);
368+
let result = verify_message(&msg, &config, Some(42));
367369

368370
assert!(matches!(
369371
result,
@@ -388,7 +390,7 @@ mod tests {
388390
msg.header_mut().pop(fix44::SENDING_TIME);
389391
msg.header_mut().set(fix44::SENDING_TIME, sending_time);
390392

391-
let result = verify_message(&msg, &config, 42);
393+
let result = verify_message(&msg, &config, Some(42));
392394

393395
assert!(result.is_ok());
394396
}
@@ -407,7 +409,7 @@ mod tests {
407409
msg.header_mut().pop(fix44::SENDING_TIME);
408410
msg.header_mut().set(fix44::SENDING_TIME, sending_time);
409411

410-
let result = verify_message(&msg, &config, 42);
412+
let result = verify_message(&msg, &config, Some(42));
411413

412414
assert!(matches!(
413415
result,
@@ -437,7 +439,7 @@ mod tests {
437439
msg.header_mut().pop(fix44::SENDING_TIME);
438440
msg.header_mut().set(fix44::SENDING_TIME, timestamp);
439441

440-
let result = verify_message(&msg, &config, 42);
442+
let result = verify_message(&msg, &config, Some(42));
441443

442444
// equal timestamps should be valid (orig <= sending)
443445
assert!(result.is_ok());
@@ -455,7 +457,7 @@ mod tests {
455457
// remove begin string, which is automatically added by `Message::new`
456458
msg.header_mut().pop(fix44::BEGIN_STRING);
457459

458-
let result = verify_message(&msg, &config, 42);
460+
let result = verify_message(&msg, &config, Some(42));
459461

460462
assert!(matches!(
461463
result,
@@ -471,7 +473,7 @@ mod tests {
471473
msg.set(fix44::MSG_SEQ_NUM, 42u64);
472474
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
473475

474-
let result = verify_message(&msg, &config, 42);
476+
let result = verify_message(&msg, &config, Some(42));
475477

476478
assert!(matches!(
477479
result,
@@ -490,7 +492,7 @@ mod tests {
490492
msg.set(fix44::MSG_SEQ_NUM, 42u64);
491493
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
492494

493-
let result = verify_message(&msg, &config, 42);
495+
let result = verify_message(&msg, &config, Some(42));
494496

495497
assert!(matches!(
496498
result,
@@ -509,7 +511,7 @@ mod tests {
509511
msg.set(fix44::TARGET_COMP_ID, "SENDER");
510512
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
511513

512-
let result = verify_message(&msg, &config, 42);
514+
let result = verify_message(&msg, &config, Some(42));
513515

514516
// missing seq num defaults to 0, which will be too low
515517
assert!(matches!(
@@ -523,7 +525,7 @@ mod tests {
523525
let config = build_test_config();
524526
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 0);
525527

526-
let result = verify_message(&msg, &config, 1);
528+
let result = verify_message(&msg, &config, Some(1));
527529

528530
assert!(matches!(
529531
result,
@@ -536,7 +538,7 @@ mod tests {
536538
let config = build_test_config();
537539
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 1);
538540

539-
let result = verify_message(&msg, &config, 1);
541+
let result = verify_message(&msg, &config, Some(1));
540542

541543
assert!(result.is_ok());
542544
}
@@ -547,7 +549,7 @@ mod tests {
547549
// wrong begin string AND wrong seq num - begin string error should come first
548550
let msg = build_test_message("FIX.4.2", "TARGET", "SENDER", 100);
549551

550-
let result = verify_message(&msg, &config, 42);
552+
let result = verify_message(&msg, &config, Some(42));
551553

552554
assert!(matches!(
553555
result,
@@ -561,7 +563,7 @@ mod tests {
561563
// wrong sender and wrong target - sender error should come first
562564
let msg = build_test_message("FIX.4.4", "WRONG_SENDER", "WRONG_TARGET", 42);
563565

564-
let result = verify_message(&msg, &config, 42);
566+
let result = verify_message(&msg, &config, Some(42));
565567

566568
assert!(matches!(
567569
result,
@@ -580,7 +582,7 @@ mod tests {
580582
msg.set(fix44::TARGET_COMP_ID, "SENDER");
581583
msg.set(fix44::MSG_SEQ_NUM, 42u64);
582584

583-
let result = verify_message(&msg, &config, 42);
585+
let result = verify_message(&msg, &config, Some(42));
584586

585587
assert!(matches!(
586588
result,
@@ -607,7 +609,7 @@ mod tests {
607609
let past_timestamp: Timestamp = past_time.naive_utc().into();
608610
msg.set(fix44::SENDING_TIME, past_timestamp);
609611

610-
let result = verify_message(&msg, &config, 42);
612+
let result = verify_message(&msg, &config, Some(42));
611613

612614
assert!(matches!(
613615
result,
@@ -634,7 +636,7 @@ mod tests {
634636
let future_timestamp: Timestamp = future_time.naive_utc().into();
635637
msg.set(fix44::SENDING_TIME, future_timestamp);
636638

637-
let result = verify_message(&msg, &config, 42);
639+
let result = verify_message(&msg, &config, Some(42));
638640

639641
assert!(matches!(
640642
result,
@@ -661,7 +663,7 @@ mod tests {
661663
let boundary_timestamp: Timestamp = boundary_time.naive_utc().into();
662664
msg.set(fix44::SENDING_TIME, boundary_timestamp);
663665

664-
let result = verify_message(&msg, &config, 42);
666+
let result = verify_message(&msg, &config, Some(42));
665667

666668
assert!(result.is_ok());
667669
}
@@ -682,7 +684,7 @@ mod tests {
682684
let valid_timestamp: Timestamp = valid_time.naive_utc().into();
683685
msg.set(fix44::SENDING_TIME, valid_timestamp);
684686

685-
let result = verify_message(&msg, &config, 42);
687+
let result = verify_message(&msg, &config, Some(42));
686688

687689
assert!(result.is_ok());
688690
}

crates/hotfix/src/session.rs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,6 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
167167
let message_type = message.header().get(fix44::MSG_TYPE)?;
168168

169169
if let SessionState::AwaitingResend(state) = &mut self.state {
170-
// TODO: consider what messages won't have a sequence number?
171-
// e.g. SequenceReset?
172170
let seq_number: u64 = message
173171
.header()
174172
.get(fix44::MSG_SEQ_NUM)
@@ -216,7 +214,7 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
216214
}
217215

218216
async fn process_app_message(&mut self, message: &Message) -> Result<()> {
219-
match self.verify_message(message) {
217+
match self.verify_message(message, true) {
220218
Ok(_) => {
221219
let parsed_message = M::parse(message);
222220
if matches!(
@@ -268,8 +266,14 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
268266
fn verify_message(
269267
&self,
270268
message: &Message,
269+
verify_target_seq_number: bool,
271270
) -> std::result::Result<(), MessageVerificationError> {
272-
verify_message(message, &self.config, self.store.next_target_seq_number())
271+
let expected_seq_number = if verify_target_seq_number {
272+
Some(self.store.next_target_seq_number())
273+
} else {
274+
None
275+
};
276+
verify_message(message, &self.config, expected_seq_number)
273277
}
274278

275279
async fn on_connect(&mut self, writer: WriterRef) {
@@ -300,9 +304,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
300304
}
301305

302306
async fn on_logon(&mut self, message: &Message) -> Result<()> {
303-
// TODO: this should wait to see if a resend request is sent
304307
if let SessionState::AwaitingLogon { writer, .. } = &self.state {
305-
match self.verify_message(message) {
308+
match self.verify_message(message, true) {
306309
Ok(_) => {
307310
// happy logon flow, the session is now active
308311
self.state =
@@ -433,13 +436,50 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
433436
}
434437

435438
async fn on_sequence_reset(&mut self, message: &Message) -> Result<()> {
436-
let gap_fill: bool = message.get(fix44::GAP_FILL_FLAG).unwrap();
437-
if !gap_fill {
438-
// TODO: non gap fill is valid as well of course, but I don't yet know the use-case for it is
439-
panic!("expected sequence reset with gap fill");
439+
let msg_seq_num = message
440+
.header()
441+
.get(fix44::MSG_SEQ_NUM)
442+
.map_err(|_| anyhow!("failed to get seq number"))?;
443+
let is_gap_fill: bool = message.get(fix44::GAP_FILL_FLAG).unwrap_or(false);
444+
if let Err(err) = self.verify_message(message, is_gap_fill) {
445+
self.handle_verification_error(err).await;
446+
return Ok(());
447+
}
448+
449+
let end: u64 = match message.get(fix44::NEW_SEQ_NO) {
450+
Ok(new_seq_no) => new_seq_no,
451+
Err(err) => {
452+
error!(
453+
"received sequence reset message without new sequence number: {:?}",
454+
err
455+
);
456+
let reject = Reject::new(msg_seq_num)
457+
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
458+
.text("missing NewSeqNo tag in sequence reset message");
459+
self.send_message(reject).await;
460+
461+
// note: we don't increment the target seq number here
462+
// this is an ambiguous case in the specification, but leaving the
463+
// sequence number as is feels the safest
464+
return Ok(());
465+
}
466+
};
467+
468+
// sequence resets cannot move the target seq number backwards
469+
// regardless of whether the message is a gap fill or not
470+
if end <= self.store.next_target_seq_number() {
471+
error!(
472+
"received sequence reset message which would move target seq number backwards: {end}",
473+
);
474+
let text =
475+
format!("attempt to lower sequence number, invalid value NewSeqNo(36)={end}");
476+
let reject = Reject::new(msg_seq_num)
477+
.session_reject_reason(SessionRejectReason::ValueIsIncorrect)
478+
.text(&text);
479+
self.send_message(reject).await;
480+
return Ok(());
440481
}
441482

442-
let end: u64 = message.get(fix44::NEW_SEQ_NO).unwrap();
443483
self.store.set_target_seq_number(end - 1).await
444484
}
445485

crates/hotfix/tests/common/actions.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,15 @@ impl When<&mut FakeCounterparty<TestMessage>> {
4747
}
4848

4949
pub async fn sends_gap_fill(&mut self, start_seq_no: u64, new_seq_no: u64) {
50-
self.target.send_gap_fill(start_seq_no, new_seq_no).await;
50+
self.target
51+
.send_sequence_reset(start_seq_no, new_seq_no, true)
52+
.await;
53+
}
54+
55+
pub async fn sends_sequence_reset(&mut self, start_seq_no: u64, new_seq_no: u64) {
56+
self.target
57+
.send_sequence_reset(start_seq_no, new_seq_no, false)
58+
.await;
5159
}
5260

5361
pub async fn sends_logon(&mut self) {

crates/hotfix/tests/common/fakes/fake_counterparty.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,14 @@ where
117117
}
118118
}
119119

120-
pub async fn send_gap_fill(&mut self, start_seq_no: u64, new_seq_no: u64) {
120+
pub async fn send_sequence_reset(
121+
&mut self,
122+
start_seq_no: u64,
123+
new_seq_no: u64,
124+
gap_fill: bool,
125+
) {
121126
let sequence_reset = SequenceReset {
122-
gap_fill: true,
127+
gap_fill,
123128
new_seq_no,
124129
};
125130
let raw_message = generate_message(

crates/hotfix/tests/common/test_messages.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,3 +395,14 @@ pub fn build_invalid_resend_request(
395395

396396
msg.encode(&Config::default()).unwrap()
397397
}
398+
399+
pub fn build_sequence_reset_without_new_seq_no(msg_seq_num: u64) -> Vec<u8> {
400+
let mut msg = Message::new("FIX.4.4", "4"); // MsgType 4 = SequenceReset
401+
msg.set(fix44::SENDER_COMP_ID, COUNTERPARTY_COMP_ID);
402+
msg.set(fix44::TARGET_COMP_ID, OUR_COMP_ID);
403+
msg.set(fix44::MSG_SEQ_NUM, msg_seq_num);
404+
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
405+
// Deliberately omit NEW_SEQ_NO to create an invalid SequenceReset
406+
407+
msg.encode(&Config::default()).unwrap()
408+
}

0 commit comments

Comments
 (0)