diff --git a/Cargo.lock b/Cargo.lock index 0087389..9b300e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,9 +197,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.1" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" @@ -341,12 +341,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" -dependencies = [ - "powerfmt", -] +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" [[package]] name = "digest" @@ -1081,12 +1078,11 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" [[package]] name = "nu-ansi-term" -version = "0.46.0" +version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "overload", - "winapi", + "windows-sys 0.59.0", ] [[package]] @@ -1101,9 +1097,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.1.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" [[package]] name = "num-integer" @@ -1182,12 +1178,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "parking_lot" version = "0.12.4" @@ -1492,9 +1482,9 @@ dependencies = [ [[package]] name = "rkyv" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" dependencies = [ "bitvec", "bytecheck", @@ -1510,9 +1500,9 @@ dependencies = [ [[package]] name = "rkyv_derive" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" dependencies = [ "proc-macro2", "quote", @@ -1665,18 +1655,28 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1977,30 +1977,29 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "711a53c2d47bbd818258c498c8dbfe186a2526c631495cfe7e078567f86b8469" dependencies = [ "deranged", - "itoa", "num-conv", "powerfmt", - "serde", + "serde_core", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "9e1c906769ad99c88eaa54e728060edef082f8e358ff32030cb7c7d315e81109" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "71c652a3727a9cbb9a02f707f530b618ce00d0ccd762009c8c23bd191df3c17d" dependencies = [ "num-conv", "time-core", @@ -2155,9 +2154,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -2176,9 +2175,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "nu-ansi-term", "sharded-slab", diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 93988df..36545d0 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -105,8 +105,8 @@ Risks and debt: - `ExchangeConnector` is not the best documentation surface today; prefer the smaller capability traits when writing examples or new code. - `src/lib.rs` re-exports only part of the exchange surface. The reliable public namespace is still `lotusx::exchanges::`. - Some legacy compatibility constructors keep old names or parameters while delegating to the new builders. Prefer the direct `build_connector*` functions in new code. -- `ExchangeFactory` is for latency tests and demos, not the canonical construction path for every production connector. In particular, check exchange-specific builders when credentials, passphrases, testnet URLs, or custom base URLs matter. -- `RestClientConfig::max_retries` is currently configuration intent, not a guarantee that every request path retries. +- `ExchangeFactory` is for latency tests and demos. It now delegates Bybit and OKX creation through the exchange builders, but production code should still prefer exchange-specific builders when credentials, passphrases, or custom behavior matter. +- `RestClientConfig::max_retries` applies to retryable GET and DELETE failures. POST requests are not automatically retried to avoid repeating order-style side effects. - `KlineInterval` includes exchange-specific formatting helpers, so a small amount of exchange dialect knowledge leaks into the shared type layer. - `src/exchanges/okx/` is implemented, but `src/exchanges/okx_perp/` is an empty, unregistered directory. Treat OKX perpetual as not implemented. - Some tests depend on public exchange APIs and can fail because of network or upstream API changes. diff --git a/examples/custom_latency_test.rs b/examples/custom_latency_test.rs index 61c4977..6334e2a 100644 --- a/examples/custom_latency_test.rs +++ b/examples/custom_latency_test.rs @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box> { markets_metrics: markets_metrics.clone(), klines_metrics: markets_metrics.clone(), // Simplified for demo websocket_connection_time: std::time::Duration::from_millis(100), - websocket_first_message: std::time::Duration::from_millis(1000), + websocket_first_message: std::time::Duration::from_secs(1), websocket_success_rate: 1.0, tick_to_trade_latency: std::time::Duration::from_millis(50), market_impact_bps: calculate_market_impact(&markets_metrics), diff --git a/examples/okx_example.rs b/examples/okx_example.rs index d1ab349..4f8bb49 100644 --- a/examples/okx_example.rs +++ b/examples/okx_example.rs @@ -53,9 +53,7 @@ async fn main() -> Result<(), Box> { /// Create OKX configuration from environment variables or use defaults fn create_config() -> ExchangeConfig { - let testnet = env::var("OKX_TESTNET") - .map(|v| v.to_lowercase() == "true") - .unwrap_or(false); + let testnet = env::var("OKX_TESTNET").is_ok_and(|v| v.to_lowercase() == "true"); // Create config with credentials if available, otherwise use defaults let api_key = env::var("OKX_API_KEY").unwrap_or_else(|_| "your_api_key".to_string()); diff --git a/src/core/kernel/rest.rs b/src/core/kernel/rest.rs index cd4d69d..3b7a7ea 100644 --- a/src/core/kernel/rest.rs +++ b/src/core/kernel/rest.rs @@ -354,23 +354,27 @@ impl ReqwestRest { ExchangeError::DeserializationError(format!("Failed to parse JSON response: {}", e)) }) } else { - Err(ExchangeError::ApiError { - code: status.as_u16() as i32, - message: response_text, - }) + let status_code = status.as_u16(); + match status_code { + 401 | 403 => Err(ExchangeError::AuthError(response_text)), + 429 => Err(ExchangeError::RateLimitExceeded(response_text)), + 500..=599 => Err(ExchangeError::ServerError(response_text)), + _ => Err(ExchangeError::ApiError { + code: status_code as i32, + message: response_text, + }), + } } } - /// Make a request with the given parameters - #[instrument(skip(self, body), fields(exchange = %self.config.exchange_name, method = %method, endpoint = %endpoint))] - async fn make_request( + fn build_request( &self, - method: Method, + method: &Method, endpoint: &str, query_params: &[(&str, &str)], body: &[u8], authenticated: bool, - ) -> Result { + ) -> Result { let url = self.build_url(endpoint); let mut request = self.client.request(method.clone(), &url); @@ -416,6 +420,19 @@ impl ReqwestRest { .body(body.to_vec()); } + Ok(request) + } + + async fn send_once( + &self, + method: &Method, + endpoint: &str, + query_params: &[(&str, &str)], + body: &[u8], + authenticated: bool, + ) -> Result { + let request = self.build_request(method, endpoint, query_params, body, authenticated)?; + let response = request .send() .await @@ -423,6 +440,48 @@ impl ReqwestRest { self.handle_response(response).await } + + fn can_retry_method(method: &Method) -> bool { + *method == Method::GET || *method == Method::DELETE + } + + /// Make a request with the given parameters + #[instrument(skip(self, body), fields(exchange = %self.config.exchange_name, method = %method, endpoint = %endpoint))] + async fn make_request( + &self, + method: Method, + endpoint: &str, + query_params: &[(&str, &str)], + body: &[u8], + authenticated: bool, + ) -> Result { + let mut retries = 0; + + loop { + match self + .send_once(&method, endpoint, query_params, body, authenticated) + .await + { + Ok(value) => return Ok(value), + Err(error) + if retries < self.config.max_retries + && error.is_retryable() + && Self::can_retry_method(&method) => + { + retries += 1; + trace!( + exchange = %self.config.exchange_name, + endpoint = %endpoint, + retry = retries, + max_retries = self.config.max_retries, + error = %error, + "Retrying REST request" + ); + } + Err(error) => return Err(error), + } + } + } } #[async_trait] @@ -626,3 +685,104 @@ impl Signer for NoopSigner { Ok((headers, signed_params)) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpListener; + + #[tokio::test] + async fn retries_retryable_api_errors_until_success() { + let attempts = Arc::new(AtomicUsize::new(0)); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let server_attempts = attempts.clone(); + + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + let current_attempt = server_attempts.fetch_add(1, Ordering::SeqCst) + 1; + let mut buffer = [0; 1024]; + let _ = stream.read(&mut buffer).await; + + let response = if current_attempt == 1 { + let body = r#"{"error":"boom"}"#; + format!( + "HTTP/1.1 500 Internal Server Error\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ) + } else { + let body = r#"{"ok":true}"#; + format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ) + }; + + let _ = stream.write_all(response.as_bytes()).await; + } + }); + + let client = RestClientBuilder::new( + RestClientConfig::new(format!("http://{}", address), "test".to_string()) + .with_max_retries(2), + ) + .build() + .unwrap(); + + let response = client.get("/retry", &[], false).await.unwrap(); + + assert_eq!(response, serde_json::json!({ "ok": true })); + assert_eq!(attempts.load(Ordering::SeqCst), 2); + } + + #[tokio::test] + async fn does_not_retry_post_api_errors() { + let attempts = Arc::new(AtomicUsize::new(0)); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + let server_attempts = attempts.clone(); + + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + server_attempts.fetch_add(1, Ordering::SeqCst); + let mut buffer = [0; 1024]; + let _ = stream.read(&mut buffer).await; + let body = r#"{"error":"boom"}"#; + let response = format!( + "HTTP/1.1 500 Internal Server Error\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()).await; + } + }); + + let client = RestClientBuilder::new( + RestClientConfig::new(format!("http://{}", address), "test".to_string()) + .with_max_retries(2), + ) + .build() + .unwrap(); + + let result = client + .post( + "/orders", + &serde_json::json!({ "symbol": "BTCUSDT" }), + false, + ) + .await; + + assert!(result.is_err()); + assert_eq!(attempts.load(Ordering::SeqCst), 1); + } +} diff --git a/src/core/kernel/signer.rs b/src/core/kernel/signer.rs index a2ededc..81b0604 100644 --- a/src/core/kernel/signer.rs +++ b/src/core/kernel/signer.rs @@ -27,7 +27,7 @@ pub trait Signer: Send + Sync { /// * `timestamp` - Request timestamp in milliseconds /// /// # Returns - /// Tuple of (headers, signed_query_params) to include in the request + /// Tuple of (headers, `signed_query_params`) to include in the request fn sign_request( &self, method: &str, diff --git a/src/exchanges/bybit_perp/conversions.rs b/src/exchanges/bybit_perp/conversions.rs index 3db3311..aca0c2b 100644 --- a/src/exchanges/bybit_perp/conversions.rs +++ b/src/exchanges/bybit_perp/conversions.rs @@ -16,8 +16,7 @@ pub fn convert_bybit_perp_market(bybit_perp_market: bybit_perp_types::BybitPerpM .lot_size_filter .qty_step .parse::() - .map(|p| (-p.log10()).ceil() as i32) - .unwrap_or(3); + .map_or(3, |p| (-p.log10()).ceil() as i32); Market { symbol: Symbol::new(bybit_perp_market.base_coin, bybit_perp_market.quote_coin) diff --git a/src/exchanges/hyperliquid/conversions.rs b/src/exchanges/hyperliquid/conversions.rs index a90adde..a8dc7ad 100644 --- a/src/exchanges/hyperliquid/conversions.rs +++ b/src/exchanges/hyperliquid/conversions.rs @@ -181,7 +181,7 @@ pub fn convert_asset_to_market(asset: AssetInfo) -> Market { pub fn convert_user_state_to_balances(user_state: &UserState) -> Vec { let balances = vec![Balance { asset: "USD".to_string(), - free: conversion::string_to_quantity(&user_state.margin_summary.account_value.to_string()), + free: conversion::string_to_quantity(&user_state.margin_summary.account_value), locked: conversion::string_to_quantity("0"), }]; diff --git a/src/exchanges/okx/connector/mod.rs b/src/exchanges/okx/connector/mod.rs index ec36692..589a8de 100644 --- a/src/exchanges/okx/connector/mod.rs +++ b/src/exchanges/okx/connector/mod.rs @@ -47,7 +47,7 @@ impl OkxConnector { } } -/// Implement AccountInfo trait for the OKX connector +/// Implement `AccountInfo` trait for the OKX connector #[async_trait] impl AccountInfo for OkxConnector { async fn get_account_balance(&self) -> Result, ExchangeError> { @@ -59,7 +59,7 @@ impl AccountInfo for OkxCon } } -/// Implement MarketDataSource trait for the OKX connector +/// Implement `MarketDataSource` trait for the OKX connector #[async_trait] impl MarketDataSource for OkxConnector { async fn get_markets(&self) -> Result, ExchangeError> { @@ -95,7 +95,7 @@ impl MarketDataSource for O } } -/// Implement OrderPlacer trait for the OKX connector +/// Implement `OrderPlacer` trait for the OKX connector #[async_trait] impl OrderPlacer for OkxConnector { async fn place_order(&self, order: OrderRequest) -> Result { diff --git a/src/utils/exchange_factory.rs b/src/utils/exchange_factory.rs index bf89bb9..222b7e7 100644 --- a/src/utils/exchange_factory.rs +++ b/src/utils/exchange_factory.rs @@ -1,6 +1,6 @@ use crate::core::{config::ExchangeConfig, traits::MarketDataSource}; use crate::exchanges::backpack; -use crate::exchanges::{bybit::BybitConnector, hyperliquid, paradex}; +use crate::exchanges::{hyperliquid, okx, paradex}; /// Configuration for an exchange in the latency test #[derive(Debug, Clone)] @@ -22,6 +22,7 @@ pub enum ExchangeType { BybitPerp, Backpack, Hyperliquid, + Okx, Paradex, } @@ -34,6 +35,7 @@ impl std::fmt::Display for ExchangeType { Self::BybitPerp => write!(f, "Bybit Perp"), Self::Backpack => write!(f, "Backpack"), Self::Hyperliquid => write!(f, "Hyperliquid"), + Self::Okx => write!(f, "OKX"), Self::Paradex => write!(f, "Paradex"), } } @@ -64,7 +66,7 @@ impl ExchangeFactory { } ExchangeType::Bybit => { let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); - Ok(Box::new(BybitConnector::for_factory(cfg))) + Ok(Box::new(crate::exchanges::bybit::build_connector(cfg)?)) } ExchangeType::BybitPerp => { let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); @@ -87,6 +89,10 @@ impl ExchangeFactory { let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); Ok(Box::new(hyperliquid::build_hyperliquid_connector(cfg)?)) } + ExchangeType::Okx => { + let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); + Ok(Box::new(okx::build_connector(cfg)?)) + } ExchangeType::Paradex => { let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); match paradex::build_connector(cfg) { @@ -212,6 +218,7 @@ impl ExchangeFactory { ExchangeType::BybitPerp, ExchangeType::Backpack, ExchangeType::Hyperliquid, + ExchangeType::Okx, ExchangeType::Paradex, ] } @@ -244,6 +251,7 @@ impl ExchangeTestConfigBuilder { let symbols = match exchange_type { ExchangeType::Hyperliquid => vec!["BTC".to_string(), "ETH".to_string()], ExchangeType::Backpack => vec!["SOL_USDC".to_string(), "BTC_USDC".to_string()], + ExchangeType::Okx => vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()], ExchangeType::Paradex => vec!["BTC-USD".to_string(), "ETH-USD".to_string()], _ => vec!["BTCUSDT".to_string(), "ETHUSDT".to_string()], }; @@ -277,3 +285,45 @@ impl ExchangeTestConfigBuilder { self.configs } } + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpListener; + + #[test] + fn available_exchanges_include_okx() { + assert!(ExchangeFactory::get_available_exchanges().contains(&ExchangeType::Okx)); + } + + #[tokio::test] + async fn bybit_factory_uses_builder_base_url() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let Ok((mut stream, _)) = listener.accept().await else { + return; + }; + let mut buffer = [0; 2048]; + let _ = stream.read(&mut buffer).await; + let body = r#"{"retCode":0,"retMsg":"OK","result":{"list":[{"symbol":"BTCUSDT","status":"Trading","baseCoin":"BTC","quoteCoin":"USDT","basePrecision":8,"quotePrecision":8,"minOrderQty":"0.0001","maxOrderQty":"100","qtyStep":"0.0001","minPrice":"1","maxPrice":"1000000","tickSize":"0.1","isSpotTradingAllowed":true,"isMarginTradingAllowed":false}]}}"#; + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(response.as_bytes()).await; + }); + + let config = ExchangeConfig::read_only().base_url(format!("http://{}", address)); + let connector = + ExchangeFactory::create_connector(&ExchangeType::Bybit, Some(config), false).unwrap(); + + let markets = connector.get_markets().await.unwrap(); + + assert_eq!(markets.len(), 1); + assert_eq!(markets[0].symbol.to_string(), "BTCUSDT"); + } +}