From b65e0065a6cb837c20ac04eec2cf57ef872fa943 Mon Sep 17 00:00:00 2001 From: Tunay Engin Date: Mon, 18 May 2026 23:08:41 +0300 Subject: [PATCH 1/3] chore: remove async-trait, base64, smallvec, thiserror dependencies Replace external crates with native std equivalents: - async-trait (rustapi-jobs): use native async fn in traits + Pin> for dyn-compatible traits (JobBackend, JobHandler) - base64 (rustapi-core, rustapi-ws): inline 12-line RFC 4648 encode fn; RFC 6455 WebSocket accept key test vector confirmed - smallvec (rustapi-core): replace PathParams inner storage with Vec<(String,String)> - thiserror (10 crates + workspace): manual Display/Error/From impls across WebSocketError, AuthError, ViewError, DieselPoolError, AuditError, ExportError, TokenError, ReplayClientError, PoolError, SessionError, ReplayStoreError, ToonError, JobError All 37 workspace test suites pass (0 failures). --- Cargo.lock | 66 +---- Cargo.toml | 1 - crates/cargo-rustapi/Cargo.toml | 7 +- crates/cargo-rustapi/src/commands/doctor.rs | 116 ++++---- crates/cargo-rustapi/src/commands/watch.rs | 258 ++++++++++++++---- crates/rustapi-core/Cargo.toml | 5 - crates/rustapi-core/src/app.rs | 20 +- crates/rustapi-core/src/path_params.rs | 33 +-- crates/rustapi-core/src/replay/store.rs | 26 +- crates/rustapi-extras/Cargo.toml | 1 - crates/rustapi-extras/src/audit/store.rs | 32 ++- crates/rustapi-extras/src/diesel/mod.rs | 21 +- crates/rustapi-extras/src/insight/export.rs | 42 ++- crates/rustapi-extras/src/oauth2/tokens.rs | 39 ++- crates/rustapi-extras/src/replay/client.rs | 29 +- crates/rustapi-extras/src/session/mod.rs | 26 +- crates/rustapi-extras/src/sqlx/mod.rs | 34 ++- crates/rustapi-jobs/Cargo.toml | 6 +- crates/rustapi-jobs/src/backend.rs | 26 +- crates/rustapi-jobs/src/backend/memory.rs | 84 +++--- crates/rustapi-jobs/src/backend/postgres.rs | 117 ++++---- crates/rustapi-jobs/src/backend/redis.rs | 97 ++++--- crates/rustapi-jobs/src/error.rs | 43 ++- crates/rustapi-jobs/src/job.rs | 32 ++- crates/rustapi-jobs/src/queue.rs | 2 - .../tests/job_persistence_test.rs | 3 - crates/rustapi-jobs/tests/repro_blocking.rs | 2 - crates/rustapi-testing/Cargo.toml | 11 +- crates/rustapi-toon/Cargo.toml | 2 - crates/rustapi-toon/src/error.rs | 24 +- crates/rustapi-validate/Cargo.toml | 3 - crates/rustapi-view/Cargo.toml | 1 - crates/rustapi-view/src/error.rs | 57 ++-- crates/rustapi-ws/Cargo.toml | 2 - crates/rustapi-ws/src/auth.rs | 30 +- crates/rustapi-ws/src/error.rs | 66 +++-- crates/rustapi-ws/src/upgrade.rs | 20 +- 37 files changed, 850 insertions(+), 534 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a247391e..a20d0b75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,13 +398,11 @@ dependencies = [ "serde_json", "serde_yaml", "tempfile", - "thiserror 2.0.18", "tokio", "toml 1.1.2+spec-1.1.0", "toml_edit 0.22.27", "tracing", "tracing-subscriber", - "walkdir", ] [[package]] @@ -598,16 +596,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.1" @@ -1607,12 +1595,9 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", - "system-configuration", "tokio", - "tower-layer", "tower-service", "tracing", - "windows-registry", ] [[package]] @@ -3176,7 +3161,6 @@ version = "0.1.478" dependencies = [ "async-stream", "async-trait", - "base64", "brotli", "bytes", "cookie", @@ -3207,9 +3191,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "simd-json", - "smallvec", "sqlx", - "thiserror 2.0.18", "tokio", "tower-service", "tracing", @@ -3254,7 +3236,6 @@ dependencies = [ "sha2 0.11.0", "sqlx", "tempfile", - "thiserror 2.0.18", "tokio", "tracing", "tracing-opentelemetry", @@ -3279,13 +3260,11 @@ version = "0.1.478" dependencies = [ "async-trait", "chrono", - "futures-util", "proptest", "redis", "serde", "serde_json", "sqlx", - "thiserror 2.0.18", "tokio", "tracing", "uuid", @@ -3346,7 +3325,6 @@ name = "rustapi-testing" version = "0.1.478" dependencies = [ "bytes", - "futures-util", "http", "http-body-util", "hyper", @@ -3356,9 +3334,7 @@ dependencies = [ "rustapi-core", "serde", "serde_json", - "thiserror 2.0.18", "tokio", - "tracing", ] [[package]] @@ -3373,7 +3349,6 @@ dependencies = [ "rustapi-openapi", "serde", "serde_json", - "thiserror 2.0.18", "tokio", "toon-format", "tracing", @@ -3392,7 +3367,6 @@ dependencies = [ "rustapi-macros", "serde", "serde_json", - "thiserror 2.0.18", "tokio", ] @@ -3408,7 +3382,6 @@ dependencies = [ "serde", "serde_json", "tera", - "thiserror 2.0.18", "tokio", "tracing", ] @@ -3418,7 +3391,6 @@ name = "rustapi-ws" version = "0.1.478" dependencies = [ "async-trait", - "base64", "bytes", "futures-util", "http", @@ -3432,7 +3404,6 @@ dependencies = [ "serde", "serde_json", "sha1", - "thiserror 2.0.18", "tokio", "tokio-tungstenite", "tracing", @@ -3501,7 +3472,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" dependencies = [ - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "jni", "log", @@ -3612,7 +3583,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ "bitflags 2.11.1", - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "libc", "security-framework-sys", @@ -4183,27 +4154,6 @@ dependencies = [ "syn", ] -[[package]] -name = "system-configuration" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" -dependencies = [ - "bitflags 2.11.1", - "core-foundation 0.9.4", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tempfile" version = "3.27.0" @@ -4359,7 +4309,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -5206,17 +5155,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-registry" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" -dependencies = [ - "windows-link", - "windows-result", - "windows-strings", -] - [[package]] name = "windows-result" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index e58cadcc..34ce4c51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ serde_json = "1.0" tower-service = "0.3" # Utilities -thiserror = "2.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } futures-util = "0.3" diff --git a/crates/cargo-rustapi/Cargo.toml b/crates/cargo-rustapi/Cargo.toml index be1d037f..76b5eb05 100644 --- a/crates/cargo-rustapi/Cargo.toml +++ b/crates/cargo-rustapi/Cargo.toml @@ -24,10 +24,9 @@ indicatif = { workspace = true } console = { workspace = true } # File system -walkdir = "2.5" toml_edit = "0.22" -notify = "8.0" -notify-debouncer-mini = "0.7" +notify = { version = "8.0", optional = true } +notify-debouncer-mini = { version = "0.7", optional = true } # Async tokio = { workspace = true, features = ["process", "fs", "macros", "rt-multi-thread", "time", "signal", "sync"] } @@ -42,7 +41,6 @@ toml = "1.1" reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false, optional = true } # Utilities -thiserror = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } anyhow = "1.0" @@ -54,5 +52,6 @@ predicates = "3.1" [features] default = ["remote-spec", "replay"] +native-watch = ["dep:notify", "dep:notify-debouncer-mini"] remote-spec = ["dep:reqwest"] replay = ["dep:reqwest"] diff --git a/crates/cargo-rustapi/src/commands/doctor.rs b/crates/cargo-rustapi/src/commands/doctor.rs index 11dc0edf..21c2c321 100644 --- a/crates/cargo-rustapi/src/commands/doctor.rs +++ b/crates/cargo-rustapi/src/commands/doctor.rs @@ -6,7 +6,6 @@ use console::{style, Emoji}; use std::fs; use std::path::{Path, PathBuf}; use tokio::process::Command; -use walkdir::WalkDir; #[derive(Args, Debug, Clone)] pub struct DoctorArgs { @@ -382,61 +381,86 @@ fn build_project_checks(workspace_root: &Path) -> Result> { fn scan_workspace_signals(workspace_root: &Path) -> Result { let mut signals = WorkspaceSignals::default(); - for entry in WalkDir::new(workspace_root) - .into_iter() - .filter_entry(|entry| should_scan(entry.path())) - { + scan_workspace_dir(workspace_root, &mut signals)?; + + Ok(signals) +} + +fn scan_workspace_dir(dir: &Path, signals: &mut WorkspaceSignals) -> Result<()> { + if !should_scan(dir) { + return Ok(()); + } + + for entry in fs::read_dir(dir).with_context(|| format!("failed to read {}", dir.display()))? { let entry = entry?; - if !entry.file_type().is_file() || !is_scannable_file(entry.path()) { + let path = entry.path(); + + if !should_scan(&path) { + continue; + } + + let file_type = match entry.file_type() { + Ok(file_type) => file_type, + Err(_) => continue, + }; + + if file_type.is_dir() { + scan_workspace_dir(&path, signals)?; continue; } - let contents = match fs::read_to_string(entry.path()) { + if !file_type.is_file() || !is_scannable_file(&path) { + continue; + } + + let contents = match fs::read_to_string(&path) { Ok(contents) => contents, Err(_) => continue, }; - signals.production_defaults |= contains_any( - &contents, - &[".production_defaults(", ".production_defaults_with_config("], - ); - signals.health_endpoints |= contains_any( - &contents, - &[ - ".health_endpoints(", - ".health_endpoint_config(", - "HealthEndpointConfig", - ], - ); - signals.health_checks |= contents.contains(".with_health_check("); - signals.request_id |= contents.contains("RequestIdLayer"); - signals.tracing |= contains_any(&contents, &["TracingLayer", "tracing_subscriber"]); - signals.shutdown |= contents.contains("run_with_shutdown("); - signals.shutdown_hooks |= contents.contains(".on_shutdown("); - signals.structured_logging |= contains_any( - &contents, - &["StructuredLoggingLayer", "structured_logging("], - ); - signals.otel |= contains_any(&contents, &["OtelLayer", "otel("]); - signals.rate_limit |= contains_any(&contents, &["RateLimitLayer", "rate_limit("]); - signals.security_headers |= - contains_any(&contents, &["SecurityHeadersLayer", "security_headers("]); - signals.timeout |= contains_any(&contents, &["TimeoutLayer", "timeout("]); - signals.cors |= contains_any(&contents, &["CorsLayer", "cors("]); - signals.body_limit |= contains_any(&contents, &["BodyLimitLayer", ".body_limit("]); - signals.env_production |= contains_any( - &contents, - &[ - "RUSTAPI_ENV=production", - "RUSTAPI_ENV: production", - "RUSTAPI_ENV = \"production\"", - "RUSTAPI_ENV','production", - "RUSTAPI_ENV\", \"production\"", - ], - ); + apply_workspace_signals(signals, &contents); } - Ok(signals) + Ok(()) +} + +fn apply_workspace_signals(signals: &mut WorkspaceSignals, contents: &str) { + signals.production_defaults |= contains_any( + contents, + &[".production_defaults(", ".production_defaults_with_config("], + ); + signals.health_endpoints |= contains_any( + contents, + &[ + ".health_endpoints(", + ".health_endpoint_config(", + "HealthEndpointConfig", + ], + ); + signals.health_checks |= contents.contains(".with_health_check("); + signals.request_id |= contents.contains("RequestIdLayer"); + signals.tracing |= contains_any(contents, &["TracingLayer", "tracing_subscriber"]); + signals.shutdown |= contents.contains("run_with_shutdown("); + signals.shutdown_hooks |= contents.contains(".on_shutdown("); + signals.structured_logging |= + contains_any(contents, &["StructuredLoggingLayer", "structured_logging("]); + signals.otel |= contains_any(contents, &["OtelLayer", "otel("]); + signals.rate_limit |= contains_any(contents, &["RateLimitLayer", "rate_limit("]); + signals.security_headers |= + contains_any(contents, &["SecurityHeadersLayer", "security_headers("]); + signals.timeout |= contains_any(contents, &["TimeoutLayer", "timeout("]); + signals.cors |= contains_any(contents, &["CorsLayer", "cors("]); + signals.body_limit |= contains_any(contents, &["BodyLimitLayer", ".body_limit("]); + signals.env_production |= contains_any( + contents, + &[ + "RUSTAPI_ENV=production", + "RUSTAPI_ENV: production", + "RUSTAPI_ENV = \"production\"", + "RUSTAPI_ENV','production", + "RUSTAPI_ENV\", \"production\"", + ], + ); } fn find_workspace_root(start: &Path) -> Option { diff --git a/crates/cargo-rustapi/src/commands/watch.rs b/crates/cargo-rustapi/src/commands/watch.rs index 78e45bb7..3b148a4d 100644 --- a/crates/cargo-rustapi/src/commands/watch.rs +++ b/crates/cargo-rustapi/src/commands/watch.rs @@ -1,16 +1,21 @@ -//! Watch command for development with native hot-reload +//! Watch command for development with hot-reload //! -//! Uses the `notify` crate for zero-dependency filesystem watching. +//! Uses a std-only polling watcher by default and the `notify` crate when the +//! `native-watch` feature is enabled. //! Detects file changes, rebuilds the project, and restarts the server //! automatically — no external tools (cargo-watch) required. use anyhow::{Context, Result}; use clap::Args; use console::{style, Emoji}; +#[cfg(feature = "native-watch")] use notify_debouncer_mini::{new_debouncer, DebouncedEventKind}; -use std::path::PathBuf; +use std::collections::HashMap; +use std::fs; +use std::path::{Path, PathBuf}; +#[cfg(feature = "native-watch")] use std::sync::mpsc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use tokio::process::{Child, Command}; use tokio::sync::mpsc as tokio_mpsc; @@ -55,7 +60,9 @@ pub struct WatchArgs { #[arg(long)] pub no_restart_on_fail: bool, - /// Poll for changes instead of using filesystem events + /// Poll for changes instead of using native filesystem events. + /// + /// This is the default when cargo-rustapi is built without the `native-watch` feature. #[arg(long)] pub poll: bool, @@ -91,6 +98,158 @@ fn is_ignored(path: &std::path::Path, ignore_paths: &[String]) -> bool { }) } +fn collect_watch_snapshot( + watch_paths: &[String], + ignore_paths: &[String], + extensions: &[String], +) -> Result<(usize, HashMap)> { + let mut roots_watched = 0; + let mut snapshot = HashMap::new(); + + for watch_path in watch_paths { + let path = PathBuf::from(watch_path); + if path.exists() { + roots_watched += 1; + collect_path_snapshot(&path, ignore_paths, extensions, &mut snapshot)?; + } + } + + Ok((roots_watched, snapshot)) +} + +fn collect_path_snapshot( + path: &Path, + ignore_paths: &[String], + extensions: &[String], + snapshot: &mut HashMap, +) -> Result<()> { + if is_ignored(path, ignore_paths) { + return Ok(()); + } + + let metadata = match fs::metadata(path) { + Ok(metadata) => metadata, + Err(_) => return Ok(()), + }; + + if metadata.is_dir() { + for entry in + fs::read_dir(path).with_context(|| format!("Failed to scan {}", path.display()))? + { + let entry = match entry { + Ok(entry) => entry, + Err(_) => continue, + }; + collect_path_snapshot(&entry.path(), ignore_paths, extensions, snapshot)?; + } + } else if metadata.is_file() && is_watched_extension(path, extensions) { + let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH); + snapshot.insert(path.to_path_buf(), modified); + } + + Ok(()) +} + +fn setup_polling_watcher( + args: &WatchArgs, + extensions: &[String], + debounce_duration: Duration, +) -> Result> { + let (paths_watched, mut snapshot) = + collect_watch_snapshot(&args.watch_paths, &args.ignore_paths, extensions)?; + + if paths_watched == 0 { + anyhow::bail!( + "No valid paths to watch. Ensure at least one of [{}] exists.", + args.watch_paths.join(", ") + ); + } + + let watch_paths = args.watch_paths.clone(); + let ignore_paths = args.ignore_paths.clone(); + let extensions = extensions.to_vec(); + let poll_interval = debounce_duration.max(Duration::from_millis(100)); + let (async_tx, async_rx) = tokio_mpsc::channel::<()>(1); + + std::thread::spawn(move || loop { + if async_tx.is_closed() { + break; + } + + std::thread::sleep(poll_interval); + + let Ok((_, next_snapshot)) = + collect_watch_snapshot(&watch_paths, &ignore_paths, &extensions) + else { + continue; + }; + + if next_snapshot != snapshot { + snapshot = next_snapshot; + if async_tx.blocking_send(()).is_err() { + break; + } + } + }); + + Ok(async_rx) +} + +#[cfg(feature = "native-watch")] +fn setup_native_watcher( + args: &WatchArgs, + extensions: &[String], + debounce_duration: Duration, +) -> Result<(tokio_mpsc::Receiver<()>, Box)> { + let (tx, rx) = mpsc::channel(); + let mut debouncer = + new_debouncer(debounce_duration, tx).context("Failed to create file watcher")?; + + let mut paths_watched = 0; + for watch_path in &args.watch_paths { + let path = PathBuf::from(watch_path); + if path.exists() { + debouncer + .watcher() + .watch(&path, notify::RecursiveMode::Recursive) + .with_context(|| format!("Failed to watch path: {watch_path}"))?; + paths_watched += 1; + } + } + + if paths_watched == 0 { + anyhow::bail!( + "No valid paths to watch. Ensure at least one of [{}] exists.", + args.watch_paths.join(", ") + ); + } + + let (async_tx, async_rx) = tokio_mpsc::channel::<()>(1); + let ignore_paths = args.ignore_paths.clone(); + let ext_clone = extensions.to_vec(); + std::thread::spawn(move || { + for result in rx { + match result { + Ok(events) => { + let has_relevant = events.iter().any(|event| { + event.kind == DebouncedEventKind::Any + && !is_ignored(&event.path, &ignore_paths) + && is_watched_extension(&event.path, &ext_clone) + }); + if has_relevant && async_tx.blocking_send(()).is_err() { + break; + } + } + Err(e) => { + tracing::warn!("File watcher error: {e}"); + } + } + } + }); + + Ok((async_rx, Box::new(debouncer))) +} + /// Build the project, returning (success, duration, error_output) async fn build_project(args: &WatchArgs) -> (bool, Duration, String) { let start = Instant::now(); @@ -218,55 +377,30 @@ pub async fn watch(args: WatchArgs) -> Result<()> { println!(); } - // ─── Set up native file watcher via notify ────────────────────────── - let (tx, rx) = mpsc::channel(); + // ─── Set up file watcher ──────────────────────────────────────────── let debounce_duration = Duration::from_millis(args.delay as u64); - - let mut debouncer = - new_debouncer(debounce_duration, tx).context("Failed to create file watcher")?; - - let mut paths_watched = 0; - for watch_path in &args.watch_paths { - let path = PathBuf::from(watch_path); - if path.exists() { - debouncer - .watcher() - .watch(&path, notify::RecursiveMode::Recursive) - .with_context(|| format!("Failed to watch path: {watch_path}"))?; - paths_watched += 1; + let (mut async_rx, _watcher_guard): (tokio_mpsc::Receiver<()>, Option>) = { + #[cfg(feature = "native-watch")] + { + if !args.poll { + let (rx, guard) = setup_native_watcher(&args, &extensions, debounce_duration)?; + (rx, Some(guard)) + } else { + ( + setup_polling_watcher(&args, &extensions, debounce_duration)?, + None, + ) + } } - } - - if paths_watched == 0 { - anyhow::bail!( - "No valid paths to watch. Ensure at least one of [{}] exists.", - args.watch_paths.join(", ") - ); - } - // Bridge sync notify channel → async tokio channel - let (async_tx, mut async_rx) = tokio_mpsc::channel::<()>(1); - let ignore_paths = args.ignore_paths.clone(); - let ext_clone = extensions.clone(); - std::thread::spawn(move || { - for result in rx { - match result { - Ok(events) => { - let has_relevant = events.iter().any(|event| { - event.kind == DebouncedEventKind::Any - && !is_ignored(&event.path, &ignore_paths) - && is_watched_extension(&event.path, &ext_clone) - }); - if has_relevant { - let _ = async_tx.blocking_send(()); - } - } - Err(e) => { - tracing::warn!("File watcher error: {e}"); - } - } + #[cfg(not(feature = "native-watch"))] + { + ( + setup_polling_watcher(&args, &extensions, debounce_duration)?, + None, + ) } - }); + }; // ─── Initial build & start ────────────────────────────────────────── if !args.quiet { @@ -452,6 +586,30 @@ mod tests { assert!(!is_ignored(std::path::Path::new("src/main.rs"), &ignore)); } + #[test] + fn test_collect_watch_snapshot_skips_ignored_dirs() { + let temp = tempfile::tempdir().unwrap(); + let src = temp.path().join("src"); + let target = temp.path().join("target"); + fs::create_dir_all(&src).unwrap(); + fs::create_dir_all(&target).unwrap(); + fs::write(src.join("main.rs"), "fn main() {}").unwrap(); + fs::write(target.join("generated.rs"), "fn ignored() {}").unwrap(); + + let watch_paths = vec![temp.path().to_string_lossy().to_string()]; + let ignore_paths = vec!["target".to_string()]; + let extensions = vec!["rs".to_string()]; + + let (roots, snapshot) = + collect_watch_snapshot(&watch_paths, &ignore_paths, &extensions).unwrap(); + + assert_eq!(roots, 1); + assert!(snapshot.keys().any(|path| path.ends_with("src/main.rs"))); + assert!(!snapshot + .keys() + .any(|path| path.ends_with("target/generated.rs"))); + } + #[test] fn test_default_args() { let args = WatchArgs { diff --git a/crates/rustapi-core/Cargo.toml b/crates/rustapi-core/Cargo.toml index 9fc67f00..75307283 100644 --- a/crates/rustapi-core/Cargo.toml +++ b/crates/rustapi-core/Cargo.toml @@ -32,19 +32,14 @@ serde_json = { workspace = true } serde_urlencoded = "0.7" simd-json = { version = "0.17", optional = true } -# Stack-allocated collections for performance -smallvec = "1.13" - # Middleware tower-service = { workspace = true } # Utilities -thiserror = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } linkme = { workspace = true } uuid = { workspace = true } -base64 = "0.22" # Compression (optional) flate2 = { version = "1.0", optional = true } diff --git a/crates/rustapi-core/src/app.rs b/crates/rustapi-core/src/app.rs index 7e5767a3..81f93ee7 100644 --- a/crates/rustapi-core/src/app.rs +++ b/crates/rustapi-core/src/app.rs @@ -989,9 +989,25 @@ impl RustApi { description: Option<&str>, ) -> Self { use crate::router::MethodRouter; - use base64::{engine::general_purpose::STANDARD, Engine}; use std::collections::HashMap; + #[inline] + fn base64_encode(input: &[u8]) -> String { + const ALPHA: &[u8; 64] = + b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + let mut out = String::with_capacity((input.len() + 2) / 3 * 4); + for chunk in input.chunks(3) { + let b0 = chunk[0] as usize; + let b1 = if chunk.len() > 1 { chunk[1] as usize } else { 0 }; + let b2 = if chunk.len() > 2 { chunk[2] as usize } else { 0 }; + out.push(ALPHA[b0 >> 2] as char); + out.push(ALPHA[((b0 & 3) << 4) | (b1 >> 4)] as char); + out.push(if chunk.len() > 1 { ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char } else { '=' }); + out.push(if chunk.len() > 2 { ALPHA[b2 & 63] as char } else { '=' }); + } + out + } + // Update spec info self.openapi_spec.info.title = title.to_string(); self.openapi_spec.info.version = version.to_string(); @@ -1004,7 +1020,7 @@ impl RustApi { // Create expected auth header value let credentials = format!("{}:{}", username, password); - let encoded = STANDARD.encode(credentials.as_bytes()); + let encoded = base64_encode(credentials.as_bytes()); let expected_auth = format!("Basic {}", encoded); // Clone values for closures diff --git a/crates/rustapi-core/src/path_params.rs b/crates/rustapi-core/src/path_params.rs index fdb324df..e71bd0e3 100644 --- a/crates/rustapi-core/src/path_params.rs +++ b/crates/rustapi-core/src/path_params.rs @@ -1,39 +1,25 @@ -//! Path parameter types with optimized storage -//! -//! This module provides efficient path parameter storage using stack allocation -//! for the common case of having 4 or fewer parameters. +//! Path parameter types. -use smallvec::SmallVec; use std::collections::HashMap; -/// Maximum number of path parameters to store on the stack. -/// Most routes have 1-4 parameters, so this covers the majority of cases -/// without heap allocation. -pub const STACK_PARAMS_CAPACITY: usize = 4; - -/// Path parameters with stack-optimized storage. -/// -/// Uses `SmallVec` to store up to 4 key-value pairs on the stack, -/// avoiding heap allocation for the common case. +/// Path parameters collection. #[derive(Debug, Clone, Default)] pub struct PathParams { - inner: SmallVec<[(String, String); STACK_PARAMS_CAPACITY]>, + inner: Vec<(String, String)>, } impl PathParams { /// Create a new empty path params collection. #[inline] pub fn new() -> Self { - Self { - inner: SmallVec::new(), - } + Self { inner: Vec::new() } } /// Create path params with pre-allocated capacity. #[inline] pub fn with_capacity(capacity: usize) -> Self { Self { - inner: SmallVec::with_capacity(capacity), + inner: Vec::with_capacity(capacity), } } @@ -126,7 +112,7 @@ mod tests { use super::*; #[test] - fn test_small_params_on_stack() { + fn test_small_params() { let mut params = PathParams::new(); params.insert("id".to_string(), "123".to_string()); params.insert("name".to_string(), "test".to_string()); @@ -134,21 +120,16 @@ mod tests { assert_eq!(params.get("id"), Some(&"123".to_string())); assert_eq!(params.get("name"), Some(&"test".to_string())); assert_eq!(params.len(), 2); - - // Should be on stack (not spilled) - assert!(!params.inner.spilled()); } #[test] - fn test_many_params_spill_to_heap() { + fn test_many_params() { let mut params = PathParams::new(); for i in 0..10 { params.insert(format!("key{}", i), format!("value{}", i)); } assert_eq!(params.len(), 10); - // Should have spilled to heap - assert!(params.inner.spilled()); } #[test] diff --git a/crates/rustapi-core/src/replay/store.rs b/crates/rustapi-core/src/replay/store.rs index 4a2a5444..82a67959 100644 --- a/crates/rustapi-core/src/replay/store.rs +++ b/crates/rustapi-core/src/replay/store.rs @@ -3,33 +3,39 @@ //! Defines the [`ReplayStore`] trait for pluggable storage backends. use async_trait::async_trait; +use std::fmt; use super::entry::ReplayEntry; /// Errors from replay store operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub enum ReplayStoreError { /// IO error (file, network, etc.). - #[error("IO error: {0}")] Io(String), - /// Serialization/deserialization error. - #[error("Serialization error: {0}")] Serialization(String), - /// Entry not found. - #[error("Entry not found: {0}")] NotFound(String), - /// Store is full. - #[error("Store full")] StoreFull, - /// Other error. - #[error("Store error: {0}")] Other(String), } +impl fmt::Display for ReplayStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(msg) => write!(f, "IO error: {}", msg), + Self::Serialization(msg) => write!(f, "Serialization error: {}", msg), + Self::NotFound(msg) => write!(f, "Entry not found: {}", msg), + Self::StoreFull => write!(f, "Store full"), + Self::Other(msg) => write!(f, "Store error: {}", msg), + } + } +} + +impl std::error::Error for ReplayStoreError {} + /// Convenience result type for replay store operations. pub type ReplayStoreResult = Result; diff --git a/crates/rustapi-extras/Cargo.toml b/crates/rustapi-extras/Cargo.toml index 2c0f27e3..12908622 100644 --- a/crates/rustapi-extras/Cargo.toml +++ b/crates/rustapi-extras/Cargo.toml @@ -29,7 +29,6 @@ serde = { workspace = true } serde_json = { workspace = true } # Utilities -thiserror = { workspace = true } tracing = { workspace = true } # JWT (feature-gated) diff --git a/crates/rustapi-extras/src/audit/store.rs b/crates/rustapi-extras/src/audit/store.rs index 11721cff..4828378c 100644 --- a/crates/rustapi-extras/src/audit/store.rs +++ b/crates/rustapi-extras/src/audit/store.rs @@ -2,6 +2,7 @@ use super::event::AuditEvent; use super::query::AuditQueryBuilder; +use std::fmt; use std::future::Future; use std::pin::Pin; @@ -9,37 +10,40 @@ use std::pin::Pin; pub type AuditResult = Result; /// Errors that can occur during audit operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub enum AuditError { /// Failed to write audit event. - #[error("Failed to write audit event: {0}")] WriteError(String), - /// Failed to read audit events. - #[error("Failed to read audit events: {0}")] ReadError(String), - /// Storage is full. - #[error("Audit storage is full")] StorageFull, - /// Event not found. - #[error("Audit event not found: {0}")] NotFound(String), - /// Serialization error. - #[error("Serialization error: {0}")] SerializationError(String), - /// IO error. - #[error("IO error: {0}")] IoError(String), - /// Configuration error. - #[error("Configuration error: {0}")] ConfigError(String), } +impl fmt::Display for AuditError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::WriteError(msg) => write!(f, "Failed to write audit event: {}", msg), + Self::ReadError(msg) => write!(f, "Failed to read audit events: {}", msg), + Self::StorageFull => write!(f, "Audit storage is full"), + Self::NotFound(msg) => write!(f, "Audit event not found: {}", msg), + Self::SerializationError(msg) => write!(f, "Serialization error: {}", msg), + Self::IoError(msg) => write!(f, "IO error: {}", msg), + Self::ConfigError(msg) => write!(f, "Configuration error: {}", msg), + } + } +} + +impl std::error::Error for AuditError {} + /// Trait for audit event storage backends. pub trait AuditStore: Send + Sync { /// Log an audit event. diff --git a/crates/rustapi-extras/src/diesel/mod.rs b/crates/rustapi-extras/src/diesel/mod.rs index c9efa8e2..cdd09c6c 100644 --- a/crates/rustapi-extras/src/diesel/mod.rs +++ b/crates/rustapi-extras/src/diesel/mod.rs @@ -26,24 +26,31 @@ use rustapi_core::health::{HealthCheck, HealthCheckBuilder, HealthStatus}; use std::sync::Arc; use std::time::Duration; -use thiserror::Error; +use std::fmt; /// Error type for Diesel pool operations -#[derive(Debug, Error)] +#[derive(Debug)] pub enum DieselPoolError { /// Configuration error - #[error("Pool configuration error: {0}")] Configuration(String), - /// Connection error - #[error("Database connection error: {0}")] Connection(String), - /// R2D2 pool error - #[error("Pool error: {0}")] Pool(String), } +impl fmt::Display for DieselPoolError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Configuration(msg) => write!(f, "Pool configuration error: {}", msg), + Self::Connection(msg) => write!(f, "Database connection error: {}", msg), + Self::Pool(msg) => write!(f, "Pool error: {}", msg), + } + } +} + +impl std::error::Error for DieselPoolError {} + /// Configuration for Diesel connection pool /// /// This struct holds all configuration options for the pool builder. diff --git a/crates/rustapi-extras/src/insight/export.rs b/crates/rustapi-extras/src/insight/export.rs index 33e797df..8ada6a4e 100644 --- a/crates/rustapi-extras/src/insight/export.rs +++ b/crates/rustapi-extras/src/insight/export.rs @@ -10,25 +10,47 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex}; /// Error type for export operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub enum ExportError { /// IO error during export. - #[error("IO error: {0}")] - Io(#[from] std::io::Error), - + Io(std::io::Error), /// Serialization error. - #[error("Serialization error: {0}")] - Serialization(#[from] serde_json::Error), - + Serialization(serde_json::Error), /// HTTP error during webhook export. - #[error("HTTP error: {0}")] Http(String), - /// Export sink is closed or unavailable. - #[error("Export sink unavailable: {0}")] Unavailable(String), } +impl std::fmt::Display for ExportError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Io(e) => write!(f, "IO error: {}", e), + Self::Serialization(e) => write!(f, "Serialization error: {}", e), + Self::Http(msg) => write!(f, "HTTP error: {}", msg), + Self::Unavailable(msg) => write!(f, "Export sink unavailable: {}", msg), + } + } +} + +impl std::error::Error for ExportError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Io(e) => Some(e), + Self::Serialization(e) => Some(e), + _ => None, + } + } +} + +impl From for ExportError { + fn from(e: std::io::Error) -> Self { Self::Io(e) } +} + +impl From for ExportError { + fn from(e: serde_json::Error) -> Self { Self::Serialization(e) } +} + /// Result type for export operations. pub type ExportResult = Result; diff --git a/crates/rustapi-extras/src/oauth2/tokens.rs b/crates/rustapi-extras/src/oauth2/tokens.rs index bfe4e66e..391c368f 100644 --- a/crates/rustapi-extras/src/oauth2/tokens.rs +++ b/crates/rustapi-extras/src/oauth2/tokens.rs @@ -1,8 +1,6 @@ //! OAuth2 token types and errors use std::time::{Duration, Instant}; -use thiserror::Error; - /// OAuth2 token response from the authorization server. #[derive(Debug, Clone)] pub struct TokenResponse { @@ -103,45 +101,46 @@ impl TokenResponse { } /// Errors that can occur during OAuth2 operations. -#[derive(Debug, Error)] +#[derive(Debug)] pub enum TokenError { /// The authorization request was denied. - #[error("Authorization denied: {0}")] AuthorizationDenied(String), - /// Invalid authorization code. - #[error("Invalid authorization code")] InvalidCode, - /// Invalid CSRF state. - #[error("Invalid CSRF state - possible CSRF attack")] InvalidState, - /// Token exchange failed. - #[error("Token exchange failed: {0}")] ExchangeFailed(String), - /// Token refresh failed. - #[error("Token refresh failed: {0}")] RefreshFailed(String), - /// Network error. - #[error("Network error: {0}")] NetworkError(String), - /// Invalid response from the authorization server. - #[error("Invalid response: {0}")] InvalidResponse(String), - /// Token is expired. - #[error("Token is expired")] TokenExpired, - /// Missing required field in response. - #[error("Missing required field: {0}")] MissingField(String), } +impl std::fmt::Display for TokenError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AuthorizationDenied(msg) => write!(f, "Authorization denied: {}", msg), + Self::InvalidCode => write!(f, "Invalid authorization code"), + Self::InvalidState => write!(f, "Invalid CSRF state - possible CSRF attack"), + Self::ExchangeFailed(msg) => write!(f, "Token exchange failed: {}", msg), + Self::RefreshFailed(msg) => write!(f, "Token refresh failed: {}", msg), + Self::NetworkError(msg) => write!(f, "Network error: {}", msg), + Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg), + Self::TokenExpired => write!(f, "Token is expired"), + Self::MissingField(msg) => write!(f, "Missing required field: {}", msg), + } + } +} + +impl std::error::Error for TokenError {} + /// PKCE (Proof Key for Code Exchange) verifier. #[derive(Debug, Clone)] pub struct PkceVerifier { diff --git a/crates/rustapi-extras/src/replay/client.rs b/crates/rustapi-extras/src/replay/client.rs index c28d4e9d..fdd53ced 100644 --- a/crates/rustapi-extras/src/replay/client.rs +++ b/crates/rustapi-extras/src/replay/client.rs @@ -5,17 +5,36 @@ use std::collections::HashMap; use std::time::Duration; /// Error from replay HTTP client operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub enum ReplayClientError { /// HTTP request error. - #[error("HTTP error: {0}")] - Http(#[from] reqwest::Error), - + Http(reqwest::Error), /// Invalid URL. - #[error("Invalid URL: {0}")] InvalidUrl(String), } +impl std::fmt::Display for ReplayClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Http(e) => write!(f, "HTTP error: {}", e), + Self::InvalidUrl(url) => write!(f, "Invalid URL: {}", url), + } + } +} + +impl std::error::Error for ReplayClientError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Http(e) => Some(e), + _ => None, + } + } +} + +impl From for ReplayClientError { + fn from(e: reqwest::Error) -> Self { Self::Http(e) } +} + /// HTTP client for replaying recorded requests against a target server. /// /// Takes a [`ReplayEntry`] and sends the recorded request to a target URL, diff --git a/crates/rustapi-extras/src/session/mod.rs b/crates/rustapi-extras/src/session/mod.rs index 80bf475d..04a89a5e 100644 --- a/crates/rustapi-extras/src/session/mod.rs +++ b/crates/rustapi-extras/src/session/mod.rs @@ -55,6 +55,7 @@ use rustapi_openapi::{Operation, OperationModifier}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; +use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -70,29 +71,34 @@ pub type SessionResult = std::result::Result; pub type SessionData = BTreeMap; /// Errors that can occur when loading, mutating, or persisting sessions. -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub enum SessionError { /// The configured store failed to read data. - #[error("Failed to read session data: {0}")] Read(String), - /// The configured store failed to persist data. - #[error("Failed to persist session data: {0}")] Write(String), - /// A serialized value could not be converted to JSON. - #[error("Failed to serialize session value: {0}")] Serialize(String), - /// A JSON value could not be converted back to the requested type. - #[error("Failed to deserialize session value: {0}")] Deserialize(String), - /// A store-specific configuration error occurred. - #[error("Invalid session store configuration: {0}")] Config(String), } +impl fmt::Display for SessionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Read(msg) => write!(f, "Failed to read session data: {}", msg), + Self::Write(msg) => write!(f, "Failed to persist session data: {}", msg), + Self::Serialize(msg) => write!(f, "Failed to serialize session value: {}", msg), + Self::Deserialize(msg) => write!(f, "Failed to deserialize session value: {}", msg), + Self::Config(msg) => write!(f, "Invalid session store configuration: {}", msg), + } + } +} + +impl std::error::Error for SessionError {} + impl From for ApiError { fn from(error: SessionError) -> Self { ApiError::internal(error.to_string()) diff --git a/crates/rustapi-extras/src/sqlx/mod.rs b/crates/rustapi-extras/src/sqlx/mod.rs index 3f3305c1..5e755dbe 100644 --- a/crates/rustapi-extras/src/sqlx/mod.rs +++ b/crates/rustapi-extras/src/sqlx/mod.rs @@ -62,22 +62,38 @@ use rustapi_core::ApiError; ))] use std::sync::Arc; use std::time::Duration; -use thiserror::Error; - /// Error type for pool operations -#[derive(Debug, Error)] +#[derive(Debug)] pub enum PoolError { /// Configuration error - #[error("Pool configuration error: {0}")] Configuration(String), - /// Connection error - #[error("Database connection error: {0}")] Connection(String), - /// SQLx error - #[error("SQLx error: {0}")] - Sqlx(#[from] sqlx::Error), + Sqlx(sqlx::Error), +} + +impl std::fmt::Display for PoolError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Configuration(msg) => write!(f, "Pool configuration error: {}", msg), + Self::Connection(msg) => write!(f, "Database connection error: {}", msg), + Self::Sqlx(e) => write!(f, "SQLx error: {}", e), + } + } +} + +impl std::error::Error for PoolError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Sqlx(e) => Some(e), + _ => None, + } + } +} + +impl From for PoolError { + fn from(e: sqlx::Error) -> Self { Self::Sqlx(e) } } /// Configuration for SQLx connection pool diff --git a/crates/rustapi-jobs/Cargo.toml b/crates/rustapi-jobs/Cargo.toml index 3c8da5b4..d42e4086 100644 --- a/crates/rustapi-jobs/Cargo.toml +++ b/crates/rustapi-jobs/Cargo.toml @@ -21,10 +21,8 @@ postgres = ["dep:sqlx"] async-trait = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -thiserror = { workspace = true } tracing = { workspace = true } -futures-util = { workspace = true } -tokio = { workspace = true, features = ["full"] } +tokio = { workspace = true, features = ["sync", "time"] } uuid = { workspace = true, features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } @@ -36,4 +34,4 @@ sqlx = { version = "0.8", features = ["postgres", "runtime-tokio", "tls-rustls", [dev-dependencies] proptest = "1.8.0" -tokio = { workspace = true, features = ["test-util"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "test-util", "time"] } diff --git a/crates/rustapi-jobs/src/backend.rs b/crates/rustapi-jobs/src/backend.rs index 5f3aa34d..0be02045 100644 --- a/crates/rustapi-jobs/src/backend.rs +++ b/crates/rustapi-jobs/src/backend.rs @@ -1,7 +1,8 @@ use crate::error::Result; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use std::future::Future; +use std::pin::Pin; pub mod memory; @@ -24,20 +25,31 @@ pub struct JobRequest { pub run_at: Option>, } -/// Backend storage for jobs -#[async_trait] +/// Backend storage for jobs (dyn-compatible via boxed futures) pub trait JobBackend: Send + Sync { /// Push a new job to the queue - async fn push(&self, job: JobRequest) -> Result<()>; + fn push<'a>( + &'a self, + job: JobRequest, + ) -> Pin> + Send + 'a>>; /// Pop the next available job /// Should return None if no job is available or ready - async fn pop(&self) -> Result>; + fn pop<'a>( + &'a self, + ) -> Pin>> + Send + 'a>>; /// Mark a job as completed successfully - async fn complete(&self, job_id: &str) -> Result<()>; + fn complete<'a>( + &'a self, + job_id: &'a str, + ) -> Pin> + Send + 'a>>; /// Mark a job as failed /// The manager will decide whether to retry (re-push) or move to DLQ - async fn fail(&self, job_id: &str, error: &str) -> Result<()>; + fn fail<'a>( + &'a self, + job_id: &'a str, + error: &'a str, + ) -> Pin> + Send + 'a>>; } diff --git a/crates/rustapi-jobs/src/backend/memory.rs b/crates/rustapi-jobs/src/backend/memory.rs index eaabd0b7..e40d7790 100644 --- a/crates/rustapi-jobs/src/backend/memory.rs +++ b/crates/rustapi-jobs/src/backend/memory.rs @@ -1,7 +1,8 @@ use super::{JobBackend, JobRequest}; use crate::error::{JobError, Result}; -use async_trait::async_trait; use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; /// In-memory job backend (not persistent, for testing/dev) @@ -17,52 +18,67 @@ impl InMemoryBackend { } } -#[async_trait] impl JobBackend for InMemoryBackend { - async fn push(&self, job: JobRequest) -> Result<()> { - let mut q = self - .queue - .lock() - .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?; - q.push_back(job); - Ok(()) + fn push<'a>( + &'a self, + job: JobRequest, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let mut q = self + .queue + .lock() + .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?; + q.push_back(job); + Ok(()) + }) } - async fn pop(&self) -> Result> { - let mut q = self - .queue - .lock() - .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?; + fn pop<'a>( + &'a self, + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + let mut q = self + .queue + .lock() + .map_err(|_| JobError::BackendError("Lock poisoned".to_string()))?; - let now = chrono::Utc::now(); - let mut index_to_remove = None; + let now = chrono::Utc::now(); + let mut index_to_remove = None; - // Scan the queue for the first ready job - for (i, job) in q.iter().enumerate() { - if let Some(run_at) = job.run_at { - if run_at > now { - continue; + // Scan the queue for the first ready job + for (i, job) in q.iter().enumerate() { + if let Some(run_at) = job.run_at { + if run_at > now { + continue; + } } + // Found a ready job (no run_at, or run_at <= now) + index_to_remove = Some(i); + break; } - // Found a ready job (no run_at, or run_at <= now) - index_to_remove = Some(i); - break; - } - if let Some(i) = index_to_remove { - Ok(q.remove(i)) - } else { - Ok(None) - } + if let Some(i) = index_to_remove { + Ok(q.remove(i)) + } else { + Ok(None) + } + }) } - async fn complete(&self, _job_id: &str) -> Result<()> { + fn complete<'a>( + &'a self, + _job_id: &'a str, + ) -> Pin> + Send + 'a>> { // No-op for simple in-memory queue that removes on pop - Ok(()) + Box::pin(async move { Ok(()) }) } - async fn fail(&self, _job_id: &str, _error: &str) -> Result<()> { + fn fail<'a>( + &'a self, + _job_id: &'a str, + _error: &'a str, + ) -> Pin> + Send + 'a>> { // In a real implementation we might move to DLQ or re-queue - Ok(()) + Box::pin(async move { Ok(()) }) } } diff --git a/crates/rustapi-jobs/src/backend/postgres.rs b/crates/rustapi-jobs/src/backend/postgres.rs index 78b5e14c..2819a365 100644 --- a/crates/rustapi-jobs/src/backend/postgres.rs +++ b/crates/rustapi-jobs/src/backend/postgres.rs @@ -1,7 +1,8 @@ use super::{JobBackend, JobRequest}; use crate::error::{JobError, Result}; -use async_trait::async_trait; use sqlx::{Pool, Postgres, Row}; +use std::future::Future; +use std::pin::Pin; /// Postgres-backed job queue #[derive(Debug, Clone)] @@ -46,37 +47,44 @@ impl PostgresBackend { } } -#[async_trait] impl JobBackend for PostgresBackend { - async fn push(&self, job: JobRequest) -> Result<()> { - let query = format!( - r#" + fn push<'a>( + &'a self, + job: JobRequest, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let query = format!( + r#" INSERT INTO {} (id, name, payload, created_at, run_at, attempts, max_attempts, last_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, - self.table_name - ); + self.table_name + ); - sqlx::query(&query) - .bind(&job.id) - .bind(&job.name) - .bind(&job.payload) - .bind(job.created_at) - .bind(job.run_at) - .bind(job.attempts as i32) - .bind(job.max_attempts as i32) - .bind(&job.last_error) - .execute(&self.pool) - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + sqlx::query(&query) + .bind(&job.id) + .bind(&job.name) + .bind(&job.payload) + .bind(job.created_at) + .bind(job.run_at) + .bind(job.attempts as i32) + .bind(job.max_attempts as i32) + .bind(&job.last_error) + .execute(&self.pool) + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - Ok(()) + Ok(()) + }) } - async fn pop(&self) -> Result> { - // Atomic pop using DELETE ... RETURNING with locking - let query = format!( - r#" + fn pop<'a>( + &'a self, + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + // Atomic pop using DELETE ... RETURNING with locking + let query = format!( + r#" DELETE FROM {} WHERE id = ( SELECT id @@ -88,42 +96,45 @@ impl JobBackend for PostgresBackend { ) RETURNING id, name, payload, created_at, run_at, attempts, max_attempts, last_error "#, - self.table_name, self.table_name - ); - - let row = sqlx::query(&query) - .fetch_optional(&self.pool) - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + self.table_name, self.table_name + ); - if let Some(row) = row { - // Reconstruct JobRequest - // Note: In a real persistent queue we wouldn't DELETE, we'd update status to 'processing' - // and have a 'cleanup' or 'retry' mechanism for stalled jobs. - // But staying consistent with simple queue contract for now. + let row = sqlx::query(&query) + .fetch_optional(&self.pool) + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - Ok(Some(JobRequest { - id: row.get("id"), - name: row.get("name"), - payload: row.get("payload"), - created_at: row.get("created_at"), - run_at: row.get("run_at"), - attempts: row.get::("attempts") as u32, - max_attempts: row.get::("max_attempts") as u32, - last_error: row.get("last_error"), - })) - } else { - Ok(None) - } + if let Some(row) = row { + Ok(Some(JobRequest { + id: row.get("id"), + name: row.get("name"), + payload: row.get("payload"), + created_at: row.get("created_at"), + run_at: row.get("run_at"), + attempts: row.get::("attempts") as u32, + max_attempts: row.get::("max_attempts") as u32, + last_error: row.get("last_error"), + })) + } else { + Ok(None) + } + }) } - async fn complete(&self, _job_id: &str) -> Result<()> { + fn complete<'a>( + &'a self, + _job_id: &'a str, + ) -> Pin> + Send + 'a>> { // Already deleted - Ok(()) + Box::pin(async move { Ok(()) }) } - async fn fail(&self, _job_id: &str, _error: &str) -> Result<()> { + fn fail<'a>( + &'a self, + _job_id: &'a str, + _error: &'a str, + ) -> Pin> + Send + 'a>> { // Already deleted. DLQ logic would go here. - Ok(()) + Box::pin(async move { Ok(()) }) } } diff --git a/crates/rustapi-jobs/src/backend/redis.rs b/crates/rustapi-jobs/src/backend/redis.rs index e0c099bf..5d323b6f 100644 --- a/crates/rustapi-jobs/src/backend/redis.rs +++ b/crates/rustapi-jobs/src/backend/redis.rs @@ -1,7 +1,8 @@ use super::{JobBackend, JobRequest}; use crate::error::{JobError, Result}; -use async_trait::async_trait; use redis::{AsyncCommands, Client, Script}; +use std::future::Future; +use std::pin::Pin; /// Redis-backed job queue #[derive(Debug, Clone)] @@ -38,60 +39,72 @@ impl RedisBackend { } } -#[async_trait] impl JobBackend for RedisBackend { - async fn push(&self, job: JobRequest) -> Result<()> { - let mut conn = self - .client - .get_multiplexed_async_connection() - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + fn push<'a>( + &'a self, + job: JobRequest, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let mut conn = self + .client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - let score = job.run_at.unwrap_or(chrono::Utc::now()).timestamp() as f64; - let payload = serde_json::to_string(&job)?; + let score = job.run_at.unwrap_or(chrono::Utc::now()).timestamp() as f64; + let payload = serde_json::to_string(&job)?; - conn.zadd::<_, _, _, ()>(&self.queue_key, score, payload) - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + conn.zadd::<_, _, _, ()>(&self.queue_key, score, payload) + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - Ok(()) + Ok(()) + }) } - async fn pop(&self) -> Result> { - let mut conn = self - .client - .get_multiplexed_async_connection() - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + fn pop<'a>( + &'a self, + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + let mut conn = self + .client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - let now = chrono::Utc::now().timestamp() as f64; + let now = chrono::Utc::now().timestamp() as f64; - let result: Option = self - .pop_script - .key(&self.queue_key) - .arg(now) - .invoke_async(&mut conn) - .await - .map_err(|e| JobError::BackendError(e.to_string()))?; + let result: Option = self + .pop_script + .key(&self.queue_key) + .arg(now) + .invoke_async(&mut conn) + .await + .map_err(|e| JobError::BackendError(e.to_string()))?; - if let Some(json_str) = result { - let job: JobRequest = serde_json::from_str(&json_str)?; - Ok(Some(job)) - } else { - Ok(None) - } + if let Some(json_str) = result { + let job: JobRequest = serde_json::from_str(&json_str)?; + Ok(Some(job)) + } else { + Ok(None) + } + }) } - async fn complete(&self, _job_id: &str) -> Result<()> { + fn complete<'a>( + &'a self, + _job_id: &'a str, + ) -> Pin> + Send + 'a>> { // Job is already removed from ZSET on pop - // In a reliable system we would move it to a 'processing' set first - // But for this implementation we assume 'at-most-once' or simple mechanics - Ok(()) + Box::pin(async move { Ok(()) }) } - async fn fail(&self, _job_id: &str, _error: &str) -> Result<()> { - // Similar to complete, it's already removed. - // We could implement a DLQ here. - Ok(()) + fn fail<'a>( + &'a self, + _job_id: &'a str, + _error: &'a str, + ) -> Pin> + Send + 'a>> { + // Already removed. DLQ logic would go here. + Box::pin(async move { Ok(()) }) } } diff --git a/crates/rustapi-jobs/src/error.rs b/crates/rustapi-jobs/src/error.rs index a878966b..7f97f0de 100644 --- a/crates/rustapi-jobs/src/error.rs +++ b/crates/rustapi-jobs/src/error.rs @@ -1,24 +1,39 @@ -use thiserror::Error; +use std::fmt; -#[derive(Debug, Error)] +#[derive(Debug)] pub enum JobError { - #[error("Job serialization error: {0}")] - SerializationError(#[from] serde_json::Error), - - #[error("Backend error: {0}")] + SerializationError(serde_json::Error), BackendError(String), - - #[error("Job not found: {0}")] NotFound(String), - - #[error("Worker error: {0}")] WorkerError(String), - - #[error("Configuration error: {0}")] ConfigError(String), - - #[error("Unknown job type: {0}")] UnknownJobType(String), } +impl fmt::Display for JobError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::SerializationError(e) => write!(f, "Job serialization error: {}", e), + Self::BackendError(msg) => write!(f, "Backend error: {}", msg), + Self::NotFound(msg) => write!(f, "Job not found: {}", msg), + Self::WorkerError(msg) => write!(f, "Worker error: {}", msg), + Self::ConfigError(msg) => write!(f, "Configuration error: {}", msg), + Self::UnknownJobType(msg) => write!(f, "Unknown job type: {}", msg), + } + } +} + +impl std::error::Error for JobError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::SerializationError(e) => Some(e), + _ => None, + } + } +} + +impl From for JobError { + fn from(e: serde_json::Error) -> Self { Self::SerializationError(e) } +} + pub type Result = std::result::Result; diff --git a/crates/rustapi-jobs/src/job.rs b/crates/rustapi-jobs/src/job.rs index 5d914ac2..5b847d82 100644 --- a/crates/rustapi-jobs/src/job.rs +++ b/crates/rustapi-jobs/src/job.rs @@ -1,7 +1,8 @@ use crate::error::Result; -use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; /// Context passed to job execution #[derive(Debug, Clone)] @@ -12,7 +13,6 @@ pub struct JobContext { } /// A job that can be executed -#[async_trait] pub trait Job: Send + Sync + 'static { /// The job name/type const NAME: &'static str; @@ -21,19 +21,31 @@ pub trait Job: Send + Sync + 'static { type Data: Serialize + DeserializeOwned + Send + Sync + Debug; /// Execute the job - async fn execute(&self, ctx: JobContext, data: Self::Data) -> Result<()>; + fn execute( + &self, + ctx: JobContext, + data: Self::Data, + ) -> impl Future> + Send; } -/// A type-erased job handler -#[async_trait] +/// A type-erased job handler (dyn-compatible via boxed futures) pub trait JobHandler: Send + Sync { - async fn handle(&self, ctx: JobContext, data: serde_json::Value) -> Result<()>; + fn handle<'a>( + &'a self, + ctx: JobContext, + data: serde_json::Value, + ) -> Pin> + Send + 'a>>; } -#[async_trait] impl JobHandler for J { - async fn handle(&self, ctx: JobContext, data: serde_json::Value) -> Result<()> { - let data: J::Data = serde_json::from_value(data)?; - self.execute(ctx, data).await + fn handle<'a>( + &'a self, + ctx: JobContext, + data: serde_json::Value, + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let data: J::Data = serde_json::from_value(data)?; + self.execute(ctx, data).await + }) } } diff --git a/crates/rustapi-jobs/src/queue.rs b/crates/rustapi-jobs/src/queue.rs index 427aff28..89f8fe19 100644 --- a/crates/rustapi-jobs/src/queue.rs +++ b/crates/rustapi-jobs/src/queue.rs @@ -159,7 +159,6 @@ mod property_tests { use super::*; use crate::backend::memory::InMemoryBackend as MemoryBackend; use crate::JobError; - use async_trait::async_trait; use proptest::prelude::*; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -186,7 +185,6 @@ mod property_tests { execution_count: Arc>, } - #[async_trait] impl Job for TestJob { const NAME: &'static str = "test_job"; type Data = TestJobData; diff --git a/crates/rustapi-jobs/tests/job_persistence_test.rs b/crates/rustapi-jobs/tests/job_persistence_test.rs index 893dd84d..d51279c1 100644 --- a/crates/rustapi-jobs/tests/job_persistence_test.rs +++ b/crates/rustapi-jobs/tests/job_persistence_test.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use rustapi_jobs::{InMemoryBackend, Job, JobContext, JobQueue, Result}; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex}; @@ -16,7 +15,6 @@ struct EmailJob { processed: Arc>>, } -#[async_trait] impl Job for EmailJob { const NAME: &'static str = "email_job"; type Data = EmailJobData; @@ -102,7 +100,6 @@ struct FailingJob { attempts: Arc>, } -#[async_trait] impl Job for FailingJob { const NAME: &'static str = "failing_job"; type Data = (); // No data needed diff --git a/crates/rustapi-jobs/tests/repro_blocking.rs b/crates/rustapi-jobs/tests/repro_blocking.rs index 40da453a..4e6cdf63 100644 --- a/crates/rustapi-jobs/tests/repro_blocking.rs +++ b/crates/rustapi-jobs/tests/repro_blocking.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use rustapi_jobs::{EnqueueOptions, InMemoryBackend, Job, JobContext, JobQueue, Result}; use serde::{Deserialize, Serialize}; use std::sync::{Arc, Mutex}; @@ -14,7 +13,6 @@ struct SimpleJob { processed_ids: Arc>>, } -#[async_trait] impl Job for SimpleJob { const NAME: &'static str = "simple_job"; type Data = SimpleJobData; diff --git a/crates/rustapi-testing/Cargo.toml b/crates/rustapi-testing/Cargo.toml index dee84ca6..21e3289b 100644 --- a/crates/rustapi-testing/Cargo.toml +++ b/crates/rustapi-testing/Cargo.toml @@ -13,21 +13,18 @@ rust-version.workspace = true description = "Testing utilities for RustAPI applications. Provides checking helpers, test servers, and fluid assertions." [dependencies] -tokio = { workspace = true, features = ["full"] } -hyper = { workspace = true, features = ["full"] } -hyper-util = { workspace = true, features = ["full"] } +tokio = { workspace = true, features = ["macros", "net", "rt", "sync"] } +hyper = { workspace = true, features = ["http1", "server"] } +hyper-util = { workspace = true, features = ["http1", "server-auto", "tokio"] } http = { workspace = true } http-body-util = { workspace = true } bytes = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -thiserror = { workspace = true } -tracing = { workspace = true } -futures-util = { workspace = true } rustapi-core = { workspace = true } [dev-dependencies] proptest = "1.8.0" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } -tokio = { workspace = true, features = ["full", "test-util"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "test-util"] } diff --git a/crates/rustapi-toon/Cargo.toml b/crates/rustapi-toon/Cargo.toml index 98529b67..5a27119e 100644 --- a/crates/rustapi-toon/Cargo.toml +++ b/crates/rustapi-toon/Cargo.toml @@ -36,8 +36,6 @@ futures-util = { workspace = true } tracing = { workspace = true } # Error handling -thiserror = { workspace = true } - [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } serde_json = { workspace = true } diff --git a/crates/rustapi-toon/src/error.rs b/crates/rustapi-toon/src/error.rs index e85895e2..964bd03b 100644 --- a/crates/rustapi-toon/src/error.rs +++ b/crates/rustapi-toon/src/error.rs @@ -1,28 +1,34 @@ //! TOON Error types and conversions +use std::fmt; use rustapi_core::ApiError; -use thiserror::Error; /// Error type for TOON operations -#[derive(Error, Debug)] +#[derive(Debug)] pub enum ToonError { /// Error during TOON encoding (serialization) - #[error("TOON encoding error: {0}")] Encode(String), - /// Error during TOON decoding (parsing/deserialization) - #[error("TOON decoding error: {0}")] Decode(String), - /// Invalid content type for TOON request - #[error("Invalid content type: expected application/toon or text/toon")] InvalidContentType, - /// Empty body provided - #[error("Empty request body")] EmptyBody, } +impl fmt::Display for ToonError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Encode(msg) => write!(f, "TOON encoding error: {}", msg), + Self::Decode(msg) => write!(f, "TOON decoding error: {}", msg), + Self::InvalidContentType => write!(f, "Invalid content type: expected application/toon or text/toon"), + Self::EmptyBody => write!(f, "Empty request body"), + } + } +} + +impl std::error::Error for ToonError {} + impl From for ToonError { fn from(err: toon_format::ToonError) -> Self { match &err { diff --git a/crates/rustapi-validate/Cargo.toml b/crates/rustapi-validate/Cargo.toml index be6f04cd..d657c462 100644 --- a/crates/rustapi-validate/Cargo.toml +++ b/crates/rustapi-validate/Cargo.toml @@ -16,9 +16,6 @@ homepage.workspace = true serde = { workspace = true } serde_json = { workspace = true } -# Error handling -thiserror = { workspace = true } - # HTTP types for response http = { workspace = true } diff --git a/crates/rustapi-view/Cargo.toml b/crates/rustapi-view/Cargo.toml index 1146c866..1a9d52e9 100644 --- a/crates/rustapi-view/Cargo.toml +++ b/crates/rustapi-view/Cargo.toml @@ -33,7 +33,6 @@ bytes = { workspace = true } tokio = { workspace = true, features = ["sync"] } # Utilities -thiserror = { workspace = true } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/rustapi-view/src/error.rs b/crates/rustapi-view/src/error.rs index 29374642..070f490b 100644 --- a/crates/rustapi-view/src/error.rs +++ b/crates/rustapi-view/src/error.rs @@ -1,37 +1,60 @@ //! View error types -use thiserror::Error; +use std::fmt; /// Error type for view/template operations -#[derive(Error, Debug)] +#[derive(Debug)] pub enum ViewError { /// Template not found - #[error("Template not found: {0}")] TemplateNotFound(String), - /// Template rendering failed - #[error("Template rendering failed: {0}")] RenderError(String), - /// Template parsing failed - #[error("Template parsing failed: {0}")] ParseError(String), - /// Context serialization failed - #[error("Context serialization failed: {0}")] SerializationError(String), - /// Template engine not initialized - #[error("Template engine not initialized")] NotInitialized, - /// IO error - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - + IoError(std::io::Error), /// Tera error - #[error("Tera error: {0}")] - Tera(#[from] tera::Error), + Tera(tera::Error), +} + +impl fmt::Display for ViewError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::TemplateNotFound(name) => write!(f, "Template not found: {}", name), + Self::RenderError(msg) => write!(f, "Template rendering failed: {}", msg), + Self::ParseError(msg) => write!(f, "Template parsing failed: {}", msg), + Self::SerializationError(msg) => write!(f, "Context serialization failed: {}", msg), + Self::NotInitialized => write!(f, "Template engine not initialized"), + Self::IoError(e) => write!(f, "IO error: {}", e), + Self::Tera(e) => write!(f, "Tera error: {}", e), + } + } +} + +impl std::error::Error for ViewError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::IoError(e) => Some(e), + Self::Tera(e) => Some(e), + _ => None, + } + } +} + +impl From for ViewError { + fn from(e: std::io::Error) -> Self { + Self::IoError(e) + } +} + +impl From for ViewError { + fn from(e: tera::Error) -> Self { + Self::Tera(e) + } } impl ViewError { diff --git a/crates/rustapi-ws/Cargo.toml b/crates/rustapi-ws/Cargo.toml index 2ce79b2c..5ff2b037 100644 --- a/crates/rustapi-ws/Cargo.toml +++ b/crates/rustapi-ws/Cargo.toml @@ -37,7 +37,6 @@ serde = { workspace = true } serde_json = { workspace = true } # Utilities -thiserror = { workspace = true } tracing = { workspace = true } pin-project-lite = { workspace = true } async-trait = { workspace = true } @@ -45,7 +44,6 @@ url = "2.5" # SHA-1 for WebSocket handshake sha1 = "0.10" -base64 = "0.22" [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } diff --git a/crates/rustapi-ws/src/auth.rs b/crates/rustapi-ws/src/auth.rs index 35c9bd35..11045646 100644 --- a/crates/rustapi-ws/src/auth.rs +++ b/crates/rustapi-ws/src/auth.rs @@ -25,36 +25,40 @@ use std::collections::HashMap; use std::sync::Arc; -use thiserror::Error; +use std::fmt; /// Error type for WebSocket authentication -#[derive(Error, Debug, Clone)] +#[derive(Debug, Clone)] pub enum AuthError { /// Token is missing from the request - #[error("Authentication token missing")] TokenMissing, - /// Token format is invalid - #[error("Invalid token format: {0}")] InvalidFormat(String), - /// Token has expired - #[error("Token has expired")] TokenExpired, - /// Token signature is invalid - #[error("Invalid token signature")] InvalidSignature, - /// Token validation failed - #[error("Token validation failed: {0}")] ValidationFailed(String), - /// Insufficient permissions - #[error("Insufficient permissions: {0}")] InsufficientPermissions(String), } +impl fmt::Display for AuthError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::TokenMissing => write!(f, "Authentication token missing"), + Self::InvalidFormat(msg) => write!(f, "Invalid token format: {}", msg), + Self::TokenExpired => write!(f, "Token has expired"), + Self::InvalidSignature => write!(f, "Invalid token signature"), + Self::ValidationFailed(msg) => write!(f, "Token validation failed: {}", msg), + Self::InsufficientPermissions(msg) => write!(f, "Insufficient permissions: {}", msg), + } + } +} + +impl std::error::Error for AuthError {} + impl AuthError { /// Create a validation failed error pub fn validation_failed(msg: impl Into) -> Self { diff --git a/crates/rustapi-ws/src/error.rs b/crates/rustapi-ws/src/error.rs index 0efd187c..254174e8 100644 --- a/crates/rustapi-ws/src/error.rs +++ b/crates/rustapi-ws/src/error.rs @@ -1,49 +1,69 @@ //! WebSocket error types -use thiserror::Error; +use std::fmt; /// Error type for WebSocket operations -#[derive(Error, Debug)] +#[derive(Debug)] pub enum WebSocketError { /// Invalid WebSocket upgrade request - #[error("Invalid WebSocket upgrade request: {0}")] InvalidUpgrade(String), - /// WebSocket handshake failed - #[error("WebSocket handshake failed: {0}")] HandshakeFailed(String), - /// Connection closed unexpectedly - #[error("Connection closed unexpectedly")] ConnectionClosed, - /// Failed to send message - #[error("Failed to send message: {0}")] SendFailed(String), - /// Failed to receive message - #[error("Failed to receive message: {0}")] ReceiveFailed(String), - /// Message serialization error - #[error("Message serialization error: {0}")] SerializationError(String), - /// Message deserialization error - #[error("Message deserialization error: {0}")] DeserializationError(String), - /// Protocol error - #[error("WebSocket protocol error: {0}")] ProtocolError(String), - /// IO error - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - + IoError(std::io::Error), /// Tungstenite error - #[error("WebSocket error: {0}")] - Tungstenite(#[from] tungstenite::Error), + Tungstenite(tungstenite::Error), +} + +impl fmt::Display for WebSocketError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidUpgrade(msg) => write!(f, "Invalid WebSocket upgrade request: {}", msg), + Self::HandshakeFailed(msg) => write!(f, "WebSocket handshake failed: {}", msg), + Self::ConnectionClosed => write!(f, "Connection closed unexpectedly"), + Self::SendFailed(msg) => write!(f, "Failed to send message: {}", msg), + Self::ReceiveFailed(msg) => write!(f, "Failed to receive message: {}", msg), + Self::SerializationError(msg) => write!(f, "Message serialization error: {}", msg), + Self::DeserializationError(msg) => write!(f, "Message deserialization error: {}", msg), + Self::ProtocolError(msg) => write!(f, "WebSocket protocol error: {}", msg), + Self::IoError(e) => write!(f, "IO error: {}", e), + Self::Tungstenite(e) => write!(f, "WebSocket error: {}", e), + } + } +} + +impl std::error::Error for WebSocketError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::IoError(e) => Some(e), + Self::Tungstenite(e) => Some(e), + _ => None, + } + } +} + +impl From for WebSocketError { + fn from(e: std::io::Error) -> Self { + Self::IoError(e) + } +} + +impl From for WebSocketError { + fn from(e: tungstenite::Error) -> Self { + Self::Tungstenite(e) + } } impl WebSocketError { diff --git a/crates/rustapi-ws/src/upgrade.rs b/crates/rustapi-ws/src/upgrade.rs index 48d8dbbc..37b6b45a 100644 --- a/crates/rustapi-ws/src/upgrade.rs +++ b/crates/rustapi-ws/src/upgrade.rs @@ -344,9 +344,25 @@ fn parse_window_bits(value: &str) -> Option { } } +/// RFC 4648 standard base64 encode (no external crate) +fn base64_encode(input: &[u8]) -> String { + const ALPHA: &[u8; 64] = + b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + let mut out = String::with_capacity((input.len() + 2) / 3 * 4); + for chunk in input.chunks(3) { + let b0 = chunk[0] as usize; + let b1 = if chunk.len() > 1 { chunk[1] as usize } else { 0 }; + let b2 = if chunk.len() > 2 { chunk[2] as usize } else { 0 }; + out.push(ALPHA[b0 >> 2] as char); + out.push(ALPHA[((b0 & 3) << 4) | (b1 >> 4)] as char); + out.push(if chunk.len() > 1 { ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char } else { '=' }); + out.push(if chunk.len() > 2 { ALPHA[b2 & 63] as char } else { '=' }); + } + out +} + /// Generate the Sec-WebSocket-Accept key from the client's Sec-WebSocket-Key fn generate_accept_key(key: &str) -> String { - use base64::Engine; use sha1::{Digest, Sha1}; const GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; @@ -356,7 +372,7 @@ fn generate_accept_key(key: &str) -> String { hasher.update(GUID.as_bytes()); let hash = hasher.finalize(); - base64::engine::general_purpose::STANDARD.encode(hash) + base64_encode(&hash) } /// Validate that a request is a valid WebSocket upgrade request From 12ad1a623d7442dc92578bf303baeda34f2b536d Mon Sep 17 00:00:00 2001 From: Tunay <121901995+Tuntii@users.noreply.github.com> Date: Tue, 19 May 2026 09:02:46 +0300 Subject: [PATCH 2/3] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- crates/rustapi-toon/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rustapi-toon/Cargo.toml b/crates/rustapi-toon/Cargo.toml index 5a27119e..2e14b80e 100644 --- a/crates/rustapi-toon/Cargo.toml +++ b/crates/rustapi-toon/Cargo.toml @@ -35,7 +35,6 @@ futures-util = { workspace = true } # Logging tracing = { workspace = true } -# Error handling [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } serde_json = { workspace = true } From 5d5fedcc26b0bfda62a25b19a2a3b939c31e6eb9 Mon Sep 17 00:00:00 2001 From: Tuntii Date: Wed, 20 May 2026 20:28:58 +0300 Subject: [PATCH 3/3] style: run cargo fmt and fix manual_div_ceil clippy warnings --- crates/rustapi-core/src/app.rs | 26 ++++++++++++++---- crates/rustapi-extras/src/diesel/mod.rs | 2 +- crates/rustapi-extras/src/insight/export.rs | 8 ++++-- crates/rustapi-extras/src/replay/client.rs | 4 ++- crates/rustapi-extras/src/sqlx/mod.rs | 4 ++- crates/rustapi-jobs/src/backend.rs | 10 +++---- crates/rustapi-jobs/src/backend/memory.rs | 4 +-- crates/rustapi-jobs/src/backend/postgres.rs | 4 +-- crates/rustapi-jobs/src/backend/redis.rs | 4 +-- crates/rustapi-jobs/src/error.rs | 4 ++- crates/rustapi-jobs/src/job.rs | 7 ++--- crates/rustapi-toon/src/error.rs | 7 +++-- crates/rustapi-ws/src/auth.rs | 2 +- crates/rustapi-ws/src/upgrade.rs | 29 ++++++++++++++++----- 14 files changed, 73 insertions(+), 42 deletions(-) diff --git a/crates/rustapi-core/src/app.rs b/crates/rustapi-core/src/app.rs index 81f93ee7..e1944a93 100644 --- a/crates/rustapi-core/src/app.rs +++ b/crates/rustapi-core/src/app.rs @@ -995,15 +995,31 @@ impl RustApi { fn base64_encode(input: &[u8]) -> String { const ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let mut out = String::with_capacity((input.len() + 2) / 3 * 4); + let mut out = String::with_capacity(input.len().div_ceil(3) * 4); for chunk in input.chunks(3) { let b0 = chunk[0] as usize; - let b1 = if chunk.len() > 1 { chunk[1] as usize } else { 0 }; - let b2 = if chunk.len() > 2 { chunk[2] as usize } else { 0 }; + let b1 = if chunk.len() > 1 { + chunk[1] as usize + } else { + 0 + }; + let b2 = if chunk.len() > 2 { + chunk[2] as usize + } else { + 0 + }; out.push(ALPHA[b0 >> 2] as char); out.push(ALPHA[((b0 & 3) << 4) | (b1 >> 4)] as char); - out.push(if chunk.len() > 1 { ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char } else { '=' }); - out.push(if chunk.len() > 2 { ALPHA[b2 & 63] as char } else { '=' }); + out.push(if chunk.len() > 1 { + ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char + } else { + '=' + }); + out.push(if chunk.len() > 2 { + ALPHA[b2 & 63] as char + } else { + '=' + }); } out } diff --git a/crates/rustapi-extras/src/diesel/mod.rs b/crates/rustapi-extras/src/diesel/mod.rs index cdd09c6c..e15f2c4c 100644 --- a/crates/rustapi-extras/src/diesel/mod.rs +++ b/crates/rustapi-extras/src/diesel/mod.rs @@ -24,9 +24,9 @@ //! ``` use rustapi_core::health::{HealthCheck, HealthCheckBuilder, HealthStatus}; +use std::fmt; use std::sync::Arc; use std::time::Duration; -use std::fmt; /// Error type for Diesel pool operations #[derive(Debug)] diff --git a/crates/rustapi-extras/src/insight/export.rs b/crates/rustapi-extras/src/insight/export.rs index 8ada6a4e..3749ad98 100644 --- a/crates/rustapi-extras/src/insight/export.rs +++ b/crates/rustapi-extras/src/insight/export.rs @@ -44,11 +44,15 @@ impl std::error::Error for ExportError { } impl From for ExportError { - fn from(e: std::io::Error) -> Self { Self::Io(e) } + fn from(e: std::io::Error) -> Self { + Self::Io(e) + } } impl From for ExportError { - fn from(e: serde_json::Error) -> Self { Self::Serialization(e) } + fn from(e: serde_json::Error) -> Self { + Self::Serialization(e) + } } /// Result type for export operations. diff --git a/crates/rustapi-extras/src/replay/client.rs b/crates/rustapi-extras/src/replay/client.rs index fdd53ced..0d759778 100644 --- a/crates/rustapi-extras/src/replay/client.rs +++ b/crates/rustapi-extras/src/replay/client.rs @@ -32,7 +32,9 @@ impl std::error::Error for ReplayClientError { } impl From for ReplayClientError { - fn from(e: reqwest::Error) -> Self { Self::Http(e) } + fn from(e: reqwest::Error) -> Self { + Self::Http(e) + } } /// HTTP client for replaying recorded requests against a target server. diff --git a/crates/rustapi-extras/src/sqlx/mod.rs b/crates/rustapi-extras/src/sqlx/mod.rs index 5e755dbe..1d849e00 100644 --- a/crates/rustapi-extras/src/sqlx/mod.rs +++ b/crates/rustapi-extras/src/sqlx/mod.rs @@ -93,7 +93,9 @@ impl std::error::Error for PoolError { } impl From for PoolError { - fn from(e: sqlx::Error) -> Self { Self::Sqlx(e) } + fn from(e: sqlx::Error) -> Self { + Self::Sqlx(e) + } } /// Configuration for SQLx connection pool diff --git a/crates/rustapi-jobs/src/backend.rs b/crates/rustapi-jobs/src/backend.rs index 0be02045..b60f7c9b 100644 --- a/crates/rustapi-jobs/src/backend.rs +++ b/crates/rustapi-jobs/src/backend.rs @@ -28,16 +28,12 @@ pub struct JobRequest { /// Backend storage for jobs (dyn-compatible via boxed futures) pub trait JobBackend: Send + Sync { /// Push a new job to the queue - fn push<'a>( - &'a self, - job: JobRequest, - ) -> Pin> + Send + 'a>>; + fn push<'a>(&'a self, job: JobRequest) + -> Pin> + Send + 'a>>; /// Pop the next available job /// Should return None if no job is available or ready - fn pop<'a>( - &'a self, - ) -> Pin>> + Send + 'a>>; + fn pop<'a>(&'a self) -> Pin>> + Send + 'a>>; /// Mark a job as completed successfully fn complete<'a>( diff --git a/crates/rustapi-jobs/src/backend/memory.rs b/crates/rustapi-jobs/src/backend/memory.rs index e40d7790..bca52009 100644 --- a/crates/rustapi-jobs/src/backend/memory.rs +++ b/crates/rustapi-jobs/src/backend/memory.rs @@ -33,9 +33,7 @@ impl JobBackend for InMemoryBackend { }) } - fn pop<'a>( - &'a self, - ) -> Pin>> + Send + 'a>> { + fn pop<'a>(&'a self) -> Pin>> + Send + 'a>> { Box::pin(async move { let mut q = self .queue diff --git a/crates/rustapi-jobs/src/backend/postgres.rs b/crates/rustapi-jobs/src/backend/postgres.rs index 2819a365..f0683c73 100644 --- a/crates/rustapi-jobs/src/backend/postgres.rs +++ b/crates/rustapi-jobs/src/backend/postgres.rs @@ -78,9 +78,7 @@ impl JobBackend for PostgresBackend { }) } - fn pop<'a>( - &'a self, - ) -> Pin>> + Send + 'a>> { + fn pop<'a>(&'a self) -> Pin>> + Send + 'a>> { Box::pin(async move { // Atomic pop using DELETE ... RETURNING with locking let query = format!( diff --git a/crates/rustapi-jobs/src/backend/redis.rs b/crates/rustapi-jobs/src/backend/redis.rs index 5d323b6f..5c9d9079 100644 --- a/crates/rustapi-jobs/src/backend/redis.rs +++ b/crates/rustapi-jobs/src/backend/redis.rs @@ -62,9 +62,7 @@ impl JobBackend for RedisBackend { }) } - fn pop<'a>( - &'a self, - ) -> Pin>> + Send + 'a>> { + fn pop<'a>(&'a self) -> Pin>> + Send + 'a>> { Box::pin(async move { let mut conn = self .client diff --git a/crates/rustapi-jobs/src/error.rs b/crates/rustapi-jobs/src/error.rs index 7f97f0de..866469cc 100644 --- a/crates/rustapi-jobs/src/error.rs +++ b/crates/rustapi-jobs/src/error.rs @@ -33,7 +33,9 @@ impl std::error::Error for JobError { } impl From for JobError { - fn from(e: serde_json::Error) -> Self { Self::SerializationError(e) } + fn from(e: serde_json::Error) -> Self { + Self::SerializationError(e) + } } pub type Result = std::result::Result; diff --git a/crates/rustapi-jobs/src/job.rs b/crates/rustapi-jobs/src/job.rs index 5b847d82..8d7f09e7 100644 --- a/crates/rustapi-jobs/src/job.rs +++ b/crates/rustapi-jobs/src/job.rs @@ -21,11 +21,8 @@ pub trait Job: Send + Sync + 'static { type Data: Serialize + DeserializeOwned + Send + Sync + Debug; /// Execute the job - fn execute( - &self, - ctx: JobContext, - data: Self::Data, - ) -> impl Future> + Send; + fn execute(&self, ctx: JobContext, data: Self::Data) + -> impl Future> + Send; } /// A type-erased job handler (dyn-compatible via boxed futures) diff --git a/crates/rustapi-toon/src/error.rs b/crates/rustapi-toon/src/error.rs index 964bd03b..bdddaaa7 100644 --- a/crates/rustapi-toon/src/error.rs +++ b/crates/rustapi-toon/src/error.rs @@ -1,7 +1,7 @@ //! TOON Error types and conversions -use std::fmt; use rustapi_core::ApiError; +use std::fmt; /// Error type for TOON operations #[derive(Debug)] @@ -21,7 +21,10 @@ impl fmt::Display for ToonError { match self { Self::Encode(msg) => write!(f, "TOON encoding error: {}", msg), Self::Decode(msg) => write!(f, "TOON decoding error: {}", msg), - Self::InvalidContentType => write!(f, "Invalid content type: expected application/toon or text/toon"), + Self::InvalidContentType => write!( + f, + "Invalid content type: expected application/toon or text/toon" + ), Self::EmptyBody => write!(f, "Empty request body"), } } diff --git a/crates/rustapi-ws/src/auth.rs b/crates/rustapi-ws/src/auth.rs index 11045646..d87990a8 100644 --- a/crates/rustapi-ws/src/auth.rs +++ b/crates/rustapi-ws/src/auth.rs @@ -24,8 +24,8 @@ //! ``` use std::collections::HashMap; -use std::sync::Arc; use std::fmt; +use std::sync::Arc; /// Error type for WebSocket authentication #[derive(Debug, Clone)] diff --git a/crates/rustapi-ws/src/upgrade.rs b/crates/rustapi-ws/src/upgrade.rs index 37b6b45a..5ad1e78a 100644 --- a/crates/rustapi-ws/src/upgrade.rs +++ b/crates/rustapi-ws/src/upgrade.rs @@ -346,17 +346,32 @@ fn parse_window_bits(value: &str) -> Option { /// RFC 4648 standard base64 encode (no external crate) fn base64_encode(input: &[u8]) -> String { - const ALPHA: &[u8; 64] = - b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let mut out = String::with_capacity((input.len() + 2) / 3 * 4); + const ALPHA: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + let mut out = String::with_capacity(input.len().div_ceil(3) * 4); for chunk in input.chunks(3) { let b0 = chunk[0] as usize; - let b1 = if chunk.len() > 1 { chunk[1] as usize } else { 0 }; - let b2 = if chunk.len() > 2 { chunk[2] as usize } else { 0 }; + let b1 = if chunk.len() > 1 { + chunk[1] as usize + } else { + 0 + }; + let b2 = if chunk.len() > 2 { + chunk[2] as usize + } else { + 0 + }; out.push(ALPHA[b0 >> 2] as char); out.push(ALPHA[((b0 & 3) << 4) | (b1 >> 4)] as char); - out.push(if chunk.len() > 1 { ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char } else { '=' }); - out.push(if chunk.len() > 2 { ALPHA[b2 & 63] as char } else { '=' }); + out.push(if chunk.len() > 1 { + ALPHA[((b1 & 0xf) << 2) | (b2 >> 6)] as char + } else { + '=' + }); + out.push(if chunk.len() > 2 { + ALPHA[b2 & 63] as char + } else { + '=' + }); } out }