Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Agent Instructions

- Run repository-wide checks with `./run_checks.sh`.
- Run repository-wide tests with `./run_tests.sh` outside the sandbox, after confirming single tests of affected changes inside the sandbox.

## Rust Rules

- Do not run multiple `cargo` instances in parallel! They anyway lock.
- Inside the sandbox, `cargo test` may run either one explicit test or a broader test selection only when passed `-- --test-threads=1`. Any grouped or repeated multi-threaded `cargo test` run must be executed outside the sandbox.
- Format Rust code according to `rustfmt.toml`.
- Keep Rust changes clippy-clean.
- Avoid immediately executed anonymous functions such as `(|| { ... })()`. Prefer ordinary `Result` or `Option` handling when the body is short; extract a named helper for larger bodies.
- Prefer readable control flow over chained iterator side effects.
- Use Snafu-derived error types (`#[derive(Snafu)]`) for Rust error enums.
- Prefer `context(...)` / `with_context(...)` over manual `map_err(...)` when the target error still wraps the original source. Use `with_context(...)` when building the context captures clones, allocations, or other non-trivial work.
- If the only reason to introduce a new error variant is to differentiate the use-site of an existing variant, prefer adding `location: Location` to the existing variant instead.
- Use `#[snafu(module(...))]` plus module-qualified selector names when otherwise identical selector names would collide. Do not introduce custom selector aliases like `FooBarBazSnafu` just to disambiguate use sites.
- Keep Snafu variant names generic inside one error enum. Do not bake call-site names like `PublishStoreAccess` into the variant when the enum type or selector module already provides that context.
- Reserve manual `map_err(...)` for real error translation cases that `context(...)` cannot express cleanly.
- Do not manually construct Snafu boxed-source variants with `Box::new(source)`; use `result.boxed().context(SelectorSnafu)` or `context(...)`/`with_context(...)` instead.
- When splitting a single-file Rust module into a folder module, move the original module contents to `mod.rs` in the new folder.
- Avoid nesting `?` into expressions. It's easier to read if they only occur at the end of a line. Refactor the expression into a field where needed.
- Document non-public Rust helpers, fields, variants, and local types whenever their role, invariants, lifecycle, or preconditions are non-trivial or non-obvious. Prefer documenting what the item is supposed to do before adding code that explains how it does it.
- Add loop labels when control flow spans non-trivial nested loops or retries.
- Prefer the following top-level grouping within Rust files unless there is a strong local reason not to:
1. public items (`pub`)
2. restricted-visibility items (`pub(<qualifier>)`)
3. macros
4. private items
5. exposed test helpers
6. tests
- Within each group, use this order:
1. constants
2. traits
3. functions
4. structs/enums, each followed immediately by all associated `impl` blocks
- Imports should remain at the very top of the file/module/function.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ crossbeam-channel = "0.5"
futures = "0.3"
ipnet = "2.12"
lru = "0.16"
regex = "1"
rustc-hash = "2"
snafu = "0.9"
uuid = { version = "1.23", features = ["v4"] }
7 changes: 4 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ slog = "2"
slog-async = "2"
slog-term = "2"
rustc-hash = { workspace = true }
snafu = { workspace = true }
toml = "1"
humantime = "2"
byte-unit = "5"
Expand All @@ -75,15 +76,15 @@ futures = { workspace = true }
async-std = { workspace = true }
executors = "0.10"
lru = { workspace = true }
bytes = { workspace = true }
ipnet = { workspace = true }
regex = { workspace = true }

# Optional
protobuf = { version = "3", optional = true, features = ["with-bytes"] }
serde = { version = "1.0", optional = true }
core_affinity = { version = "0.8", optional = true }

bytes = { workspace = true }
ipnet = { workspace = true }

[dev-dependencies]
tempfile = "3"
serde = { version = "1.0", features = ["derive"] }
Expand Down
12 changes: 2 additions & 10 deletions core/src/actors/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ impl fmt::Display for TransportParseError {
}
}

impl Error for TransportParseError {
fn description(&self) -> &str {
"Transport must be one of [local,tcp,udp]"
}
}
impl Error for TransportParseError {}

/// Error type for parsing [paths](ActorPath) from a string
#[derive(Debug, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -105,11 +101,7 @@ impl fmt::Display for PathParseError {
}

impl Error for PathParseError {
fn description(&self) -> &str {
"Path could not be parsed"
}

fn cause(&self) -> Option<&dyn Error> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
PathParseError::Form(_) => None,
PathParseError::Transport(e) => Some(e),
Expand Down
8 changes: 5 additions & 3 deletions core/src/dedicated_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
component::{Component, ComponentDefinition, CoreContainer, SchedulingDecision},
runtime::Scheduler,
runtime::{Scheduler, SchedulerShutdownError},
utils,
};
use std::{
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Scheduler for DedicatedThreadScheduler {
self.stop.store(true, Ordering::Relaxed);
}

fn shutdown(&self) -> Result<(), String> {
fn shutdown(&self) -> Result<(), SchedulerShutdownError> {
self.stop.store(true, Ordering::Relaxed);
loop {
if self.stopped.load(Ordering::Relaxed) {
Expand All @@ -150,7 +150,9 @@ impl Scheduler for DedicatedThreadScheduler {
}
}

fn shutdown_notify(&self) -> futures::future::BoxFuture<'static, Result<(), String>> {
fn shutdown_notify(
&self,
) -> futures::future::BoxFuture<'static, Result<(), SchedulerShutdownError>> {
let stop = self.stop.clone();
let stopped = self.stopped.clone();
async move {
Expand Down
20 changes: 12 additions & 8 deletions core/src/default_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl SystemComponents for DefaultComponents {
fn stop_notify<'a>(
&'a self,
system: &'a KompactSystem,
) -> futures::future::BoxFuture<'a, Result<(), String>> {
) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>> {
async move {
system.kill(self.dispatcher.clone());
system.kill(self.deadletter_box.clone());
Expand Down Expand Up @@ -126,17 +126,19 @@ impl TimerRefFactory for DefaultTimer {
}

impl TimerComponent for DefaultTimer {
fn shutdown(&self) -> Result<(), String> {
fn shutdown(&self) -> Result<(), TimerShutdownError> {
self.inner
.shutdown_async()
.map_err(|e| format!("Error during timer shutdown: {:?}", e))
.map_err(TimerShutdownError::from_debug)
}

fn shutdown_notify<'a>(&'a self) -> futures::future::BoxFuture<'a, Result<(), String>> {
fn shutdown_notify<'a>(
&'a self,
) -> futures::future::BoxFuture<'a, Result<(), TimerShutdownError>> {
future::ready(
self.inner
.shutdown_async()
.map_err(|e| format!("Error during timer shutdown: {:?}", e)),
.map_err(TimerShutdownError::from_debug),
)
.boxed()
}
Expand All @@ -159,12 +161,14 @@ impl TimerRefFactory for ManualTimerComponent {
}

impl TimerComponent for ManualTimerComponent {
fn shutdown(&self) -> Result<(), String> {
fn shutdown(&self) -> Result<(), TimerShutdownError> {
self.inner.stop();
Ok(())
}

fn shutdown_notify<'a>(&'a self) -> futures::future::BoxFuture<'a, Result<(), String>> {
fn shutdown_notify<'a>(
&'a self,
) -> futures::future::BoxFuture<'a, Result<(), TimerShutdownError>> {
// This actually locks a Mutex, so there's a small deadlock risk here.
self.inner.stop();
future::ready(Ok(())).boxed()
Expand Down Expand Up @@ -232,7 +236,7 @@ where
fn stop_notify<'a>(
&'a self,
system: &'a KompactSystem,
) -> futures::future::BoxFuture<'a, Result<(), String>> {
) -> futures::future::BoxFuture<'a, Result<(), SystemComponentsShutdownError>> {
async move {
// Gracefully stop the dispatcher/Network layer.
system.stop(&self.dispatcher.clone());
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub mod prelude {
TryLockError,
TwoWayChannel,
},
runtime::{KompactConfig, KompactSystem, SystemHandle},
runtime::{KompactConfig, KompactSystem, ShutdownError, SystemHandle},
supervision::{Fault, FaultContext, RecoveryHandler},
};

Expand Down
6 changes: 3 additions & 3 deletions core/src/messaging/deser_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ mod deser_macro_tests {

fn deserialise(buf: &mut dyn Buf) -> Result<MsgA, SerError> {
if buf.remaining() < 8 {
return Err(SerError::InvalidData(
"Less than 8bytes remaining in buffer!".to_string(),
return Err(SerError::invalid_data(
"Less than 8bytes remaining in buffer!",
));
}
let index = buf.get_u64();
Expand Down Expand Up @@ -799,7 +799,7 @@ mod deser_macro_tests {
fn deserialise(buf: &mut dyn Buf) -> Result<Self, SerError> {
let magic_byte = buf.get_u8();
if magic_byte != Self::MAGIC_BYTE {
return Err(SerError::InvalidData(format!(
return Err(SerError::invalid_data(format!(
"Expected MAGIC_BYTE={}, but got {} instead",
Self::MAGIC_BYTE,
magic_byte
Expand Down
42 changes: 19 additions & 23 deletions core/src/messaging/framing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl TryFrom<u8> for AddressType {
match x {
x if x == AddressType::IPv4 as u8 => Ok(AddressType::IPv4),
x if x == AddressType::IPv6 as u8 => Ok(AddressType::IPv6),
_ => Err(SerError::InvalidType("Unsupported AddressType".into())),
_ => Err(SerError::invalid_type("Unsupported AddressType")),
}
}
}
Expand All @@ -89,7 +89,7 @@ impl TryFrom<u8> for PathType {
match x {
x if x == PathType::Unique as u8 => Ok(PathType::Unique),
x if x == PathType::Named as u8 => Ok(PathType::Named),
_ => Err(SerError::InvalidType("Unsupported PathType".into())),
_ => Err(SerError::invalid_type("Unsupported PathType")),
}
}
}
Expand All @@ -102,9 +102,7 @@ impl TryFrom<u8> for Transport {
x if x == Transport::Local as u8 => Ok(Transport::Local),
x if x == Transport::Udp as u8 => Ok(Transport::Udp),
x if x == Transport::Tcp as u8 => Ok(Transport::Tcp),
_ => Err(SerError::InvalidType(
"Unsupported transport protocol".into(),
)),
_ => Err(SerError::invalid_type("Unsupported transport protocol")),
}
}
}
Expand Down Expand Up @@ -192,13 +190,13 @@ impl TryFrom<u8> for SystemPathHeader {
let storage = [value];
let path_type = storage
.get_as::<PathType>()
.map_err(|_| SerError::InvalidData("System Path could not be read.".to_owned()))?;
let protocol = storage.get_as::<Transport>().map_err(|_| {
SerError::InvalidData("System Path Transport could not be read.".to_owned())
})?;
let address_type = storage.get_as::<AddressType>().map_err(|_| {
SerError::InvalidData("System Path AddressType could not be read.".to_owned())
})?;
.map_err(|_| SerError::invalid_data("System Path could not be read."))?;
let protocol = storage
.get_as::<Transport>()
.map_err(|_| SerError::invalid_data("System Path Transport could not be read."))?;
let address_type = storage
.get_as::<AddressType>()
.map_err(|_| SerError::invalid_data("System Path AddressType could not be read."))?;

let header = SystemPathHeader {
storage,
Expand Down Expand Up @@ -286,8 +284,8 @@ fn system_path_from_buf(buf: &mut dyn Buf) -> Result<(SystemPathHeader, SystemPa
let address: IpAddr = match header.address_type {
AddressType::IPv4 => {
if buf.remaining() < 4 {
return Err(SerError::InvalidData(
"Could not parse 4 bytes for IPv4 address".into(),
return Err(SerError::invalid_data(
"Could not parse 4 bytes for IPv4 address",
));
} else {
let mut ip_bytes = [0u8; 4];
Expand All @@ -297,8 +295,8 @@ fn system_path_from_buf(buf: &mut dyn Buf) -> Result<(SystemPathHeader, SystemPa
}
AddressType::IPv6 => {
if buf.remaining() < 16 {
return Err(SerError::InvalidData(
"Could not parse 16 bytes for IPv6 address".into(),
return Err(SerError::invalid_data(
"Could not parse 16 bytes for IPv6 address",
));
} else {
let mut ip_bytes = [0u8; 16];
Expand Down Expand Up @@ -370,7 +368,7 @@ impl Serialisable for ActorPath {
let path = np.path_ref().join("/");
let data = path.as_bytes();
let name_len: u16 = u16::try_from(data.len()).map_err(|_| {
SerError::InvalidData("Named path overflows designated 2 bytes length.".into())
SerError::invalid_data("Named path overflows designated 2 bytes length.")
})?;
buf.put_u16(name_len);
buf.put_slice(data);
Expand All @@ -392,9 +390,7 @@ impl Deserialiser<ActorPath> for ActorPath {
let path = match header.path_type {
PathType::Unique => {
if buf.remaining() < 16 {
return Err(SerError::InvalidData(
"Could not get 16 bytes for UUID".into(),
));
return Err(SerError::invalid_data("Could not get 16 bytes for UUID"));
} else {
let mut uuid_bytes = [0u8; 16];
buf.copy_to_slice(&mut uuid_bytes);
Expand All @@ -405,7 +401,7 @@ impl Deserialiser<ActorPath> for ActorPath {
PathType::Named => {
let name_len = buf.get_u16() as usize;
if buf.remaining() < name_len {
return Err(SerError::InvalidData(format!(
return Err(SerError::invalid_data(format!(
"Could not get {} bytes for path name",
name_len
)));
Expand All @@ -420,8 +416,8 @@ impl Deserialiser<ActorPath> for ActorPath {
};
let parts: Vec<&str> = name.split('/').collect();
if parts.is_empty() {
return Err(SerError::InvalidData(
"Could not determine name for Named path type".into(),
return Err(SerError::invalid_data(
"Could not determine name for Named path type",
));
} else {
let path = parts.into_iter().map(|s| s.to_string()).collect();
Expand Down
2 changes: 1 addition & 1 deletion core/src/messaging/net_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl NetData {
match data {
HeapOrSer::Boxed(boxed_ser) => {
let b = boxed_ser.local().map_err(|_| {
UnpackError::DeserError(SerError::Unknown(format!(
UnpackError::DeserError(SerError::unknown(format!(
"Serialisable with id={} can't be converted to local!",
ser_id
)))
Expand Down
Loading
Loading