Skip to content

Commit 5eef99a

Browse files
authored
fix: resolve deadlock when both sides send resendrequest simultaneously (#314)
* Bypass sequence number check when handling resend request * Remove magic strings for admin message types * Remove magic values for message type in session states * Add test case for receiving heartbeats * Add test for rejects * Attempt to fix flaky tests due to timestamp precision
1 parent 5491727 commit 5eef99a

19 files changed

Lines changed: 470 additions & 228 deletions

crates/hotfix/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ pub mod application;
2525
pub mod config;
2626
pub mod initiator;
2727
pub mod message;
28-
pub mod message_utils;
2928
pub mod session;
3029
mod session_schedule;
3130
pub mod store;

crates/hotfix/src/message.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
use hotfix_message::error::EncodingError as EncodeError;
33
pub use hotfix_message::field_types::Timestamp;
44
pub(crate) use hotfix_message::message::{Config, Message};
5-
use hotfix_message::session_fields::{MSG_SEQ_NUM, SENDER_COMP_ID, SENDING_TIME, TARGET_COMP_ID};
5+
use hotfix_message::session_fields::{
6+
MSG_SEQ_NUM, ORIG_SENDING_TIME, POSS_DUP_FLAG, SENDER_COMP_ID, SENDING_TIME, TARGET_COMP_ID,
7+
};
68
pub use hotfix_message::{Part, RepeatingGroup};
79

810
pub mod business_reject;
@@ -20,12 +22,75 @@ pub mod verification_error;
2022
pub use parser::RawFixMessage;
2123
pub use resend_request::ResendRequest;
2224

25+
use heartbeat::Heartbeat;
26+
use logon::Logon;
27+
use logout::Logout;
28+
use reject::Reject;
29+
use sequence_reset::SequenceReset;
30+
use test_request::TestRequest;
31+
32+
static ADMIN_TYPES: [&str; 7] = [
33+
Logon::MSG_TYPE,
34+
Heartbeat::MSG_TYPE,
35+
TestRequest::MSG_TYPE,
36+
ResendRequest::MSG_TYPE,
37+
Reject::MSG_TYPE,
38+
SequenceReset::MSG_TYPE,
39+
Logout::MSG_TYPE,
40+
];
41+
42+
pub fn is_admin(message_type: &str) -> bool {
43+
ADMIN_TYPES.contains(&message_type)
44+
}
45+
2346
pub trait OutboundMessage: Clone + Send + 'static {
2447
fn write(&self, msg: &mut Message);
2548

2649
fn message_type(&self) -> &str;
2750
}
2851

52+
/// Prepares a FIX message for resend per the FIX spec (PossDupFlag logic).
53+
///
54+
/// Behaviour:
55+
/// - On first resend (no PossDupFlag Y / no OrigSendingTime):
56+
/// * Move current SendingTime(52) to OrigSendingTime(122)
57+
/// * Set SendingTime(52) to the current sending time (may be equal if clock granularity causes no change)
58+
/// * Set PossDupFlag(43)=Y
59+
/// - On subsequent resends (already marked possible duplicate and has OrigSendingTime):
60+
/// * Refresh SendingTime(52) to current time (value may or may not differ from previous)
61+
pub fn prepare_message_for_resend(msg: &mut Message) -> Result<(), &'static str> {
62+
let header = msg.header_mut();
63+
64+
if header.get_raw(SENDING_TIME).is_none() {
65+
return Err("Missing SendingTime (52)");
66+
}
67+
68+
let already_poss_dup = header.get::<bool>(POSS_DUP_FLAG).unwrap_or(false);
69+
let has_orig_sending_time = header.get_raw(ORIG_SENDING_TIME).is_some();
70+
71+
if already_poss_dup && has_orig_sending_time {
72+
// Subsequent resend: refresh SendingTime only
73+
return if header.pop(SENDING_TIME).is_some() {
74+
header.set(SENDING_TIME, Timestamp::utc_now());
75+
Ok(())
76+
} else {
77+
Err("Failed to extract previous SendingTime")
78+
};
79+
}
80+
81+
// First resend path
82+
if let Some(original_sending_time_field) = header.pop(SENDING_TIME) {
83+
let original_ts = Timestamp::parse(&original_sending_time_field.data)
84+
.ok_or("Invalid original SendingTime format")?;
85+
header.set(ORIG_SENDING_TIME, original_ts);
86+
header.set(SENDING_TIME, Timestamp::utc_now());
87+
header.set(POSS_DUP_FLAG, true);
88+
Ok(())
89+
} else {
90+
Err("Failed to extract original SendingTime")
91+
}
92+
}
93+
2994
pub fn generate_message(
3095
begin_string: &str,
3196
sender_comp_id: &str,

crates/hotfix/src/message/business_reject.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub(crate) struct BusinessReject {
4949
}
5050

5151
impl BusinessReject {
52+
pub(crate) const MSG_TYPE: &str = "j";
53+
5254
pub(crate) fn new(ref_msg_type: &str, reason: BusinessRejectReason) -> Self {
5355
Self {
5456
ref_msg_type: ref_msg_type.to_string(),
@@ -100,7 +102,7 @@ impl OutboundMessage for BusinessReject {
100102
}
101103

102104
fn message_type(&self) -> &str {
103-
"j"
105+
Self::MSG_TYPE
104106
}
105107
}
106108

crates/hotfix/src/message/heartbeat.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct Heartbeat {
99
}
1010

1111
impl Heartbeat {
12+
pub const MSG_TYPE: &str = "0";
13+
1214
pub fn for_request(test_req_id: String) -> Self {
1315
Self {
1416
test_req_id: Some(test_req_id),
@@ -24,6 +26,6 @@ impl OutboundMessage for Heartbeat {
2426
}
2527

2628
fn message_type(&self) -> &str {
27-
"0"
29+
Self::MSG_TYPE
2830
}
2931
}

crates/hotfix/src/message/logon.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ pub enum ResetSeqNumConfig {
1919
}
2020

2121
impl Logon {
22+
pub const MSG_TYPE: &str = "A";
23+
2224
pub fn new(heartbeat_interval: u64, reset_config: ResetSeqNumConfig) -> Self {
2325
let (reset_seq_num_flag, next_expected_msg_seq_num) = match reset_config {
2426
ResetSeqNumConfig::Reset => (ResetSeqNumFlag::Yes, None),
@@ -45,7 +47,7 @@ impl OutboundMessage for Logon {
4547
}
4648

4749
fn message_type(&self) -> &str {
48-
"A"
50+
Self::MSG_TYPE
4951
}
5052
}
5153

crates/hotfix/src/message/logout.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct Logout {
99
}
1010

1111
impl Logout {
12+
pub const MSG_TYPE: &str = "5";
13+
1214
pub fn with_reason(reason: String) -> Self {
1315
Self { text: Some(reason) }
1416
}
@@ -22,6 +24,6 @@ impl OutboundMessage for Logout {
2224
}
2325

2426
fn message_type(&self) -> &str {
25-
"5"
27+
Self::MSG_TYPE
2628
}
2729
}

crates/hotfix/src/message/reject.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub(crate) struct Reject {
1616
}
1717

1818
impl Reject {
19+
pub(crate) const MSG_TYPE: &str = "3";
20+
1921
pub(crate) fn new(ref_seq_num: u64) -> Self {
2022
Self {
2123
ref_seq_num,
@@ -85,7 +87,7 @@ impl OutboundMessage for Reject {
8587
}
8688

8789
fn message_type(&self) -> &str {
88-
"3"
90+
Self::MSG_TYPE
8991
}
9092
}
9193

crates/hotfix/src/message/resend_request.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub struct ResendRequest {
1010
}
1111

1212
impl ResendRequest {
13+
pub const MSG_TYPE: &str = "2";
14+
1315
pub fn new(begin: u64, end: u64) -> Self {
1416
Self {
1517
begin_seq_no: begin,
@@ -25,6 +27,6 @@ impl OutboundMessage for ResendRequest {
2527
}
2628

2729
fn message_type(&self) -> &str {
28-
"2"
30+
Self::MSG_TYPE
2931
}
3032
}

crates/hotfix/src/message/sequence_reset.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ pub struct SequenceReset {
1212
pub new_seq_no: u64,
1313
}
1414

15+
impl SequenceReset {
16+
pub const MSG_TYPE: &str = "4";
17+
}
18+
1519
impl OutboundMessage for SequenceReset {
1620
fn write(&self, msg: &mut Message) {
1721
msg.set(GAP_FILL_FLAG, self.gap_fill);
@@ -25,6 +29,6 @@ impl OutboundMessage for SequenceReset {
2529
}
2630

2731
fn message_type(&self) -> &str {
28-
"4"
32+
Self::MSG_TYPE
2933
}
3034
}

crates/hotfix/src/message/test_request.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct TestRequest {
99
}
1010

1111
impl TestRequest {
12+
pub const MSG_TYPE: &str = "1";
13+
1214
pub fn new(test_req_id: String) -> Self {
1315
Self { test_req_id }
1416
}
@@ -20,6 +22,6 @@ impl OutboundMessage for TestRequest {
2022
}
2123

2224
fn message_type(&self) -> &str {
23-
"1"
25+
Self::MSG_TYPE
2426
}
2527
}

0 commit comments

Comments
 (0)