Skip to content

Commit 77820f5

Browse files
committed
Move resend completion into AwaitingResend state variant
1 parent 8154494 commit 77820f5

3 files changed

Lines changed: 48 additions & 34 deletions

File tree

crates/hotfix/src/session.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -277,46 +277,42 @@ where
277277
}
278278

279279
async fn check_end_of_resend(&mut self) -> Result<(), SessionOperationError> {
280-
let backlog = if let SessionState::AwaitingResend(state) = &mut self.state {
281-
if self.ctx.store.next_target_seq_number() > state.end_seq_number {
282-
let inbound_queue = std::mem::take(&mut state.inbound_queue);
283-
let new_state = SessionState::new_active(
284-
state.writer.clone(),
285-
self.ctx.config.heartbeat_interval,
286-
);
287-
self.apply_transition(TransitionResult::TransitionTo(new_state))
288-
.await;
289-
Some(inbound_queue)
290-
} else {
291-
None
292-
}
280+
let completed = if let SessionState::AwaitingResend(state) = &mut self.state {
281+
state.try_complete(
282+
self.ctx.store.next_target_seq_number(),
283+
self.ctx.config.heartbeat_interval,
284+
)
293285
} else {
294286
None
295287
};
296288

297-
if let Some(mut inbound_queue) = backlog {
298-
// we have reached the end of the resend,
299-
// process queued messages and resume normal operation
300-
debug!("resend is done, processing backlog");
301-
while let Some(msg) = inbound_queue.pop_front() {
302-
let seq_number: u64 = msg.get(MSG_SEQ_NUM).unwrap_or_else(|e| {
303-
error!("failed to get seq number: {:?}", e);
304-
0
305-
});
306-
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap_or("");
307-
debug!(seq_number, msg_type, "processing queued message");
308-
309-
if msg_type == ResendRequest::MSG_TYPE {
310-
// ResendRequest was already processed when it arrived (it bypasses
311-
// the queue in process_message). Just increment the target seq number
312-
// for sequence accounting purposes.
313-
self.ctx.store.increment_target_seq_number().await?;
314-
} else {
315-
self.process_message(msg).await?;
316-
}
289+
let Some((new_state, mut backlog)) = completed else {
290+
return Ok(());
291+
};
292+
293+
self.apply_transition(TransitionResult::TransitionTo(new_state))
294+
.await;
295+
296+
// Process queued messages and resume normal operation
297+
debug!("resend is done, processing backlog");
298+
while let Some(msg) = backlog.pop_front() {
299+
let seq_number: u64 = msg.get(MSG_SEQ_NUM).unwrap_or_else(|e| {
300+
error!("failed to get seq number: {:?}", e);
301+
0
302+
});
303+
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap_or("");
304+
debug!(seq_number, msg_type, "processing queued message");
305+
306+
if msg_type == ResendRequest::MSG_TYPE {
307+
// ResendRequest was already processed when it arrived (it bypasses
308+
// the queue in process_message). Just increment the target seq number
309+
// for sequence accounting purposes.
310+
self.ctx.store.increment_target_seq_number().await?;
311+
} else {
312+
self.process_message(msg).await?;
317313
}
318-
debug!("resend backlog is cleared, resuming normal operation");
319314
}
315+
debug!("resend backlog is cleared, resuming normal operation");
320316

321317
Ok(())
322318
}

crates/hotfix/src/session/ctx.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub(crate) enum TransitionResult {
2020
///
2121
/// Before verification and dispatch, the current state gets a chance to
2222
/// decide whether a message should be processed, queued, or rejected.
23+
#[allow(clippy::large_enum_variant)]
2324
pub(crate) enum PreProcessDecision {
2425
/// Continue processing this message through verification and dispatch.
2526
Accept(Message),

crates/hotfix/src/session/state/awaiting_resend.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,23 @@ impl AwaitingResendState {
4242
}
4343
}
4444

45+
/// Check whether the resend is complete. If the next expected target sequence number
46+
/// exceeds the end of the gap, return the queued backlog for replay and transition
47+
/// to Active. Otherwise return `None`.
48+
pub(crate) fn try_complete(
49+
&mut self,
50+
next_target_seq: u64,
51+
heartbeat_interval: u64,
52+
) -> Option<(SessionState, VecDeque<Message>)> {
53+
if next_target_seq > self.end_seq_number {
54+
let backlog = std::mem::take(&mut self.inbound_queue);
55+
let new_state = SessionState::new_active(self.writer.clone(), heartbeat_interval);
56+
Some((new_state, backlog))
57+
} else {
58+
None
59+
}
60+
}
61+
4562
pub(crate) fn pre_process_inbound(&mut self, message: Message) -> PreProcessDecision {
4663
let dominated_by_resend = message
4764
.header()

0 commit comments

Comments
 (0)