Skip to content

Commit 704b6bc

Browse files
authored
feat: allow one off restarts with sequence number reset (#241)
* Split up event queue for sessions so that messages can be prioritised * Split out app facing session handle which doesn't support internal actions * Support action to reset sequence numbers as a one-off action * Allow shutdown action to set the reconnect flag * Expose reset on next logon action via public interface * Add test case for requesting a one-off reset and then reconnecting * Add unit test suite for socket reader * Add unit test suite for socket writer * Add doc comment for SessionHandle
1 parent 17bb026 commit 704b6bc

26 files changed

Lines changed: 723 additions & 155 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/hotfix-status/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ ui = ["askama", "mime_guess", "rust-embed"]
2020
[dependencies]
2121
hotfix = { version = "0.2.9", path = "../hotfix" }
2222

23+
anyhow = { workspace = true }
2324
askama = { workspace = true, features = ["serde_json"], optional = true }
2425
async-trait = { workspace = true }
2526
axum = { workspace = true }

crates/hotfix-status/src/api.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::AppState;
22
use crate::data_provider::DataProvider;
3+
use crate::error::AppResult;
34
use axum::extract::State;
45
use axum::routing::get;
56
use axum::{Json, Router};
@@ -30,8 +31,8 @@ struct SessionInfoResponse {
3031

3132
async fn get_session_info<P: DataProvider>(
3233
State(state): State<AppState<P>>,
33-
) -> Json<SessionInfoResponse> {
34-
let session_info = state.data_provider.get_session_info().await;
34+
) -> AppResult<Json<SessionInfoResponse>> {
35+
let session_info = state.data_provider.get_session_info().await?;
3536

36-
Json(SessionInfoResponse { session_info })
37+
Ok(Json(SessionInfoResponse { session_info }))
3738
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
use hotfix::message::FixMessage;
2-
use hotfix::session::{SessionInfo, SessionRef};
2+
use hotfix::session::{SessionHandle, SessionInfo};
33

44
#[async_trait::async_trait]
55
pub trait DataProvider: Clone + Send + Sync {
6-
async fn get_session_info(&self) -> SessionInfo;
6+
async fn get_session_info(&self) -> anyhow::Result<SessionInfo>;
77
}
88

99
#[derive(Clone)]
1010
pub struct SessionDataProvider<M> {
11-
pub(crate) session_ref: SessionRef<M>,
11+
pub(crate) session_handle: SessionHandle<M>,
1212
}
1313

1414
#[async_trait::async_trait]
1515
impl<M: FixMessage> DataProvider for SessionDataProvider<M> {
16-
async fn get_session_info(&self) -> SessionInfo {
17-
self.session_ref.get_session_info().await
16+
async fn get_session_info(&self) -> anyhow::Result<SessionInfo> {
17+
self.session_handle.get_session_info().await
1818
}
1919
}

crates/hotfix-status/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use axum::response::{IntoResponse, Response};
33

44
#[derive(Debug, displaydoc::Display, thiserror::Error)]
55
pub enum AppError {
6+
/// General anyhow errors
7+
Anyhow(#[from] anyhow::Error),
68
#[cfg(feature = "ui")]
79
/// could not render the template
810
Render(#[from] askama::Error),

crates/hotfix-status/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ use crate::api::build_api_router;
99
use crate::data_provider::{DataProvider, SessionDataProvider};
1010
use axum::Router;
1111
use hotfix::message::FixMessage;
12-
use hotfix::session::SessionRef;
12+
use hotfix::session::SessionHandle;
1313

1414
#[derive(Clone)]
1515
struct AppState<P> {
1616
data_provider: P,
1717
}
1818

19-
pub fn build_router<M: FixMessage>(session_ref: SessionRef<M>) -> Router {
20-
let data_provider = SessionDataProvider { session_ref };
19+
pub fn build_router<M: FixMessage>(session_handle: SessionHandle<M>) -> Router {
20+
let data_provider = SessionDataProvider { session_handle };
2121
build_router_with_provider(data_provider)
2222
}
2323

@@ -55,8 +55,8 @@ mod tests {
5555

5656
#[async_trait::async_trait]
5757
impl DataProvider for FakeDataProvider {
58-
async fn get_session_info(&self) -> SessionInfo {
59-
self.session_info.clone()
58+
async fn get_session_info(&self) -> anyhow::Result<SessionInfo> {
59+
Ok(self.session_info.clone())
6060
}
6161
}
6262

crates/hotfix-status/src/ui/dashboard.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ struct DashboardTemplate<'a> {
1919
pub(crate) async fn dashboard_handler<P: DataProvider>(
2020
State(state): State<AppState<P>>,
2121
) -> AppResult<impl IntoResponse> {
22-
let session_info = state.data_provider.get_session_info().await;
22+
let session_info = state.data_provider.get_session_info().await?;
2323
let timestamp_string = Utc::now().to_rfc3339();
2424

2525
let template = DashboardTemplate {

crates/hotfix/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ keywords.workspace = true
1212
categories.workspace = true
1313

1414
[features]
15+
default = ["test-utils"]
1516
redb = ["dep:redb"]
1617
mongodb = ["dep:mongodb"]
18+
test-utils = []
1719

1820
[lints]
1921
workspace = true

crates/hotfix/src/initiator.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ use tracing::{debug, warn};
1515
use crate::application::Application;
1616
use crate::config::SessionConfig;
1717
use crate::message::FixMessage;
18-
use crate::session::SessionRef;
18+
use crate::session::{InternalSessionRef, SessionHandle};
1919
use crate::store::MessageStore;
2020
use crate::transport::connect;
2121

2222
#[derive(Clone)]
2323
pub struct Initiator<M> {
2424
pub config: SessionConfig,
25-
session: SessionRef<M>,
25+
session_handle: SessionHandle<M>,
2626
completion_rx: watch::Receiver<bool>,
2727
}
2828

@@ -32,7 +32,7 @@ impl<M: FixMessage> Initiator<M> {
3232
application: impl Application<M>,
3333
store: impl MessageStore + Send + Sync + 'static,
3434
) -> Self {
35-
let session_ref = SessionRef::new(config.clone(), application, store);
35+
let session_ref = InternalSessionRef::new(config.clone(), application, store);
3636
let (completion_tx, completion_rx) = watch::channel(false);
3737

3838
tokio::spawn({
@@ -43,25 +43,27 @@ impl<M: FixMessage> Initiator<M> {
4343

4444
Self {
4545
config,
46-
session: session_ref,
46+
session_handle: session_ref.into(),
4747
completion_rx,
4848
}
4949
}
5050

51-
pub async fn send_message(&self, msg: M) {
52-
self.session.send_message(msg).await;
51+
pub async fn send_message(&self, msg: M) -> anyhow::Result<()> {
52+
self.session_handle.send_message(msg).await?;
53+
54+
Ok(())
5355
}
5456

5557
pub fn is_interested(&self, sender_comp_id: &str, target_comp_id: &str) -> bool {
5658
self.config.sender_comp_id == sender_comp_id && self.config.target_comp_id == target_comp_id
5759
}
5860

59-
pub fn session_ref(&self) -> SessionRef<M> {
60-
self.session.clone()
61+
pub fn session_handle(&self) -> SessionHandle<M> {
62+
self.session_handle.clone()
6163
}
6264

63-
pub async fn shutdown(self) -> Result<(), Elapsed> {
64-
self.session.shutdown().await;
65+
pub async fn shutdown(self, reconnect: bool) -> Result<(), Elapsed> {
66+
self.session_handle.shutdown(reconnect).await;
6567
tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown()).await
6668
}
6769

@@ -84,7 +86,7 @@ impl<M: FixMessage> Initiator<M> {
8486

8587
async fn establish_connection<M: FixMessage>(
8688
config: SessionConfig,
87-
session_ref: SessionRef<M>,
89+
session_ref: InternalSessionRef<M>,
8890
completion_tx: watch::Sender<bool>,
8991
) {
9092
loop {

0 commit comments

Comments
 (0)