Skip to content

Commit 06fec66

Browse files
committed
Centralise verification call for inbound messages
1 parent 3f97c0b commit 06fec66

1 file changed

Lines changed: 50 additions & 93 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 50 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -210,13 +210,39 @@ where
210210
}
211211

212212
if let SessionState::AwaitingLogon(_) = &mut self.state {
213-
// TODO: should this (and all inbound message processing) logic be pushed into the state?
214213
if message_type != Logon::MSG_TYPE {
215214
self.state.disconnect_writer().await;
216215
return Ok(());
217216
}
218217
}
219218

219+
// Logon has its own verification inside the AwaitingLogon guard
220+
if message_type != Logon::MSG_TYPE {
221+
let (check_too_high, check_too_low) = match message_type {
222+
// check_too_high=false: QFJ-673 deadlock fix. When both sides send
223+
// ResendRequest simultaneously, each side's ResendRequest will have a seq
224+
// number higher than expected. By not treating that as an error, we allow
225+
// the ResendRequest to be processed.
226+
ResendRequest::MSG_TYPE => (false, true),
227+
Reject::MSG_TYPE => (false, true),
228+
Logout::MSG_TYPE => (false, false),
229+
SequenceReset::MSG_TYPE => {
230+
let is_gap_fill: bool = message.get(GAP_FILL_FLAG).unwrap_or(false);
231+
(is_gap_fill, is_gap_fill)
232+
}
233+
_ => (true, true),
234+
};
235+
236+
if let VerificationResult::Issue(result) = self
237+
.state
238+
.handle_verification_issue(&mut self.ctx, &message, check_too_high, check_too_low)
239+
.await?
240+
{
241+
self.apply_transition(result);
242+
return Ok(());
243+
}
244+
}
245+
220246
match message_type {
221247
Heartbeat::MSG_TYPE => {
222248
self.on_heartbeat(&message).await?;
@@ -228,13 +254,13 @@ where
228254
self.on_resend_request(&message).await?;
229255
}
230256
Reject::MSG_TYPE => {
231-
self.on_reject(&message).await?;
257+
self.on_reject().await?;
232258
}
233259
SequenceReset::MSG_TYPE => {
234260
self.on_sequence_reset(&message).await?;
235261
}
236262
Logout::MSG_TYPE => {
237-
self.on_logout(&message).await?;
263+
self.on_logout().await?;
238264
}
239265
Logon::MSG_TYPE => {
240266
self.on_logon(&message).await?;
@@ -249,39 +275,28 @@ where
249275
&mut self,
250276
message: &Message,
251277
) -> Result<(), SessionOperationError> {
252-
match self
253-
.state
254-
.handle_verification_issue(&mut self.ctx, message, true, true)
255-
.await?
256-
{
257-
VerificationResult::Issue(result) => {
258-
self.apply_transition(result);
259-
}
260-
VerificationResult::Passed => {
261-
match self.ctx.application.on_inbound_message(message).await {
262-
InboundDecision::Accept => {}
263-
InboundDecision::Reject { reason, text } => {
264-
let msg_type: &str = message
265-
.header()
266-
.get(MSG_TYPE)
267-
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
268-
let mut reject = BusinessReject::new(msg_type, reason)
269-
.ref_seq_num(get_msg_seq_num(message));
270-
if let Some(text) = text {
271-
reject = reject.text(&text);
272-
}
273-
self.send_message(reject)
274-
.await
275-
.with_send_context("business message reject")?;
276-
}
277-
InboundDecision::TerminateSession => {
278-
error!("failed to send inbound message to application");
279-
self.state.disconnect_writer().await;
280-
}
278+
match self.ctx.application.on_inbound_message(message).await {
279+
InboundDecision::Accept => {}
280+
InboundDecision::Reject { reason, text } => {
281+
let msg_type: &str = message
282+
.header()
283+
.get(MSG_TYPE)
284+
.map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?;
285+
let mut reject =
286+
BusinessReject::new(msg_type, reason).ref_seq_num(get_msg_seq_num(message));
287+
if let Some(text) = text {
288+
reject = reject.text(&text);
281289
}
282-
self.ctx.store.increment_target_seq_number().await?;
290+
self.send_message(reject)
291+
.await
292+
.with_send_context("business message reject")?;
293+
}
294+
InboundDecision::TerminateSession => {
295+
error!("failed to send inbound message to application");
296+
self.state.disconnect_writer().await;
283297
}
284298
}
299+
self.ctx.store.increment_target_seq_number().await?;
285300

286301
Ok(())
287302
}
@@ -383,16 +398,7 @@ where
383398
Ok(())
384399
}
385400

386-
async fn on_logout(&mut self, message: &Message) -> Result<(), SessionOperationError> {
387-
if let VerificationResult::Issue(result) = self
388-
.state
389-
.handle_verification_issue(&mut self.ctx, message, false, false)
390-
.await?
391-
{
392-
self.apply_transition(result);
393-
return Ok(());
394-
}
395-
401+
async fn on_logout(&mut self) -> Result<(), SessionOperationError> {
396402
if self.state.is_logged_on() {
397403
self.state
398404
.send_logout(&mut self.ctx, "Logout acknowledged")
@@ -424,15 +430,6 @@ where
424430
}
425431

426432
async fn on_heartbeat(&mut self, message: &Message) -> Result<(), SessionOperationError> {
427-
if let VerificationResult::Issue(result) = self
428-
.state
429-
.handle_verification_issue(&mut self.ctx, message, true, true)
430-
.await?
431-
{
432-
self.apply_transition(result);
433-
return Ok(());
434-
}
435-
436433
if let (Some(expected_req_id), Ok(message_req_id)) = (
437434
&self.state.expected_test_response_id(),
438435
message.get::<&str>(TEST_REQ_ID),
@@ -447,15 +444,6 @@ where
447444
}
448445

449446
async fn on_test_request(&mut self, message: &Message) -> Result<(), SessionOperationError> {
450-
if let VerificationResult::Issue(result) = self
451-
.state
452-
.handle_verification_issue(&mut self.ctx, message, true, true)
453-
.await?
454-
{
455-
self.apply_transition(result);
456-
return Ok(());
457-
}
458-
459447
let req_id: &str = message.get(TEST_REQ_ID).unwrap_or_else(|_| {
460448
// TODO: send reject?
461449
todo!()
@@ -476,19 +464,6 @@ where
476464
return Ok(());
477465
}
478466

479-
// Verify with check_too_high=false so ResendRequest is never blocked by seq-too-high.
480-
// This is the key part of the QFJ-673 deadlock fix: when both sides send ResendRequest
481-
// simultaneously, each side's ResendRequest will have a seq number higher than expected.
482-
// By not treating that as an error, we allow the ResendRequest to be processed.
483-
if let VerificationResult::Issue(result) = self
484-
.state
485-
.handle_verification_issue(&mut self.ctx, message, false, true)
486-
.await?
487-
{
488-
self.apply_transition(result);
489-
return Ok(());
490-
}
491-
492467
let msg_seq_num = get_msg_seq_num(message);
493468
let expected = self.ctx.store.next_target_seq_number();
494469

@@ -548,31 +523,13 @@ where
548523
}
549524

550525
/// Handle Reject messages.
551-
async fn on_reject(&mut self, message: &Message) -> Result<(), SessionOperationError> {
552-
if let VerificationResult::Issue(result) = self
553-
.state
554-
.handle_verification_issue(&mut self.ctx, message, false, true)
555-
.await?
556-
{
557-
self.apply_transition(result);
558-
return Ok(());
559-
}
560-
526+
async fn on_reject(&mut self) -> Result<(), SessionOperationError> {
561527
self.ctx.store.increment_target_seq_number().await?;
562528
Ok(())
563529
}
564530

565531
async fn on_sequence_reset(&mut self, message: &Message) -> Result<(), SessionOperationError> {
566532
let msg_seq_num = get_msg_seq_num(message);
567-
let is_gap_fill: bool = message.get(GAP_FILL_FLAG).unwrap_or(false);
568-
if let VerificationResult::Issue(result) = self
569-
.state
570-
.handle_verification_issue(&mut self.ctx, message, is_gap_fill, is_gap_fill)
571-
.await?
572-
{
573-
self.apply_transition(result);
574-
return Ok(());
575-
}
576533

577534
let end: u64 = match message.get(NEW_SEQ_NO) {
578535
Ok(new_seq_no) => new_seq_no,

0 commit comments

Comments
 (0)