diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..ecf4b751 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,37 @@ +# Agent Instructions + +- Run repository-wide checks with `./run_checks.sh`. +- Run repository-wide tests with `./run_tests.sh` outside the sandbox, after confirming single tests of affected changes inside the sandbox. + +## Rust Rules + +- Do not run multiple `cargo` instances in parallel! They anyway lock. +- Inside the sandbox, `cargo test` may run either one explicit test or a broader test selection only when passed `-- --test-threads=1`. Any grouped or repeated multi-threaded `cargo test` run must be executed outside the sandbox. +- Format Rust code according to `rustfmt.toml`. +- Keep Rust changes clippy-clean. +- Avoid immediately executed anonymous functions such as `(|| { ... })()`. Prefer ordinary `Result` or `Option` handling when the body is short; extract a named helper for larger bodies. +- Prefer readable control flow over chained iterator side effects. +- Use Snafu-derived error types (`#[derive(Snafu)]`) for Rust error enums. +- Prefer `context(...)` / `with_context(...)` over manual `map_err(...)` when the target error still wraps the original source. Use `with_context(...)` when building the context captures clones, allocations, or other non-trivial work. +- If the only reason to introduce a new error variant is to differentiate the use-site of an existing variant, prefer adding `location: Location` to the existing variant instead. +- Use `#[snafu(module(...))]` plus module-qualified selector names when otherwise identical selector names would collide. Do not introduce custom selector aliases like `FooBarBazSnafu` just to disambiguate use sites. +- Keep Snafu variant names generic inside one error enum. Do not bake call-site names like `PublishStoreAccess` into the variant when the enum type or selector module already provides that context. +- Reserve manual `map_err(...)` for real error translation cases that `context(...)` cannot express cleanly. +- Do not manually construct Snafu boxed-source variants with `Box::new(source)`; use `result.boxed().context(SelectorSnafu)` or `context(...)`/`with_context(...)` instead. +- When splitting a single-file Rust module into a folder module, move the original module contents to `mod.rs` in the new folder. +- Avoid nesting `?` into expressions. It's easier to read if they only occur at the end of a line. Refactor the expression into a field where needed. +- Document non-public Rust helpers, fields, variants, and local types whenever their role, invariants, lifecycle, or preconditions are non-trivial or non-obvious. Prefer documenting what the item is supposed to do before adding code that explains how it does it. +- Add loop labels when control flow spans non-trivial nested loops or retries. +- Prefer the following top-level grouping within Rust files unless there is a strong local reason not to: + 1. public items (`pub`) + 2. restricted-visibility items (`pub()`) + 3. macros + 4. private items + 5. exposed test helpers + 6. tests +- Within each group, use this order: + 1. constants + 2. traits + 3. functions + 4. structs/enums, each followed immediately by all associated `impl` blocks +- Imports should remain at the very top of the file/module/function. diff --git a/Cargo.lock b/Cargo.lock index 55090160..ab64a671 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1387,6 +1387,7 @@ dependencies = [ "oncemutex", "owning_ref", "protobuf", + "regex", "rustc-hash", "rustc_version", "rustversion", @@ -1394,6 +1395,7 @@ dependencies = [ "slog", "slog-async", "slog-term", + "snafu", "synchronoise", "tempfile", "toml", @@ -1461,6 +1463,7 @@ dependencies = [ "lru", "mio", "rustc-hash", + "snafu", "uuid", ] @@ -2367,6 +2370,27 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "snafu" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1d4bced6a69f90b2056c03dcff2c4737f98d6fb9e0853493996e1d253ca29c6" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54254b8531cafa275c5e096f62d48c81435d1015405a91198ddb11e967301d40" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "socket2" version = "0.5.10" diff --git a/Cargo.toml b/Cargo.toml index 76efd2df..64d89165 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,5 +21,7 @@ crossbeam-channel = "0.5" futures = "0.3" ipnet = "2.12" lru = "0.16" +regex = "1" rustc-hash = "2" +snafu = "0.9" uuid = { version = "1.23", features = ["v4"] } diff --git a/core/Cargo.toml b/core/Cargo.toml index 80fd4a1c..27ea190f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,6 +66,7 @@ slog = "2" slog-async = "2" slog-term = "2" rustc-hash = { workspace = true } +snafu = { workspace = true } toml = "1" humantime = "2" byte-unit = "5" @@ -75,15 +76,15 @@ futures = { workspace = true } async-std = { workspace = true } executors = "0.10" lru = { workspace = true } +bytes = { workspace = true } +ipnet = { workspace = true } +regex = { workspace = true } # Optional protobuf = { version = "3", optional = true, features = ["with-bytes"] } serde = { version = "1.0", optional = true } core_affinity = { version = "0.8", optional = true } -bytes = { workspace = true } -ipnet = { workspace = true } - [dev-dependencies] tempfile = "3" serde = { version = "1.0", features = ["derive"] } diff --git a/core/src/actors/paths.rs b/core/src/actors/paths.rs index 6475e04f..7192ede9 100644 --- a/core/src/actors/paths.rs +++ b/core/src/actors/paths.rs @@ -72,11 +72,7 @@ impl fmt::Display for TransportParseError { } } -impl Error for TransportParseError { - fn description(&self) -> &str { - "Transport must be one of [local,tcp,udp]" - } -} +impl Error for TransportParseError {} /// Error type for parsing [paths](ActorPath) from a string #[derive(Debug, PartialEq, Eq, Clone)] @@ -105,11 +101,7 @@ impl fmt::Display for PathParseError { } impl Error for PathParseError { - fn description(&self) -> &str { - "Path could not be parsed" - } - - fn cause(&self) -> Option<&dyn Error> { + fn source(&self) -> Option<&(dyn Error + 'static)> { match self { PathParseError::Form(_) => None, PathParseError::Transport(e) => Some(e), diff --git a/core/src/dedicated_scheduler.rs b/core/src/dedicated_scheduler.rs index 0e237969..016726c8 100644 --- a/core/src/dedicated_scheduler.rs +++ b/core/src/dedicated_scheduler.rs @@ -1,6 +1,6 @@ use crate::{ component::{Component, ComponentDefinition, CoreContainer, SchedulingDecision}, - runtime::Scheduler, + runtime::{Scheduler, SchedulerShutdownError}, utils, }; use std::{ @@ -139,7 +139,7 @@ impl Scheduler for DedicatedThreadScheduler { self.stop.store(true, Ordering::Relaxed); } - fn shutdown(&self) -> Result<(), String> { + fn shutdown(&self) -> Result<(), SchedulerShutdownError> { self.stop.store(true, Ordering::Relaxed); loop { if self.stopped.load(Ordering::Relaxed) { @@ -150,7 +150,9 @@ impl Scheduler for DedicatedThreadScheduler { } } - fn shutdown_notify(&self) -> futures::future::BoxFuture<'static, Result<(), String>> { + fn shutdown_notify( + &self, + ) -> futures::future::BoxFuture<'static, Result<(), SchedulerShutdownError>> { let stop = self.stop.clone(); let stopped = self.stopped.clone(); async move { diff --git a/core/src/default_components.rs b/core/src/default_components.rs index d2c9ea30..62513bbb 100644 --- a/core/src/default_components.rs +++ b/core/src/default_components.rs @@ -81,7 +81,7 @@ impl SystemComponents for DefaultComponents { fn stop_notify<'a>( &'a self, system: &'a KompactSystem, - ) -> futures::future::BoxFuture<'a, Result<(), String>> { + ) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>> { async move { system.kill(self.dispatcher.clone()); system.kill(self.deadletter_box.clone()); @@ -126,17 +126,19 @@ impl TimerRefFactory for DefaultTimer { } impl TimerComponent for DefaultTimer { - fn shutdown(&self) -> Result<(), String> { + fn shutdown(&self) -> Result<(), TimerShutdownError> { self.inner .shutdown_async() - .map_err(|e| format!("Error during timer shutdown: {:?}", e)) + .map_err(TimerShutdownError::from_debug) } - fn shutdown_notify<'a>(&'a self) -> futures::future::BoxFuture<'a, Result<(), String>> { + fn shutdown_notify<'a>( + &'a self, + ) -> futures::future::BoxFuture<'a, Result<(), TimerShutdownError>> { future::ready( self.inner .shutdown_async() - .map_err(|e| format!("Error during timer shutdown: {:?}", e)), + .map_err(TimerShutdownError::from_debug), ) .boxed() } @@ -159,12 +161,14 @@ impl TimerRefFactory for ManualTimerComponent { } impl TimerComponent for ManualTimerComponent { - fn shutdown(&self) -> Result<(), String> { + fn shutdown(&self) -> Result<(), TimerShutdownError> { self.inner.stop(); Ok(()) } - fn shutdown_notify<'a>(&'a self) -> futures::future::BoxFuture<'a, Result<(), String>> { + fn shutdown_notify<'a>( + &'a self, + ) -> futures::future::BoxFuture<'a, Result<(), TimerShutdownError>> { // This actually locks a Mutex, so there's a small deadlock risk here. self.inner.stop(); future::ready(Ok(())).boxed() @@ -232,7 +236,7 @@ where fn stop_notify<'a>( &'a self, system: &'a KompactSystem, - ) -> futures::future::BoxFuture<'a, Result<(), String>> { + ) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>> { async move { // Gracefully stop the dispatcher/Network layer. system.stop(&self.dispatcher.clone()); diff --git a/core/src/lib.rs b/core/src/lib.rs index 4e7d993d..099dbec5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -235,7 +235,7 @@ pub mod prelude { TryLockError, TwoWayChannel, }, - runtime::{KompactConfig, KompactSystem, SystemHandle}, + runtime::{KompactConfig, KompactSystem, ShutdownError, SystemHandle}, supervision::{Fault, FaultContext, RecoveryHandler}, }; diff --git a/core/src/messaging/deser_macro.rs b/core/src/messaging/deser_macro.rs index 3f02be42..307d39c2 100644 --- a/core/src/messaging/deser_macro.rs +++ b/core/src/messaging/deser_macro.rs @@ -679,8 +679,8 @@ mod deser_macro_tests { fn deserialise(buf: &mut dyn Buf) -> Result { if buf.remaining() < 8 { - return Err(SerError::InvalidData( - "Less than 8bytes remaining in buffer!".to_string(), + return Err(SerError::invalid_data( + "Less than 8bytes remaining in buffer!", )); } let index = buf.get_u64(); @@ -799,7 +799,7 @@ mod deser_macro_tests { fn deserialise(buf: &mut dyn Buf) -> Result { let magic_byte = buf.get_u8(); if magic_byte != Self::MAGIC_BYTE { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Expected MAGIC_BYTE={}, but got {} instead", Self::MAGIC_BYTE, magic_byte diff --git a/core/src/messaging/framing.rs b/core/src/messaging/framing.rs index 11b80c8e..9b8d782a 100644 --- a/core/src/messaging/framing.rs +++ b/core/src/messaging/framing.rs @@ -77,7 +77,7 @@ impl TryFrom for AddressType { match x { x if x == AddressType::IPv4 as u8 => Ok(AddressType::IPv4), x if x == AddressType::IPv6 as u8 => Ok(AddressType::IPv6), - _ => Err(SerError::InvalidType("Unsupported AddressType".into())), + _ => Err(SerError::invalid_type("Unsupported AddressType")), } } } @@ -89,7 +89,7 @@ impl TryFrom for PathType { match x { x if x == PathType::Unique as u8 => Ok(PathType::Unique), x if x == PathType::Named as u8 => Ok(PathType::Named), - _ => Err(SerError::InvalidType("Unsupported PathType".into())), + _ => Err(SerError::invalid_type("Unsupported PathType")), } } } @@ -102,9 +102,7 @@ impl TryFrom for Transport { x if x == Transport::Local as u8 => Ok(Transport::Local), x if x == Transport::Udp as u8 => Ok(Transport::Udp), x if x == Transport::Tcp as u8 => Ok(Transport::Tcp), - _ => Err(SerError::InvalidType( - "Unsupported transport protocol".into(), - )), + _ => Err(SerError::invalid_type("Unsupported transport protocol")), } } } @@ -192,13 +190,13 @@ impl TryFrom for SystemPathHeader { let storage = [value]; let path_type = storage .get_as::() - .map_err(|_| SerError::InvalidData("System Path could not be read.".to_owned()))?; - let protocol = storage.get_as::().map_err(|_| { - SerError::InvalidData("System Path Transport could not be read.".to_owned()) - })?; - let address_type = storage.get_as::().map_err(|_| { - SerError::InvalidData("System Path AddressType could not be read.".to_owned()) - })?; + .map_err(|_| SerError::invalid_data("System Path could not be read."))?; + let protocol = storage + .get_as::() + .map_err(|_| SerError::invalid_data("System Path Transport could not be read."))?; + let address_type = storage + .get_as::() + .map_err(|_| SerError::invalid_data("System Path AddressType could not be read."))?; let header = SystemPathHeader { storage, @@ -286,8 +284,8 @@ fn system_path_from_buf(buf: &mut dyn Buf) -> Result<(SystemPathHeader, SystemPa let address: IpAddr = match header.address_type { AddressType::IPv4 => { if buf.remaining() < 4 { - return Err(SerError::InvalidData( - "Could not parse 4 bytes for IPv4 address".into(), + return Err(SerError::invalid_data( + "Could not parse 4 bytes for IPv4 address", )); } else { let mut ip_bytes = [0u8; 4]; @@ -297,8 +295,8 @@ fn system_path_from_buf(buf: &mut dyn Buf) -> Result<(SystemPathHeader, SystemPa } AddressType::IPv6 => { if buf.remaining() < 16 { - return Err(SerError::InvalidData( - "Could not parse 16 bytes for IPv6 address".into(), + return Err(SerError::invalid_data( + "Could not parse 16 bytes for IPv6 address", )); } else { let mut ip_bytes = [0u8; 16]; @@ -370,7 +368,7 @@ impl Serialisable for ActorPath { let path = np.path_ref().join("/"); let data = path.as_bytes(); let name_len: u16 = u16::try_from(data.len()).map_err(|_| { - SerError::InvalidData("Named path overflows designated 2 bytes length.".into()) + SerError::invalid_data("Named path overflows designated 2 bytes length.") })?; buf.put_u16(name_len); buf.put_slice(data); @@ -392,9 +390,7 @@ impl Deserialiser for ActorPath { let path = match header.path_type { PathType::Unique => { if buf.remaining() < 16 { - return Err(SerError::InvalidData( - "Could not get 16 bytes for UUID".into(), - )); + return Err(SerError::invalid_data("Could not get 16 bytes for UUID")); } else { let mut uuid_bytes = [0u8; 16]; buf.copy_to_slice(&mut uuid_bytes); @@ -405,7 +401,7 @@ impl Deserialiser for ActorPath { PathType::Named => { let name_len = buf.get_u16() as usize; if buf.remaining() < name_len { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Could not get {} bytes for path name", name_len ))); @@ -420,8 +416,8 @@ impl Deserialiser for ActorPath { }; let parts: Vec<&str> = name.split('/').collect(); if parts.is_empty() { - return Err(SerError::InvalidData( - "Could not determine name for Named path type".into(), + return Err(SerError::invalid_data( + "Could not determine name for Named path type", )); } else { let path = parts.into_iter().map(|s| s.to_string()).collect(); diff --git a/core/src/messaging/net_message.rs b/core/src/messaging/net_message.rs index 87189d1d..cffdf6db 100644 --- a/core/src/messaging/net_message.rs +++ b/core/src/messaging/net_message.rs @@ -489,7 +489,7 @@ impl NetData { match data { HeapOrSer::Boxed(boxed_ser) => { let b = boxed_ser.local().map_err(|_| { - UnpackError::DeserError(SerError::Unknown(format!( + UnpackError::DeserError(SerError::unknown(format!( "Serialisable with id={} can't be converted to local!", ser_id ))) diff --git a/core/src/net/buffers/encode_buffer.rs b/core/src/net/buffers/encode_buffer.rs index ecbcd716..4a4f2245 100644 --- a/core/src/net/buffers/encode_buffer.rs +++ b/core/src/net/buffers/encode_buffer.rs @@ -77,8 +77,8 @@ impl EncodeBuffer { self.buffer_pool.return_buffer(new_buffer); Ok(()) } else { - Err(SerError::NoBuffersAvailable( - "No Available Buffers in BufferPool".to_string(), + Err(SerError::no_buffers_available( + "No Available Buffers in BufferPool", )) } } @@ -166,7 +166,7 @@ impl<'a> BufferEncoder<'a> { /// This method may perform an eager-swap of the underlying buffer to avoid chaining ChunkLeases. /// /// Also ensures that there are available buffers to fit the entire message into. - /// Returns `SerError::NoBuffersAvailable` error if there are no available buffers to fit the + /// Returns a no-buffers-available [SerError] if there are no available buffers to fit the /// entire message into. pub(crate) fn try_reserve(&mut self, size: usize) -> Result<(), SerError> { // Length of BufferChunks in this encode_buffer @@ -184,7 +184,7 @@ impl<'a> BufferEncoder<'a> { self.encode_buffer .buffer_pool .try_reserve(size - remaining) - .map_err(|e| SerError::NoBuffersAvailable(e.to_string())) + .map_err(|e| SerError::no_buffers_available(e.to_string())) } } @@ -215,7 +215,7 @@ impl<'a> BufferEncoder<'a> { // No chain just return what's written into the active buffers self.encode_buffer .get_chunk_lease() - .ok_or_else(|| SerError::InvalidData("No data written".to_string())) + .ok_or_else(|| SerError::invalid_data("No data written")) } } diff --git a/core/src/runtime/errors.rs b/core/src/runtime/errors.rs new file mode 100644 index 00000000..05fdb1ae --- /dev/null +++ b/core/src/runtime/errors.rs @@ -0,0 +1,376 @@ +use std::{error, fmt}; + +use snafu::IntoError; + +/// Internal display helper for optional SNAFU backtraces. +/// +/// This is public so extension crates can reuse the same diagnostic formatting, +/// but it is not part of Kompact's supported public API. +#[doc(hidden)] +pub struct BacktraceSuffix<'a>(pub Option<&'a snafu::Backtrace>); + +impl fmt::Display for BacktraceSuffix<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(backtrace) = self.0 { + write!(f, "\nBacktrace:\n{backtrace}")?; + } + Ok(()) + } +} + +/// Internal display helper for a single source cause. +#[doc(hidden)] +pub struct SourceCause<'a>( + pub &'a dyn fmt::Display, + pub Option<&'a snafu::Location>, + pub Option<&'a snafu::Backtrace>, +); + +impl fmt::Display for SourceCause<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "\n Caused by: {}", self.0)?; + if let Some(location) = self.1 { + write!(f, " ({location})")?; + } + write!(f, ".{}", BacktraceSuffix(self.2)) + } +} + +/// Internal display helper for source chains that already include causes. +#[doc(hidden)] +pub struct NestedCause<'a>(pub &'a dyn fmt::Display); + +impl fmt::Display for NestedCause<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let rendered = self.0.to_string(); + let mut lines = rendered.lines(); + if let Some(first) = lines.next() { + write!(f, "\n Caused by: {first}")?; + } + for line in lines { + write!(f, "\n{line}")?; + } + Ok(()) + } +} + +/// Error returned when a scheduler fails to shut down. +#[derive(Debug, snafu::Snafu)] +pub struct SchedulerShutdownError(SchedulerShutdownErrorInner); + +#[derive(Debug, snafu::Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(name(SchedulerShutdownSnafu)))] +#[snafu(display( + "scheduler shutdown failed ({location}).{}", + SourceCause(source.as_ref(), Some(location), backtrace.as_ref()) +))] +pub(crate) struct SchedulerShutdownErrorInner { + #[snafu(source(from(String, Into::into)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, +} + +impl SchedulerShutdownError { + /// Construct a scheduler shutdown error from a message. + #[track_caller] + pub fn message(message: impl Into) -> Self { + SchedulerShutdownSnafu.into_error(message.into()).into() + } +} + +impl From for SchedulerShutdownError { + fn from(message: String) -> Self { + Self::message(message) + } +} + +impl From<&str> for SchedulerShutdownError { + fn from(message: &str) -> Self { + Self::message(message) + } +} + +/// Error returned when a timer fails to shut down. +#[derive(Debug, snafu::Snafu)] +pub struct TimerShutdownError(TimerShutdownErrorInner); + +#[derive(Debug, snafu::Snafu)] +#[snafu(visibility(pub(crate)))] +#[snafu(context(name(TimerShutdownSnafu)))] +#[snafu(display( + "timer shutdown failed ({location}).{}", + SourceCause(source.as_ref(), Some(location), backtrace.as_ref()) +))] +pub(crate) struct TimerShutdownErrorInner { + #[snafu(source(from(String, Into::into)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, +} + +impl TimerShutdownError { + /// Construct a timer shutdown error from a message. + #[track_caller] + pub fn message(message: impl Into) -> Self { + TimerShutdownSnafu.into_error(message.into()).into() + } + + /// Construct a timer shutdown error from a debug-only third-party error. + #[track_caller] + pub fn from_debug(error: impl fmt::Debug) -> Self { + Self::message(format!("{:?}", error)) + } +} + +impl From for TimerShutdownError { + fn from(message: String) -> Self { + Self::message(message) + } +} + +impl From<&str> for TimerShutdownError { + fn from(message: &str) -> Self { + Self::message(message) + } +} + +/// Error returned when system components fail to shut down. +#[derive(Debug, snafu::Snafu)] +pub struct SystemComponentsShutdownError(SystemComponentsShutdownErrorKind); + +#[derive(Debug, snafu::Snafu)] +#[snafu(module(system_components_shutdown_error), visibility(pub(crate)))] +pub(crate) enum SystemComponentsShutdownErrorKind { + #[snafu(display( + "system components shutdown failed ({location}).{}", + SourceCause(source.as_ref(), Some(location), backtrace.as_ref()) + ))] + Message { + #[snafu(source(from(String, Into::into)))] + source: Box, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "system components shutdown failed ({location}).{}{}", + SourceCause(message, Some(location), None), + SourceCause(source.as_ref(), None, backtrace.as_ref()) + ))] + Source { + message: String, + source: Box, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, +} + +impl SystemComponentsShutdownError { + /// Construct a system-components shutdown error from a message. + #[track_caller] + pub fn message(message: impl Into) -> Self { + system_components_shutdown_error::MessageSnafu + .into_error(message.into()) + .into() + } + + /// Construct a system-components shutdown error with a source. + #[track_caller] + pub fn source( + message: impl Into, + source: impl error::Error + Send + Sync + 'static, + ) -> Self { + let source: Box = Box::new(source); + system_components_shutdown_error::SourceSnafu { + message: message.into(), + } + .into_error(source) + .into() + } +} + +impl From for SystemComponentsShutdownError { + fn from(message: String) -> Self { + Self::message(message) + } +} + +impl From<&str> for SystemComponentsShutdownError { + fn from(message: &str) -> Self { + Self::message(message) + } +} + +/// Error returned when a Kompact system fails to shut down. +#[derive(Debug, snafu::Snafu)] +pub struct ShutdownError(Box); + +#[derive(Debug, snafu::Snafu)] +#[snafu(module(shutdown_error), visibility(pub(crate)))] +pub(crate) enum ShutdownErrorKind { + #[snafu(display( + "system shutdown failed while stopping system components ({location}).{}", + NestedCause(source) + ))] + SystemComponents { + #[snafu(backtrace)] + source: SystemComponentsShutdownError, + #[snafu(implicit)] + location: snafu::Location, + }, + #[snafu(display( + "system shutdown failed while stopping timer ({location}).{}", + NestedCause(source) + ))] + Timer { + #[snafu(backtrace)] + source: TimerShutdownError, + #[snafu(implicit)] + location: snafu::Location, + }, + #[snafu(display( + "system shutdown failed while stopping scheduler ({location}).{}", + NestedCause(source) + ))] + Scheduler { + #[snafu(backtrace)] + source: SchedulerShutdownError, + #[snafu(implicit)] + location: snafu::Location, + }, +} + +impl From for ShutdownError { + #[track_caller] + fn from(source: SystemComponentsShutdownError) -> Self { + shutdown_error::SystemComponentsSnafu + .into_error(source) + .into() + } +} + +impl From for ShutdownError { + #[track_caller] + fn from(source: TimerShutdownError) -> Self { + shutdown_error::TimerSnafu.into_error(source).into() + } +} + +impl From for ShutdownError { + #[track_caller] + fn from(source: SchedulerShutdownError) -> Self { + shutdown_error::SchedulerSnafu.into_error(source).into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_support::assert_display_matches; + use std::error::Error; + + #[test] + fn opaque_shutdown_error_preserves_source_chain() { + let scheduler_error: SchedulerShutdownError = SchedulerShutdownSnafu + .into_error("pool did not stop".to_string()) + .into(); + let error: ShutdownError = shutdown_error::SchedulerSnafu + .into_error(scheduler_error) + .into(); + + assert_display_matches( + &error.to_string(), + "\ +system shutdown failed while stopping scheduler {LOC}. + Caused by: scheduler shutdown failed {LOC}. + Caused by: pool did not stop {LOC}.", + file!(), + ); + let source = error.source().expect("shutdown error should expose source"); + assert_display_matches( + &source.to_string(), + "\ +scheduler shutdown failed {LOC}. + Caused by: pool did not stop {LOC}.", + file!(), + ); + } + + #[test] + fn shutdown_display_includes_context_source_and_locations() { + let source = std::io::Error::other("promise channel disconnected"); + let components_error: SystemComponentsShutdownError = + system_components_shutdown_error::SourceSnafu { + message: "supervisor shutdown promise was dropped".to_string(), + } + .into_error(Box::new(source)) + .into(); + let error: ShutdownError = shutdown_error::SystemComponentsSnafu + .into_error(components_error) + .into(); + + assert_display_matches( + &error.to_string(), + "\ +system shutdown failed while stopping system components {LOC}. + Caused by: system components shutdown failed {LOC}. + Caused by: supervisor shutdown promise was dropped {LOC}. + Caused by: promise channel disconnected.", + file!(), + ); + } + + #[test] + fn shutdown_error_exposes_printable_source_chain() { + let source = std::io::Error::other("promise channel disconnected"); + let components_error: SystemComponentsShutdownError = + system_components_shutdown_error::SourceSnafu { + message: "supervisor shutdown promise was dropped".to_string(), + } + .into_error(Box::new(source)) + .into(); + let error: ShutdownError = shutdown_error::SystemComponentsSnafu + .into_error(components_error) + .into(); + + let mut messages = Vec::new(); + let mut current: Option<&(dyn Error + 'static)> = Some(&error); + while let Some(error) = current { + messages.push(error.to_string()); + current = error.source(); + } + + assert_eq!(messages.len(), 3); + assert_display_matches( + &messages[0], + "\ +system shutdown failed while stopping system components {LOC}. + Caused by: system components shutdown failed {LOC}. + Caused by: supervisor shutdown promise was dropped {LOC}. + Caused by: promise channel disconnected.", + file!(), + ); + assert_display_matches( + &messages[1], + "\ +system components shutdown failed {LOC}. + Caused by: supervisor shutdown promise was dropped {LOC}. + Caused by: promise channel disconnected.", + file!(), + ); + assert_eq!("promise channel disconnected", messages[2]); + } + + #[test] + fn backtrace_suffix_prints_backtrace_when_available() { + let backtrace = snafu::Backtrace::force_capture(); + let rendered = BacktraceSuffix(Some(&backtrace)).to_string(); + + assert!(rendered.contains("Backtrace:")); + } +} diff --git a/core/src/runtime/mod.rs b/core/src/runtime/mod.rs index abcecc0f..247c32f6 100644 --- a/core/src/runtime/mod.rs +++ b/core/src/runtime/mod.rs @@ -14,11 +14,14 @@ use std::{ use crate::config::{ConfigError, ConfigLoadingError}; mod config; +mod errors; mod lifecycle; mod scheduler; mod system; pub use config::*; +pub use errors::*; +pub(crate) use errors::{shutdown_error, system_components_shutdown_error}; pub use scheduler::*; pub use system::*; diff --git a/core/src/runtime/scheduler.rs b/core/src/runtime/scheduler.rs index 89e77245..ae946f43 100644 --- a/core/src/runtime/scheduler.rs +++ b/core/src/runtime/scheduler.rs @@ -27,14 +27,16 @@ pub trait Scheduler: Send + Sync { /// /// Implementations must only return when the pool /// has been shut down, or upon an error. - fn shutdown(&self) -> Result<(), String>; + fn shutdown(&self) -> Result<(), SchedulerShutdownError>; /// Shut this pool down and complete once the pool has stopped. /// /// The default implementation delegates the existing blocking shutdown to a /// blocking task. Scheduler implementations that can signal completion /// without blocking should override this method. - fn shutdown_notify(&self) -> futures::future::BoxFuture<'static, Result<(), String>> { + fn shutdown_notify( + &self, + ) -> futures::future::BoxFuture<'static, Result<(), SchedulerShutdownError>> { let scheduler = self.box_clone(); async move { async_std::task::spawn_blocking(move || scheduler.shutdown()).await }.boxed() } @@ -93,8 +95,10 @@ impl Scheduler for ExecutorScheduler { self.exec.shutdown_async() } - fn shutdown(&self) -> Result<(), String> { - self.exec.shutdown_borrowed() + fn shutdown(&self) -> Result<(), SchedulerShutdownError> { + self.exec + .shutdown_borrowed() + .map_err(SchedulerShutdownError::from) } fn box_clone(&self) -> Box { diff --git a/core/src/runtime/system.rs b/core/src/runtime/system.rs index a731c526..6d651666 100644 --- a/core/src/runtime/system.rs +++ b/core/src/runtime/system.rs @@ -30,6 +30,7 @@ use std::{ }; use futures::FutureExt; +use snafu::ResultExt; const STARTUP_POLL_INTERVAL: Duration = Duration::from_millis(50); @@ -968,11 +969,17 @@ impl KompactSystem { /// let system = KompactConfig::default().build().wait().expect("system"); /// system.shutdown().wait().expect("shutdown"); /// ``` - pub fn shutdown(self) -> impl Future> + Send + Unpin + 'static { + pub fn shutdown( + self, + ) -> impl Future> + Send + Unpin + 'static { async move { self.inner.assert_active(); self.inner.shutdown_notify(&self).await?; - self.scheduler.shutdown_notify().await + self.scheduler + .shutdown_notify() + .await + .context(shutdown_error::SchedulerSnafu)?; + Ok(()) } .boxed() } @@ -983,10 +990,12 @@ impl KompactSystem { /// shutdown more immediate and brutal. /// /// Remote systems will perceive this system as crashed. - pub fn kill_system(self) -> Result<(), String> { + pub fn kill_system(self) -> Result<(), ShutdownError> { self.inner.assert_active(); self.inner.kill(&self)?; - self.scheduler.shutdown()?; + self.scheduler + .shutdown() + .context(shutdown_error::SchedulerSnafu)?; Ok(()) } @@ -1029,7 +1038,10 @@ impl KompactSystem { pub fn shutdown_async(&self) -> () { let sys = self.clone(); async_std::task::spawn(async move { - sys.shutdown().await.expect("shutdown"); + let logger = sys.inner.logger().clone(); + if let Err(error) = sys.shutdown().await { + error!(logger, "async system shutdown failed"; "error" => format!("{}", error)); + } }); } @@ -1601,7 +1613,7 @@ pub trait SystemComponents: Send + Sync + 'static { fn stop_notify<'a>( &'a self, system: &'a KompactSystem, - ) -> futures::future::BoxFuture<'a, Result<(), String>>; + ) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>>; /// Stop all the system components as fast as possible, no graceful shutdown fn kill(&self, _system: &KompactSystem) -> (); /// Allow downcasting to concrete type @@ -1652,10 +1664,12 @@ impl dyn SystemComponents { /// Extra trait for timers to implement pub trait TimerComponent: TimerRefFactory + Send + Sync { /// Stop the underlying timer thread - fn shutdown(&self) -> Result<(), String>; + fn shutdown(&self) -> Result<(), TimerShutdownError>; /// Stop the underlying timer thread from an async shutdown path - fn shutdown_notify<'a>(&'a self) -> futures::future::BoxFuture<'a, Result<(), String>>; + fn shutdown_notify<'a>( + &'a self, + ) -> futures::future::BoxFuture<'a, Result<(), TimerShutdownError>>; } struct InternalComponents { @@ -1711,12 +1725,16 @@ impl InternalComponents { fn stop_notify<'a>( &'a self, system: &'a KompactSystem, - ) -> futures::future::BoxFuture<'a, Result<(), String>> { + ) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>> { async move { let (p, f) = utils::promise(); self.supervision_port .enqueue(SupervisorMsg::Shutdown(Arc::new(Mutex::new(p)))); - f.await.map_err(|e| e.to_string())?; + f.await + .boxed() + .with_context(|_| system_components_shutdown_error::SourceSnafu { + message: "supervisor shutdown promise was dropped".to_string(), + })?; self.system_components.stop_notify(system).await } .boxed() @@ -1907,31 +1925,40 @@ impl KompactRuntime { fn shutdown_notify<'a>( &'a self, system: &'a KompactSystem, - ) -> futures::future::BoxFuture<'a, Result<(), String>> { + ) -> futures::future::BoxFuture<'a, Result<(), ShutdownError>> { async move { - match *self.internal_components { - Some(ref ic) => { - ic.stop_notify(system).await?; - } - None => panic!("KompactRuntime was not initialised at shutdown!"), - } - let res = self.timer.shutdown_notify().await; + let ic = (*self.internal_components) + .as_ref() + .expect("KompactRuntime was not initialised at shutdown!"); + let components_result = ic + .stop_notify(system) + .await + .context(shutdown_error::SystemComponentsSnafu) + .map_err(ShutdownError::from); + let timer_result = self + .timer + .shutdown_notify() + .await + .context(shutdown_error::TimerSnafu) + .map_err(ShutdownError::from); lifecycle::set_destroyed(self.state()); - res + components_result.and(timer_result) } .boxed() } - fn kill(&self, system: &KompactSystem) -> Result<(), String> { - match *self.internal_components { - Some(ref ic) => { - ic.kill(system); - } - None => panic!("KompactRuntime was not initialised at shutdown!"), - } - let res = self.timer.shutdown(); + fn kill(&self, system: &KompactSystem) -> Result<(), ShutdownError> { + let ic = (*self.internal_components) + .as_ref() + .expect("KompactRuntime was not initialised at shutdown!"); + ic.kill(system); + let result = self + .timer + .shutdown() + .context(shutdown_error::TimerSnafu) + .map_err(ShutdownError::from); lifecycle::set_destroyed(self.state()); - res + result } pub(crate) fn poison(&self) { diff --git a/core/src/serialisation/core.rs b/core/src/serialisation/core.rs index 50e36d15..368ac1b5 100644 --- a/core/src/serialisation/core.rs +++ b/core/src/serialisation/core.rs @@ -1,81 +1,203 @@ use super::*; -use std::error::Error; - -/// Errors that can be thrown during serialisation or deserialisation -#[derive(Debug)] -pub enum SerError { - /// The data was invalid, corrupted, or otherwise not as expected - InvalidData(String), - /// The data represents the wrong type, or an unknown type - InvalidType(String), - /// The Buffer we're serializing into failed - BufferError(String), - /// No Buffer Available. +use std::{error::Error, fmt}; + +use crate::runtime::BacktraceSuffix; +use snafu::IntoError; + +/// Stable categories for serialisation and deserialisation failures. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SerErrorKind { + /// The data was invalid, corrupted, or otherwise not as expected. + InvalidData, + /// The data represents the wrong type, or an unknown type. + InvalidType, + /// The buffer used for serialisation failed. + Buffer, + /// No buffer was available. /// - /// Raised by actors attempting to send more messages than their Buffers allow - /// or what the Network is able to handle. - NoBuffersAvailable(String), - /// Type can not be cloned + /// Raised by actors attempting to send more messages than their buffers allow + /// or what the network is able to handle. + NoBuffersAvailable, + /// The type can not be cloned. NoClone, - /// Any other kind of error - Unknown(String), /// An error forwarded from a third-party crate. + ThirdParty, + /// Any other kind of serialisation error. + Unknown, +} + +/// Errors that can be thrown during serialisation or deserialisation. +#[derive(Debug, snafu::Snafu)] +pub struct SerError(SerErrorInner); + +#[derive(Debug, snafu::Snafu)] +#[snafu(module)] +enum SerErrorInner { + #[snafu(display( + "The provided data was not appropriate for (de-)serialisation: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + InvalidData { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "The provided type was not appropriate for (de-)serialisation: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + InvalidType { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "An issue occurred with the serialisation buffers: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + Buffer { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "Serialising into a BufferPool with no available buffers: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + NoBuffersAvailable { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "The provided type can not be cloned, but try_clone() was attempted ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + NoClone { + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "A serialisation error occurred in a third-party crate while {context}: {source} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] ThirdParty { - /// Which crate did the error originate from. context: String, - /// The actual underlying error. source: Box, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "A serialisation error occurred: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + Unknown { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, }, } impl SerError { + /// Return the stable category of this serialisation error. + pub fn kind(&self) -> SerErrorKind { + match &self.0 { + SerErrorInner::InvalidData { .. } => SerErrorKind::InvalidData, + SerErrorInner::InvalidType { .. } => SerErrorKind::InvalidType, + SerErrorInner::Buffer { .. } => SerErrorKind::Buffer, + SerErrorInner::NoBuffersAvailable { .. } => SerErrorKind::NoBuffersAvailable, + SerErrorInner::NoClone { .. } => SerErrorKind::NoClone, + SerErrorInner::ThirdParty { .. } => SerErrorKind::ThirdParty, + SerErrorInner::Unknown { .. } => SerErrorKind::Unknown, + } + } + + /// Construct an invalid-data serialisation error. + #[track_caller] + pub fn invalid_data(message: impl Into) -> Self { + ser_error_inner::InvalidDataSnafu { + message: message.into(), + } + .build() + .into() + } + + /// Construct an invalid-type serialisation error. + #[track_caller] + pub fn invalid_type(message: impl Into) -> Self { + ser_error_inner::InvalidTypeSnafu { + message: message.into(), + } + .build() + .into() + } + + /// Construct a serialisation buffer error. + #[track_caller] + pub fn buffer(message: impl Into) -> Self { + ser_error_inner::BufferSnafu { + message: message.into(), + } + .build() + .into() + } + + /// Construct a no-buffers-available serialisation error. + #[track_caller] + pub fn no_buffers_available(message: impl Into) -> Self { + ser_error_inner::NoBuffersAvailableSnafu { + message: message.into(), + } + .build() + .into() + } + + /// Construct a no-clone serialisation error. + #[track_caller] + pub fn no_clone() -> Self { + ser_error_inner::NoCloneSnafu.build().into() + } + + /// Construct a third-party serialisation error. + #[track_caller] + pub fn third_party(context: impl Into, source: impl Error + 'static) -> Self { + let source: Box = Box::new(source); + ser_error_inner::ThirdPartySnafu { + context: context.into(), + } + .into_error(source) + .into() + } + + /// Construct an unknown serialisation error. + #[track_caller] + pub fn unknown(message: impl Into) -> Self { + ser_error_inner::UnknownSnafu { + message: message.into(), + } + .build() + .into() + } + /// Create a serialisation error from any kind of error that /// implements the [Debug](std::fmt::Debug) trait /// - /// This always produces the [Unknown](SerError::Unknown) variant. + /// This always produces the [Unknown](SerErrorKind::Unknown) kind. + #[track_caller] pub fn from_debug(error: E) -> SerError { let msg = format!("Wrapped error: {:?}", error); - SerError::Unknown(msg) - } -} - -impl fmt::Display for SerError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SerError::BufferError(s) => { - write!(f, "An issue occurred with the serialisation buffers: {}", s) - } - SerError::InvalidData(s) => write!( - f, - "The provided data was not appropriate for (de-)serialisation: {}", - s - ), - SerError::InvalidType(s) => write!( - f, - "The provided type was not appropriate for (de-)serialisation: {}", - s - ), - SerError::NoClone => write!( - f, - "The provided type can not be cloned, but try_clone() was attempted" - ), - SerError::NoBuffersAvailable(s) => write!( - f, - "Serialising into a BufferPool with no available buffers: {}", - s - ), - SerError::Unknown(s) => write!(f, "A serialisation error occurred: {}", s), - SerError::ThirdParty { context, source } => write!( - f, - "A serialisation error occurred in a third party crate: {context}\n Caused by {source}" - ), - } + SerError::unknown(msg) } } -impl std::error::Error for SerError {} - /// A trait that acts like a stable `TypeId` for serialisation /// /// Requires ids to be assigned manually in some consistent fashion @@ -310,3 +432,30 @@ pub trait Deserialisable { /// Returns a [SerError](SerError) if unsuccessful. fn get_deserialised(self) -> Result; } + +#[cfg(test)] +mod tests { + use super::*; + use std::{error::Error, io}; + + #[test] + fn ser_error_reports_stable_kind() { + let error = SerError::invalid_data("bad payload"); + + assert_eq!(SerErrorKind::InvalidData, error.kind()); + assert!(error.to_string().contains("bad payload")); + assert!(error.to_string().contains("core/src/serialisation/core.rs")); + } + + #[test] + fn ser_error_preserves_third_party_source() { + let source = io::Error::new(io::ErrorKind::InvalidData, "external failure"); + let error = SerError::third_party("test", source); + + assert_eq!(SerErrorKind::ThirdParty, error.kind()); + assert_eq!( + "external failure", + error.source().expect("third-party source").to_string() + ); + } +} diff --git a/core/src/serialisation/mod.rs b/core/src/serialisation/mod.rs index 5bc601f5..6eba60eb 100644 --- a/core/src/serialisation/mod.rs +++ b/core/src/serialisation/mod.rs @@ -1,8 +1,5 @@ use bytes::{Buf, BufMut}; -use std::{ - any::Any, - fmt::{self, Debug}, -}; +use std::{any::Any, fmt::Debug}; use super::*; diff --git a/core/src/serialisation/protobuf_serialisers.rs b/core/src/serialisation/protobuf_serialisers.rs index 592f9416..dce076fc 100644 --- a/core/src/serialisation/protobuf_serialisers.rs +++ b/core/src/serialisation/protobuf_serialisers.rs @@ -17,10 +17,8 @@ impl Serialiser for ProtobufSer { fn serialise(&self, v: &M, buf: &mut dyn BufMut) -> Result<(), SerError> { let mut w = buf.writer(); - v.write_to_writer(&mut w).map_err(|e| SerError::ThirdParty { - context: "protobuf".to_string(), - source: Box::new(e), - }) + v.write_to_writer(&mut w) + .map_err(|e| SerError::third_party("protobuf", e)) } } @@ -50,10 +48,7 @@ impl Deserialisable for ProtobufDeser m.merge_from_bytes(b.chunk()) } }; - let r = pr.map_err(|e| SerError::ThirdParty { - context: "protobuf".to_string(), - source: Box::new(e), - }); + let r = pr.map_err(|e| SerError::third_party("protobuf", e)); r.map(|_| m) } } diff --git a/core/src/serialisation/ser_helpers.rs b/core/src/serialisation/ser_helpers.rs index 2a0f881a..7120507c 100644 --- a/core/src/serialisation/ser_helpers.rs +++ b/core/src/serialisation/ser_helpers.rs @@ -33,7 +33,7 @@ pub fn serialise_to_msg( Err(ser_err) => Err(ser_err), } } else { - Err(SerError::Unknown("Unknown serialisation size".into())) + Err(SerError::unknown("Unknown serialisation size")) } } @@ -53,7 +53,7 @@ where data: buf.freeze(), }) } else { - Err(SerError::Unknown("Unknown serialisation size".into())) + Err(SerError::unknown("Unknown serialisation size")) } } @@ -74,7 +74,7 @@ where data: buf.freeze(), }) } else { - Err(SerError::Unknown("Unknown serialisation size".into())) + Err(SerError::unknown("Unknown serialisation size")) } } @@ -235,7 +235,7 @@ pub fn embed_msg(msg: NetMessage, buf: &mut BufferEncoder) -> Result Result { // if buffer.remaining() < 1 { - // return Err(SerError::InvalidData("Not enough bytes available".into())); + // return Err(SerError::invalid_data("Not enough bytes available")); // } // gonna fail below anyway @@ -253,7 +253,7 @@ pub fn deserialise_chunk_lease(mut buffer: ChunkLease) -> Result Result { // if buffer.remaining() < 1 { - // return Err(SerError::InvalidData("Not enough bytes available".into())); + // return Err(SerError::invalid_data("Not enough bytes available")); // } // gonna fail below anyway diff --git a/core/src/serialisation/serde_serialisers.rs b/core/src/serialisation/serde_serialisers.rs index dddd120a..a8ae4f4a 100644 --- a/core/src/serialisation/serde_serialisers.rs +++ b/core/src/serialisation/serde_serialisers.rs @@ -13,7 +13,7 @@ use serde::{ }, *, }; -use std::convert::TryInto; +use std::{convert::TryInto, fmt}; /// Serialiser type for Serde enabled types /// @@ -226,7 +226,7 @@ impl<'a> Serializer for BufSerializer<'a> { // support sequences for which the length is known up front. fn serialize_seq(self, len: Option) -> Result { let len = len.ok_or_else(|| { - SerError::InvalidData("Sequence length must be known ahead of time!".into()) + SerError::invalid_data("Sequence length must be known ahead of time!") })?; self.buffer.put_u64(len as u64); Ok(self) @@ -262,9 +262,8 @@ impl<'a> Serializer for BufSerializer<'a> { // Maps are represented as sequences of key-value pairs fn serialize_map(self, len: Option) -> Result { - let len = len.ok_or_else(|| { - SerError::InvalidData("Map length must be known ahead of time!".into()) - })?; + let len = + len.ok_or_else(|| SerError::invalid_data("Map length must be known ahead of time!"))?; self.buffer.put_u64(len as u64); Ok(self) } @@ -450,13 +449,13 @@ impl<'a> ser::SerializeStructVariant for BufSerializer<'a> { impl ser::Error for SerError { fn custom(msg: T) -> Self { - SerError::Unknown(msg.to_string()) + SerError::unknown(msg.to_string()) } } impl de::Error for SerError { fn custom(msg: T) -> Self { - SerError::Unknown(msg.to_string()) + SerError::unknown(msg.to_string()) } } @@ -582,7 +581,7 @@ impl<'de, 'a> de::Deserializer<'de> for &'a mut BufDeserializer<'de> { { let num = self.buffer.get_u32(); let v = std::char::from_u32(num).ok_or_else(|| { - SerError::Unknown(format!("Number {} does not represent a valid char!", num)) + SerError::unknown(format!("Number {} does not represent a valid char!", num)) })?; visitor.visit_char(v) } diff --git a/core/src/test_support.rs b/core/src/test_support.rs index 56d3e89c..e7f63cd0 100644 --- a/core/src/test_support.rs +++ b/core/src/test_support.rs @@ -9,6 +9,7 @@ use crate::{ runtime::{self, KompactConfig, KompactSystem}, utils::BlockingFutureExt, }; +use regex::Regex; use slog::{Drain, Logger, PushFnValue, o}; use std::{ io::{self, Write}, @@ -76,6 +77,28 @@ pub fn build_test_kompact_system() -> KompactSystem { test_kompact_config().build().wait().expect("system") } +/// Asserts that an error display string matches `expected`. +/// +/// The expected string may include `{LOC}` placeholders. Each placeholder is +/// matched against `(::)`, with line and column +/// left flexible for stable location-aware error tests. +/// +/// # Panics +/// +/// Panics if the expected pattern does not compile or `actual` does not match. +pub fn assert_display_matches(actual: &str, expected: &str, location_file: &str) { + let location = format!(r"\({}:\d+:\d+\)", regex::escape(location_file)); + let pattern = format!( + "^{}$", + regex::escape(expected).replace(r"\{LOC\}", &location) + ); + let regex = Regex::new(&pattern).expect("expected display regex should compile"); + assert!( + regex.is_match(actual), + "expected display to match:\n{expected}\n\nactual:\n{actual}\n\nregex:\n{pattern}", + ); +} + static CAPTURED_LOGGER: CapturedLogLogger = CapturedLogLogger; struct CapturedLogLogger; diff --git a/docs/src/distributed/serialisation.md b/docs/src/distributed/serialisation.md index 7684c150..ab05d181 100644 --- a/docs/src/distributed/serialisation.md +++ b/docs/src/distributed/serialisation.md @@ -118,4 +118,4 @@ To show an easy usage for this approach, we use eager serialisation in the `Boot {{#rustdoc_include ../../examples/net/src/bin/serialisation.rs:tell_serialised}} ``` -As you can see above, another feature of eager serialisation is that you can (and must) deal with serialisaition errors, which you have no control over using lazy serialisation. In particular, your memory allocation may prevent your local buffer pool from allocating a buffer large enough to fit your data at the time of serialisation. In this case you will get a `SerError::BufferError` and must decide how to handle that. You could either retry at a later time, or switch to *lazy serialisation* and hope the network's buffers still have capacity (assuming they likely have priority over component local buffer pools). +As you can see above, another feature of eager serialisation is that you can (and must) deal with serialisation errors, which you have no control over using lazy serialisation. In particular, your memory allocation may prevent your local buffer pool from allocating a buffer large enough to fit your data at the time of serialisation. In this case `SerError::kind()` returns `SerErrorKind::NoBuffersAvailable`, and you must decide how to handle that. You could either retry at a later time, or switch to *lazy serialisation* and hope the network's buffers still have capacity (assuming they likely have priority over component local buffer pools). diff --git a/network/Cargo.toml b/network/Cargo.toml index d9f02059..aafbf194 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -31,4 +31,5 @@ iprange = "0.6" lru = { workspace = true } mio = { version = "1", features = ["net", "os-poll"] } rustc-hash = { workspace = true } +snafu = { workspace = true } uuid = { workspace = true } diff --git a/network/src/dispatcher.rs b/network/src/dispatcher.rs index 27840a9d..d3ab4186 100644 --- a/network/src/dispatcher.rs +++ b/network/src/dispatcher.rs @@ -27,7 +27,7 @@ use crate::{ RegistrationEvent, RegistrationPromise, }, - net::{ConnectionState, NetworkBridgeErr, Protocol, SessionId, SocketAddr, buffers::*}, + net::{ConnectionState, NetworkBridgeError, Protocol, SessionId, SocketAddr, buffers::*}, queue_manager::QueueManager, }; use arc_swap::ArcSwap; @@ -676,7 +676,7 @@ impl NetworkDispatcher { &mut self, addr: SocketAddr, data: DispatchData, - ) -> Result<(), NetworkBridgeErr> { + ) -> Result<(), NetworkBridgeError> { if let Some(bridge) = &self.net_bridge { bridge.route(addr, data, net::Protocol::Udp)?; } else { @@ -692,7 +692,7 @@ impl NetworkDispatcher { &mut self, addr: SocketAddr, data: DispatchData, - ) -> Result<(), NetworkBridgeErr> { + ) -> Result<(), NetworkBridgeError> { let state: &mut ConnectionState = self.connections.entry(addr).or_insert(ConnectionState::New); let next: Option = match *state { @@ -782,7 +782,7 @@ impl NetworkDispatcher { /// Forwards `msg` to destination described by `dst`, routing it across the network /// if needed. - fn route(&mut self, dst: ActorPath, msg: DispatchData) -> Result<(), NetworkBridgeErr> { + fn route(&mut self, dst: ActorPath, msg: DispatchData) -> Result<(), NetworkBridgeError> { if self.system_path_ref() == dst.system() { self.route_local(dst, msg); Ok(()) diff --git a/network/src/lib.rs b/network/src/lib.rs index 992f785b..461bf23f 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -34,7 +34,7 @@ pub mod net { pub use crate::transport::{ Bridge, ConnectionState, - NetworkBridgeErr, + NetworkBridgeError, Protocol, events, network_thread, diff --git a/network/src/transport/mod.rs b/network/src/transport/mod.rs index be9a21e7..e7f47a87 100644 --- a/network/src/transport/mod.rs +++ b/network/src/transport/mod.rs @@ -12,7 +12,9 @@ use crate::{ }; use crossbeam_channel::{RecvError, SendError, Sender, unbounded as channel}; use ipnet::IpNet; +use kompact::runtime::{BacktraceSuffix, SourceCause}; use mio::{Interest, Waker}; +use snafu::{IntoError, ResultExt}; pub use std::net::SocketAddr; use std::{io, net::IpAddr, panic, sync::Arc, thread, time::Duration}; @@ -90,23 +92,6 @@ pub mod events { /// Tells the `NetworkThread` to allow the `IpNet` AllowIpNet(IpNet), } - - /// Errors emitted by the network `Bridge` - #[derive(Debug)] - pub enum NetworkError { - /// The protocol is not supported in this implementation - UnsupportedProtocol, - /// There is no executor to run the bridge on - MissingExecutor, - /// Some other IO error - Io(std::io::Error), - } - - impl From for NetworkError { - fn from(other: std::io::Error) -> Self { - NetworkError::Io(other) - } - } } /// The configuration for the network `Bridge` @@ -225,26 +210,32 @@ impl Bridge { } } + fn send_and_wake(&self, event: DispatchEvent) -> Result<(), NetworkBridgeError> { + self.network_input_queue + .send(event) + .context(network_bridge_error::SendSnafu)?; + self.waker.wake().context(network_bridge_error::IoSnafu)?; + Ok(()) + } + /// Sets the dispatcher reference, returning the previously stored one pub fn set_dispatcher(&mut self, dispatcher: DispatcherRef) -> Option { self.dispatcher.replace(dispatcher) } /// Stops the bridge gracefully - pub fn stop(self) -> Result<(), NetworkBridgeErr> { + pub fn stop(self) -> Result<(), NetworkBridgeError> { debug!(self.log, "Stopping NetworkBridge..."); - self.network_input_queue.send(DispatchEvent::Stop)?; - self.waker.wake()?; + self.send_and_wake(DispatchEvent::Stop)?; self.shutdown_future.wait(); // should block until something is sent debug!(self.log, "Stopped NetworkBridge."); Ok(()) } /// Kills the Network - pub fn kill(self) -> Result<(), NetworkBridgeErr> { + pub fn kill(self) -> Result<(), NetworkBridgeError> { debug!(self.log, "Killing NetworkBridge..."); - self.network_input_queue.send(DispatchEvent::Kill)?; - self.waker.wake()?; + self.send_and_wake(DispatchEvent::Kill)?; self.shutdown_future.wait(); // should block until something is sent debug!(self.log, "Stopped NetworkBridge."); Ok(()) @@ -261,19 +252,12 @@ impl Bridge { addr: SocketAddr, data: DispatchData, protocol: Protocol, - ) -> Result<(), NetworkBridgeErr> { - match protocol { - Protocol::Tcp => { - self.network_input_queue - .send(DispatchEvent::SendTcp(addr, data))?; - } - Protocol::Udp => { - self.network_input_queue - .send(DispatchEvent::SendUdp(addr, data))?; - } - } - self.waker.wake()?; - Ok(()) + ) -> Result<(), NetworkBridgeError> { + let event = match protocol { + Protocol::Tcp => DispatchEvent::SendTcp(addr, data), + Protocol::Udp => DispatchEvent::SendUdp(addr, data), + }; + self.send_and_wake(event) } /// Attempts to establish a TCP connection to the provided `addr`. @@ -285,80 +269,51 @@ impl Bridge { /// /// # Errors /// If the provided protocol is not supported - pub fn connect(&self, proto: Transport, addr: SocketAddr) -> Result<(), NetworkBridgeErr> { + pub fn connect(&self, proto: Transport, addr: SocketAddr) -> Result<(), NetworkBridgeError> { match proto { - Transport::Tcp => { - self.network_input_queue - .send(events::DispatchEvent::Connect(addr))?; - self.waker.wake()?; - Ok(()) - } - _other => Err(NetworkBridgeErr::Other("Bad Protocol".to_string())), + Transport::Tcp => self.send_and_wake(DispatchEvent::Connect(addr)), + other => Err(NetworkBridgeError::unsupported_protocol(other)), } } /// Acknowledges a closed channel, required to ensure FIFO ordering under connection loss - pub fn ack_closed(&self, addr: SocketAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::ClosedAck(addr))?; - self.waker.wake()?; - Ok(()) + pub fn ack_closed(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::ClosedAck(addr)) } /// Requests that the NetworkThread should be closed - pub fn close_channel(&self, addr: SocketAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::Close(addr))?; - self.waker.wake()?; - Ok(()) + pub fn close_channel(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::Close(addr)) } /// Requests the NetworkThread to block the socket addr - pub fn block_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::BlockSocket(addr))?; - self.waker.wake()?; - Ok(()) + pub fn block_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::BlockSocket(addr)) } /// Requests the NetworkThread to block the ip address ip_addr - pub fn block_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::BlockIpAddr(ip_addr))?; - self.waker.wake()?; - Ok(()) + pub fn block_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::BlockIpAddr(ip_addr)) } /// Requests the NetworkThread to block the ip subnet ip_net - pub fn block_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::BlockIpNet(ip_net))?; - self.waker.wake()?; - Ok(()) + pub fn block_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::BlockIpNet(ip_net)) } /// Requests the NetworkThread to unblock the socket addr - pub fn allow_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::AllowSocket(addr))?; - self.waker.wake()?; - Ok(()) + pub fn allow_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::AllowSocket(addr)) } /// Requests the NetworkThread to unblock the ip address ip_addr - pub fn allow_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::AllowIpAddr(ip_addr))?; - self.waker.wake()?; - Ok(()) + pub fn allow_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::AllowIpAddr(ip_addr)) } /// Requests the NetworkThread to unblock the ip subnet ip_net - pub fn allow_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeErr> { - self.network_input_queue - .send(events::DispatchEvent::AllowIpNet(ip_net))?; - self.waker.wake()?; - Ok(()) + pub fn allow_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> { + self.send_and_wake(DispatchEvent::AllowIpNet(ip_net)) } } @@ -397,38 +352,169 @@ fn run_network_thread( .map(|_| ()) } -/// Errors which the NetworkBridge might return, not used for now. -#[derive(Debug)] -pub enum NetworkBridgeErr { - /// Something went wrong while binding - Binding(String), - /// Something went wrong with the thread - Thread(String), - /// Something else went wrong - Other(String), +/// Error returned when the network bridge fails. +#[derive(Debug, snafu::Snafu)] +pub struct NetworkBridgeError(Box); + +#[derive(Debug, snafu::Snafu)] +#[snafu(module(network_bridge_error), visibility(pub(crate)))] +enum NetworkBridgeErrorKind { + #[snafu(display( + "network bridge does not support protocol {protocol:?} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + UnsupportedProtocol { + protocol: Transport, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "failed to send network bridge event ({location}).{}", + SourceCause(source, None, backtrace.as_ref()) + ))] + Send { + source: SendError, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "network bridge IO failed ({location}).{}", + SourceCause(source, None, backtrace.as_ref()) + ))] + Io { + source: io::Error, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "network bridge receive failed ({location}).{}", + SourceCause(source, None, backtrace.as_ref()) + ))] + Receive { + source: RecvError, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "network bridge serialisation failed ({location}).{}", + SourceCause(source, None, backtrace.as_ref()) + ))] + Serialisation { + source: SerError, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, + #[snafu(display( + "network bridge failed: {message} ({location}){}", + BacktraceSuffix(backtrace.as_ref()) + ))] + Message { + message: String, + #[snafu(implicit)] + location: snafu::Location, + backtrace: Option, + }, +} + +impl NetworkBridgeError { + /// Construct an unsupported-protocol bridge error. + #[track_caller] + pub fn unsupported_protocol(protocol: Transport) -> Self { + network_bridge_error::UnsupportedProtocolSnafu { protocol } + .build() + .into() + } + + /// Construct a message-only bridge error. + #[track_caller] + pub fn message(message: impl Into) -> Self { + network_bridge_error::MessageSnafu { + message: message.into(), + } + .build() + .into() + } +} + +impl From> for NetworkBridgeError { + #[track_caller] + fn from(source: SendError) -> Self { + network_bridge_error::SendSnafu.into_error(source).into() + } } -impl From> for NetworkBridgeErr { - fn from(error: SendError) -> Self { - NetworkBridgeErr::Other(format!("SendError: {:?}", error)) +impl From for NetworkBridgeError { + #[track_caller] + fn from(source: io::Error) -> Self { + network_bridge_error::IoSnafu.into_error(source).into() } } -impl From for NetworkBridgeErr { - fn from(error: io::Error) -> Self { - NetworkBridgeErr::Other(format!("io::Error: {:?}", error)) +impl From for NetworkBridgeError { + #[track_caller] + fn from(source: RecvError) -> Self { + network_bridge_error::ReceiveSnafu.into_error(source).into() } } -impl From for NetworkBridgeErr { - fn from(error: RecvError) -> Self { - NetworkBridgeErr::Other(format!("RecvError: {:?}", error)) +impl From for NetworkBridgeError { + #[track_caller] + fn from(source: SerError) -> Self { + network_bridge_error::SerialisationSnafu + .into_error(source) + .into() } } -impl From for NetworkBridgeErr { - fn from(error: SerError) -> Self { - NetworkBridgeErr::Other(format!("SerError: {:?}", error)) +#[cfg(test)] +mod bridge_error_tests { + use super::*; + use kompact::test_support::assert_display_matches; + use std::error::Error; + + #[test] + fn network_bridge_error_display_includes_source_and_location() { + let source = io::Error::other("waker failed"); + let error = NetworkBridgeError::from(source); + + assert_display_matches( + &error.to_string(), + "\ +network bridge IO failed {LOC}. + Caused by: waker failed.", + file!(), + ); + assert_eq!( + "waker failed", + error.source().expect("io source").to_string() + ); + } + + #[test] + fn network_bridge_send_error_preserves_send_error_source() { + let (sender, receiver) = channel::(); + drop(receiver); + let source = sender + .send(DispatchEvent::Stop) + .expect_err("send should fail without receiver"); + let error: NetworkBridgeError = network_bridge_error::SendSnafu.into_error(source).into(); + + assert_display_matches( + &error.to_string(), + "\ +failed to send network bridge event {LOC}. + Caused by: sending on a disconnected channel.", + file!(), + ); + assert_eq!( + "sending on a disconnected channel", + error.source().expect("send source").to_string() + ); } } @@ -456,7 +542,7 @@ pub(crate) fn broken_pipe(err: &io::Error) -> bool { } pub(crate) fn out_of_buffers(err: &SerError) -> bool { - matches!(err, SerError::NoBuffersAvailable(_)) + err.kind() == SerErrorKind::NoBuffersAvailable } /// A module with helper functions for testing network configurations/implementations @@ -577,7 +663,7 @@ pub mod net_test_helpers { fn deserialise(buf: &mut dyn Buf) -> Result { if buf.remaining() < 9 { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Serialised typed has 9bytes but only {}bytes remain in buffer.", buf.remaining() ))); @@ -587,10 +673,10 @@ pub mod net_test_helpers { let i = buf.get_u64(); Ok(PingMsg { i }) } - PONG_ID => Err(SerError::InvalidType( - "Found PongMsg, but expected PingMsg.".into(), + PONG_ID => Err(SerError::invalid_type( + "Found PongMsg, but expected PingMsg.", )), - id => Err(SerError::InvalidType(format!( + id => Err(SerError::invalid_type(format!( "Found unknown id {}, but expected PingMsg.", id ))), @@ -603,7 +689,7 @@ pub mod net_test_helpers { fn deserialise(buf: &mut dyn Buf) -> Result { if buf.remaining() < 9 { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Serialised typed has 9bytes but only {}bytes remain in buffer.", buf.remaining() ))); @@ -613,10 +699,10 @@ pub mod net_test_helpers { let i = buf.get_u64(); Ok(PongMsg { i }) } - PING_ID => Err(SerError::InvalidType( - "Found PingMsg, but expected PongMsg.".into(), + PING_ID => Err(SerError::invalid_type( + "Found PingMsg, but expected PongMsg.", )), - id => Err(SerError::InvalidType(format!( + id => Err(SerError::invalid_type(format!( "Found unknown id {}, but expected PingMsg.", id ))), @@ -975,7 +1061,7 @@ pub mod net_test_helpers { fn deserialise(buf: &mut dyn Buf) -> Result { if buf.remaining() < 18 { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Serialised typed has 18 bytes but only {} bytes remain in buffer.", buf.remaining() ))); @@ -992,10 +1078,10 @@ pub mod net_test_helpers { assert_eq!(buf.remaining(), 0, "Buffer too big! {}", buf.remaining()); Ok(BigPingMsg { i, data, sum }) } - PONG_ID => Err(SerError::InvalidType( - "Found BigPongMsg, but expected BigPingMsg.".into(), + PONG_ID => Err(SerError::invalid_type( + "Found BigPongMsg, but expected BigPingMsg.", )), - id => Err(SerError::InvalidType(format!( + id => Err(SerError::invalid_type(format!( "Found unknown id {}, but expected BigPingMsg.", id ))), @@ -1008,7 +1094,7 @@ pub mod net_test_helpers { fn deserialise(buf: &mut dyn Buf) -> Result { if buf.remaining() < 18 { - return Err(SerError::InvalidData(format!( + return Err(SerError::invalid_data(format!( "Serialised typed has 18 bytes but only {} bytes remain in buffer.", buf.remaining() ))); @@ -1025,10 +1111,10 @@ pub mod net_test_helpers { assert_eq!(buf.remaining(), 0, "Buffer too big!"); Ok(BigPongMsg { i, data, sum }) } - PING_ID => Err(SerError::InvalidType( - "Found BigPingMsg, but expected BigPongMsg.".into(), + PING_ID => Err(SerError::invalid_type( + "Found BigPingMsg, but expected BigPongMsg.", )), - id => Err(SerError::InvalidType(format!( + id => Err(SerError::invalid_type(format!( "Found unknown id {}, but expected BigPingMsg.", id ))), @@ -1370,7 +1456,7 @@ pub mod net_test_helpers { Ok(()) } else { - Err(SerError::Unknown("No Valid Receivers".to_string())) + Err(SerError::unknown("No Valid Receivers")) } } } diff --git a/network/src/transport/network_thread.rs b/network/src/transport/network_thread.rs index 0de3a388..b743632d 100644 --- a/network/src/transport/network_thread.rs +++ b/network/src/transport/network_thread.rs @@ -29,6 +29,7 @@ use mio::{ net::{TcpListener, TcpStream, UdpSocket}, }; use rustc_hash::{FxHashMap, FxHashSet}; +use snafu::ResultExt; use std::{ cell::{RefCell, RefMut}, collections::VecDeque, @@ -77,12 +78,15 @@ impl NetworkThreadBuilder { shutdown_promise: KPromise<()>, dispatcher_ref: DispatcherRef, network_config: NetworkConfig, - ) -> Result { + ) -> Result { let poll = Poll::new().expect("failed to create Poll instance in NetworkThread"); let waker = Waker::new(poll.registry(), DISPATCHER).expect("failed to create Waker for DISPATCHER"); - let tcp_listener = bind_with_retries(&address, MAX_BIND_RETRIES, &log)?; - let actual_address = tcp_listener.local_addr()?; + let tcp_listener = bind_with_retries(&address, MAX_BIND_RETRIES, &log) + .context(network_bridge_error::IoSnafu)?; + let actual_address = tcp_listener + .local_addr() + .context(network_bridge_error::IoSnafu)?; Ok(NetworkThreadBuilder { poll, tcp_listener, diff --git a/network/tests/dispatch_integration_tests.rs b/network/tests/dispatch_integration_tests.rs index 3ff17758..6cbed117 100644 --- a/network/tests/dispatch_integration_tests.rs +++ b/network/tests/dispatch_integration_tests.rs @@ -186,12 +186,12 @@ struct TestSystem { } impl TestSystem { - fn shutdown(self) -> Result<(), String> { + fn shutdown(self) -> Result<(), ShutdownError> { let TestSystem { inner, _slot } = self; inner.shutdown().wait() } - fn kill_system(self) -> Result<(), String> { + fn kill_system(self) -> Result<(), ShutdownError> { let TestSystem { inner, _slot } = self; inner.kill_system() }