diff --git a/Cargo.lock b/Cargo.lock index f9b6dc90c..9ec799500 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -472,7 +472,7 @@ dependencies = [ [[package]] name = "cachet" -version = "0.5.0" +version = "0.5.1" dependencies = [ "alloc_tracker", "anyspawn", @@ -505,7 +505,7 @@ dependencies = [ [[package]] name = "cachet_memory" -version = "0.2.0" +version = "0.2.1" dependencies = [ "cachet_tier", "criterion", diff --git a/Cargo.toml b/Cargo.toml index 97b33c7ea..86504f015 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,8 @@ homepage = "https://github.com/microsoft/oxidizer" anyspawn = { path = "crates/anyspawn", default-features = false, version = "0.5.0" } bytesbuf = { path = "crates/bytesbuf", default-features = false, version = "0.5.0" } bytesbuf_io = { path = "crates/bytesbuf_io", default-features = false, version = "0.5.0" } -cachet = { path = "crates/cachet", default-features = false, version = "0.5.0" } -cachet_memory = { path = "crates/cachet_memory", default-features = false, version = "0.2.0" } +cachet = { path = "crates/cachet", default-features = false, version = "0.5.1" } +cachet_memory = { path = "crates/cachet_memory", default-features = false, version = "0.2.1" } cachet_service = { path = "crates/cachet_service", default-features = false, version = "0.1.0" } cachet_tier = { path = "crates/cachet_tier", default-features = false, version = "0.1.0" } data_privacy = { path = "crates/data_privacy", default-features = false, version = "0.11.0" } diff --git a/crates/cachet/CHANGELOG.md b/crates/cachet/CHANGELOG.md index 2ee1bb021..ce0d644d1 100644 --- a/crates/cachet/CHANGELOG.md +++ b/crates/cachet/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.5.1] - 2026-05-21 + +- ✨ Features + + - Add `get_or_insert_with` and `try_get_or_insert_with` methods that accept closures returning `CacheEntry`, enabling per-entry TTL control on cache-miss computations. 
+ - Add opt-in eviction telemetry: `cache.eviction` for capacity evictions and `cache.expired` for background TTL/TTI expiry. Enable it with `InMemoryCacheBuilder::with_eviction_telemetry` inside the new `CacheBuilder::memory_with` helper. + ## [0.5.0] - 2026-05-19 - ✔️ Tasks diff --git a/crates/cachet/Cargo.toml b/crates/cachet/Cargo.toml index 1b9b64f84..9b35c18f3 100644 --- a/crates/cachet/Cargo.toml +++ b/crates/cachet/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "cachet" description = "A composable, customizable multi-tier caching library with rich feature support." -version = "0.5.0" +version = "0.5.1" readme = "README.md" keywords = ["oxidizer", "caching", "concurrency"] categories = ["caching", "concurrency"] @@ -99,6 +99,10 @@ name = "refresh" harness = false required-features = ["test-util"] +[[test]] +name = "eviction" +required-features = ["memory", "logs"] + [[example]] name = "simple" required-features = ["memory"] diff --git a/crates/cachet/README.md b/crates/cachet/README.md index bca66ac12..ec68cd21b 100644 --- a/crates/cachet/README.md +++ b/crates/cachet/README.md @@ -256,7 +256,7 @@ See the `telemetry_subscriber` example for a complete demonstration. |Level|Events| |-----|------| |ERROR|`cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error`| -|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`| +|INFO|`cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction`| |DEBUG|`cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared`| @@ -265,26 +265,26 @@ See the `telemetry_subscriber` example for a complete demonstration. This crate was developed as part of The Oxidizer Project. Browse this crate's source code.
- [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG-n6c1asXb8uG6CeIkUqNVu-GxjETKbvQrZwG4I5xAXRHmeEYWSIgmhieXRlc2J1ZmUwLjUuMIJmY2FjaGV0ZTAuNS4wgm1jYWNoZXRfbWVtb3J5ZTAuMi4wgm5jYWNoZXRfc2VydmljZWUwLjEuMIJrY2FjaGV0X3RpZXJlMC4xLjCCZHRpY2tlMC4zLjCCZ3RyYWNpbmdmMC4xLjQ0gml1bmlmbGlnaHRlMC4yLjA - [__link0]: https://docs.rs/cachet/0.5.0/cachet/?search=TimeToRefresh + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbg_hDqE88LP4bMh0J5Y4y4Osb0zDJ1kwqOsoblCGrm49Rx2thZIiCaGJ5dGVzYnVmZTAuNS4wgmZjYWNoZXRlMC41LjGCbWNhY2hldF9tZW1vcnllMC4yLjGCbmNhY2hldF9zZXJ2aWNlZTAuMS4wgmtjYWNoZXRfdGllcmUwLjEuMIJkdGlja2UwLjMuMIJndHJhY2luZ2YwLjEuNDSCaXVuaWZsaWdodGUwLjIuMA + [__link0]: https://docs.rs/cachet/0.5.1/cachet/?search=TimeToRefresh [__link1]: https://crates.io/crates/uniflight/0.2.0 [__link10]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheTier - [__link11]: https://docs.rs/cachet/0.5.0/cachet/?search=InsertPolicy - [__link12]: https://docs.rs/cachet/0.5.0/cachet/?search=TimeToRefresh + [__link11]: https://docs.rs/cachet/0.5.1/cachet/?search=InsertPolicy + [__link12]: https://docs.rs/cachet/0.5.1/cachet/?search=TimeToRefresh [__link13]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=Error [__link14]: https://crates.io/crates/cachet_tier/0.1.0 - [__link15]: https://crates.io/crates/cachet_memory/0.2.0 + [__link15]: https://crates.io/crates/cachet_memory/0.2.1 [__link16]: https://docs.rs/moka [__link17]: https://crates.io/crates/cachet_service/0.1.0 - [__link18]: https://docs.rs/cachet/0.5.0/cachet/?search=telemetry::attributes + [__link18]: https://docs.rs/cachet/0.5.1/cachet/?search=telemetry::attributes [__link19]: https://docs.rs/bytesbuf/0.5.0/bytesbuf/?search=BytesView - [__link2]: https://docs.rs/cachet/0.5.0/cachet/?search=CacheBuilder::stampede_protection + [__link2]: 
https://docs.rs/cachet/0.5.1/cachet/?search=CacheBuilder::stampede_protection [__link20]: https://crates.io/crates/tracing/0.1.44 - [__link21]: https://docs.rs/cachet/0.5.0/cachet/?search=telemetry::attributes + [__link21]: https://docs.rs/cachet/0.5.1/cachet/?search=telemetry::attributes [__link3]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheTier [__link4]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=DynamicCache - [__link5]: https://docs.rs/cachet/0.5.0/cachet/?search=InsertPolicy + [__link5]: https://docs.rs/cachet/0.5.1/cachet/?search=InsertPolicy [__link6]: https://docs.rs/tick/0.3.0/tick/?search=Clock - [__link7]: https://docs.rs/cachet/0.5.0/cachet/?search=Cache - [__link8]: https://docs.rs/cachet/0.5.0/cachet/?search=CacheBuilder + [__link7]: https://docs.rs/cachet/0.5.1/cachet/?search=Cache + [__link8]: https://docs.rs/cachet/0.5.1/cachet/?search=CacheBuilder [__link9]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheEntry diff --git a/crates/cachet/src/builder/buildable.rs b/crates/cachet/src/builder/buildable.rs index 7c405b710..befc93b83 100644 --- a/crates/cachet/src/builder/buildable.rs +++ b/crates/cachet/src/builder/buildable.rs @@ -42,7 +42,12 @@ where } fn build_tier(self, clock: Clock, telemetry: CacheTelemetry) -> Self::TierOutput { - CacheWrapper::new(type_name::(self.name), self.storage, clock, self.ttl, telemetry, self.policy) + let name = type_name::(self.name); + #[cfg(feature = "memory")] + if let Some(hook) = &self.eviction_hook { + hook.init(telemetry.clone(), name); + } + CacheWrapper::new(name, self.storage, clock, self.ttl, telemetry, self.policy) } } diff --git a/crates/cachet/src/builder/cache.rs b/crates/cachet/src/builder/cache.rs index f9d05b583..df092b376 100644 --- a/crates/cachet/src/builder/cache.rs +++ b/crates/cachet/src/builder/cache.rs @@ -3,15 +3,19 @@ use std::hash::Hash; use std::marker::PhantomData; +#[cfg(feature = "memory")] +use std::sync::Arc; use std::time::Duration; 
#[cfg(feature = "memory")] -use cachet_memory::InMemoryCache; +use cachet_memory::{InMemoryCache, InMemoryCacheBuilder}; use tick::Clock; use super::buildable::Buildable; use super::fallback::FallbackBuilder; use super::sealed::{CacheTierBuilder, Sealed}; +#[cfg(feature = "memory")] +use crate::eviction::EvictionHook; use crate::policy::InsertPolicy; use crate::telemetry::CacheTelemetry; use crate::{Cache, CacheTier}; @@ -44,6 +48,8 @@ pub struct CacheBuilder { pub(crate) clock: Clock, pub(crate) telemetry: CacheTelemetry, pub(crate) stampede_protection: bool, + #[cfg(feature = "memory")] + pub(crate) eviction_hook: Option>, pub(crate) _phantom: PhantomData<(K, V)>, } @@ -57,6 +63,8 @@ impl CacheBuilder { clock, telemetry: CacheTelemetry::new(), stampede_protection: false, + #[cfg(feature = "memory")] + eviction_hook: None, _phantom: PhantomData, } } @@ -90,6 +98,8 @@ impl CacheBuilder { clock: self.clock, telemetry: self.telemetry, stampede_protection: self.stampede_protection, + #[cfg(feature = "memory")] + eviction_hook: self.eviction_hook, _phantom: PhantomData, } } @@ -115,7 +125,50 @@ impl CacheBuilder { K: Hash + Eq + Clone + Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - self.storage(InMemoryCache::::new()) + self.memory_with(|b| b) + } + + /// Configures the cache to use in-memory storage, exposing the inner + /// [`InMemoryCacheBuilder`] for additional configuration (capacity, TTL, + /// eviction policy, custom hasher, etc.). + /// + /// Call [`InMemoryCacheBuilder::with_eviction_telemetry`] inside the + /// closure to emit `cache.eviction` on capacity evictions and + /// `cache.expired` on background TTL/TTI expiry. + /// + /// # Panics + /// + /// Panics if the configured [`InMemoryCacheBuilder`] fails validation + /// (for example, `max_capacity < initial_capacity`). 
+ /// + /// # Examples + /// + /// ```no_run + /// use cachet::Cache; + /// use tick::Clock; + /// + /// let clock = Clock::new_tokio(); + /// let cache = Cache::builder::(clock) + /// .memory_with(|b| b.max_capacity(1_000).with_eviction_telemetry()) + /// .build(); + /// ``` + #[cfg(feature = "memory")] + #[must_use] + pub fn memory_with(mut self, configure: F) -> CacheBuilder> + where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, + F: FnOnce(InMemoryCacheBuilder) -> InMemoryCacheBuilder, + { + let mut builder = configure(InMemoryCacheBuilder::::new()); + if builder.eviction_telemetry_enabled() { + let hook = Arc::new(EvictionHook::new()); + let hook_for_listener = Arc::clone(&hook); + builder = builder.on_eviction(move |cause| hook_for_listener.handle(cause)); + self.eviction_hook = Some(hook); + } + let storage = builder.build().expect("InMemoryCacheBuilder configuration must be valid"); + self.storage(storage) } /// Configures the cache to use a service as the storage backend. 
diff --git a/crates/cachet/src/cache.rs b/crates/cachet/src/cache.rs index 61cc9d1e7..9b3c653d3 100644 --- a/crates/cachet/src/cache.rs +++ b/crates/cachet/src/cache.rs @@ -27,7 +27,9 @@ struct Mergers { get: Merger>, Error>>, invalidate: Merger>, get_or_insert: Merger, Error>>, + get_or_insert_with: Merger, Error>>, try_get_or_insert: Merger, Error>>, + try_get_or_insert_with: Merger, Error>>, optionally_get_or_insert: Merger>, Error>>, } @@ -41,7 +43,9 @@ where get: Merger::new(), invalidate: Merger::new(), get_or_insert: Merger::new(), + get_or_insert_with: Merger::new(), try_get_or_insert: Merger::new(), + try_get_or_insert_with: Merger::new(), optionally_get_or_insert: Merger::new(), } } @@ -428,7 +432,172 @@ where return Ok(entry); } let value = f().await; - let entry = CacheEntry::new(value); + let mut entry = CacheEntry::new(value); + entry.ensure_cached_at(self.clock.system_time()); + self.insert(key.clone(), entry.clone()).await?; + Ok(entry) + } + + /// Retrieves a value from cache, or computes and caches a [`CacheEntry`] if missing. + /// + /// Like [`get_or_insert`](Self::get_or_insert), but the closure returns a full + /// [`CacheEntry`] instead of a raw `V`. This gives callers control over + /// per-entry metadata such as TTL via [`CacheEntry::expires_after`]. + /// + /// # Per-Entry TTL + /// + /// When the closure returns a `CacheEntry` with a TTL set (e.g., via + /// [`CacheEntry::expires_after`]), that TTL takes precedence over any tier-level + /// TTL configured on the cache. This is useful when the compute function returns + /// data with a known validity period. + /// + /// # Concurrency + /// + /// Subject to the same TOCTOU window as [`get_or_insert`](Self::get_or_insert) - + /// see its Concurrency section for details. 
+ /// + /// # Stampede Protection + /// + /// When enabled via [`stampede_protection()`](crate::CacheBuilder::stampede_protection), + /// concurrent calls for the same missing key are coalesced - only one caller + /// computes the value while others wait and share the result. + /// + /// # Errors + /// + /// Returns an error if the underlying cache operation fails or (with stampede + /// protection) if the leader task panics. + /// + /// # Examples + /// + /// ```no_run + /// use std::time::Duration; + /// + /// use cachet::{Cache, CacheEntry}; + /// use tick::Clock; + /// # async { + /// + /// let clock = Clock::new_tokio(); + /// let cache = Cache::builder::(clock).memory().build(); + /// + /// let entry = cache + /// .get_or_insert_with("key", || async { + /// let value = 42; // expensive computation + /// let ttl = Duration::from_secs(300); // determined by response + /// CacheEntry::expires_after(value, ttl) + /// }) + /// .await?; + /// assert_eq!(*entry.value(), 42); + /// # Ok::<(), cachet::Error>(()) + /// # }; + /// ``` + pub async fn get_or_insert_with(&self, key: &Q, f: impl FnOnce() -> Fut + Send) -> Result, Error> + where + K: Borrow, + Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, + Fut: Future> + Send, + { + let owned = key.to_owned(); + if let Some(mergers) = &self.mergers { + mergers + .get_or_insert_with + .execute(key, move || async move { self.do_get_or_insert_with(&owned, f).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) + } else { + self.do_get_or_insert_with(&owned, f).await + } + } + + async fn do_get_or_insert_with(&self, key: &K, f: impl FnOnce() -> Fut) -> Result, Error> + where + Fut: Future>, + { + if let Some(entry) = self.storage.get(key).await? 
{ + return Ok(entry); + } + let mut entry = f().await; + entry.ensure_cached_at(self.clock.system_time()); + self.insert(key.clone(), entry.clone()).await?; + Ok(entry) + } + + /// Retrieves a value from cache, or computes and caches a [`CacheEntry`] if missing. + /// + /// Like [`get_or_insert_with`](Self::get_or_insert_with), but the closure can fail. + /// Only successful results are cached — errors are not cached, allowing retries. + /// + /// # Per-Entry TTL + /// + /// When the closure returns `Ok(CacheEntry)` with a TTL set, that TTL takes + /// precedence over any tier-level TTL. See [`get_or_insert_with`](Self::get_or_insert_with) + /// for details. + /// + /// # Stampede Protection + /// + /// When enabled via [`stampede_protection()`](crate::CacheBuilder::stampede_protection), + /// concurrent calls for the same missing key are coalesced. If the computation + /// fails, the error is shared with all waiters but not cached. + /// + /// # Errors + /// + /// Returns an error if: + /// - The provided function returns an error (wrapped via [`Error::from_source`]) + /// - The underlying cache operation fails + /// - With stampede protection, if the leader task panics + /// + /// Use [`Error::source_as`] to extract the original error type. 
+ /// + /// # Examples + /// + /// ```no_run + /// use std::time::Duration; + /// + /// use cachet::{Cache, CacheEntry, Error}; + /// use tick::Clock; + /// # async { + /// + /// let clock = Clock::new_tokio(); + /// let cache = Cache::builder::(clock).memory().build(); + /// + /// let result = cache + /// .try_get_or_insert_with("key", || async { + /// let value = 42; + /// let ttl = Duration::from_secs(60); + /// Ok::<_, std::io::Error>(CacheEntry::expires_after(value, ttl)) + /// }) + /// .await; + /// assert!(result.is_ok()); + /// # }; + /// ``` + pub async fn try_get_or_insert_with(&self, key: &Q, f: impl FnOnce() -> Fut + Send) -> Result, Error> + where + K: Borrow, + Q: Hash + Eq + ToOwned + ?Sized + Send + Sync, + E: std::error::Error + Send + Sync + 'static, + Fut: Future, E>> + Send, + { + let owned = key.to_owned(); + if let Some(mergers) = &self.mergers { + mergers + .try_get_or_insert_with + .execute(key, move || async move { self.do_try_get_or_insert_with(&owned, f).await }) + .await + .unwrap_or_else(|panicked| Err(Error::from_source(panicked))) + } else { + self.do_try_get_or_insert_with(&owned, f).await + } + } + + async fn do_try_get_or_insert_with(&self, key: &K, f: impl FnOnce() -> Fut) -> Result, Error> + where + E: std::error::Error + Send + Sync + 'static, + Fut: Future, E>>, + { + if let Some(entry) = self.storage.get(key).await? 
{ + return Ok(entry); + } + let mut entry = f().await.map_err(Error::from_source)?; + entry.ensure_cached_at(self.clock.system_time()); self.insert(key.clone(), entry.clone()).await?; Ok(entry) } @@ -502,7 +671,8 @@ where return Ok(entry); } let value = f().await.map_err(Error::from_source)?; - let entry = CacheEntry::new(value); + let mut entry = CacheEntry::new(value); + entry.ensure_cached_at(self.clock.system_time()); self.insert(key.clone(), entry.clone()).await?; Ok(entry) } @@ -579,7 +749,8 @@ where } match f().await { Some(value) => { - let entry = CacheEntry::new(value); + let mut entry = CacheEntry::new(value); + entry.ensure_cached_at(self.clock.system_time()); self.insert(key.clone(), entry.clone()).await?; Ok(Some(entry)) } diff --git a/crates/cachet/src/eviction.rs b/crates/cachet/src/eviction.rs new file mode 100644 index 000000000..f586546d1 --- /dev/null +++ b/crates/cachet/src/eviction.rs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Bridge between an in-memory tier's eviction listener and cache telemetry. +//! +//! The cache builder is configured incrementally: storage is selected before +//! `name`/`enable_logs` may be called. We therefore install a stable listener +//! at storage-construction time that defers to a [`OnceLock`] populated when +//! the cache is finally built. + +use std::sync::OnceLock; +use std::time::Duration; + +use cachet_memory::RemovalCause; + +use crate::cache::CacheName; +use crate::telemetry::CacheTelemetry; + +/// Bridges moka crate's eviction listener to the cachet telemetry layer. +#[derive(Debug)] +pub(crate) struct EvictionHook { + state: OnceLock, +} + +#[derive(Debug)] +struct HookState { + telemetry: CacheTelemetry, + name: CacheName, +} + +impl EvictionHook { + pub(crate) fn new() -> Self { + Self { state: OnceLock::new() } + } + + /// Binds the hook to a telemetry sink and cache name. Subsequent calls are no-ops. 
+ pub(crate) fn init(&self, telemetry: CacheTelemetry, name: CacheName) { + let _ = self.state.set(HookState { telemetry, name }); + } + + /// Routes a removal cause to the appropriate telemetry event. + /// + /// Does nothing until `init` has run. `Explicit` and `Replaced` are ignored because + /// the wrapper's `cache.invalidated` / `cache.inserted` events already cover them. + pub(crate) fn handle(&self, cause: RemovalCause) { + let Some(state) = self.state.get() else { + return; + }; + match cause { + RemovalCause::Size => state.telemetry.cache_eviction(state.name, Duration::ZERO), + RemovalCause::Expired => state.telemetry.cache_expired(state.name, Duration::ZERO), + RemovalCause::Explicit | RemovalCause::Replaced => {} + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use testing_aids::LogCapture; + + use super::*; + use crate::telemetry::attributes; + + #[cfg_attr(miri, ignore)] + #[test] + fn handle_before_init_is_noop() { + let capture = LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + let hook = EvictionHook::new(); + hook.handle(RemovalCause::Size); + + assert!(capture.output().is_empty(), "no event should fire before init"); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn handle_after_init_routes_by_cause() { + let capture = LogCapture::new(); + let _guard = tracing::subscriber::set_default(capture.subscriber()); + + let hook = Arc::new(EvictionHook::new()); + hook.init(CacheTelemetry::with_logging(), "hook_test"); + + hook.handle(RemovalCause::Explicit); + hook.handle(RemovalCause::Replaced); + assert!( + !capture.output().contains(attributes::EVENT_EVICTION) && !capture.output().contains(attributes::EVENT_EXPIRED), + "Explicit/Replaced must not emit eviction or expired events" + ); + + hook.handle(RemovalCause::Size); + capture.assert_contains(attributes::EVENT_EVICTION); + + hook.handle(RemovalCause::Expired); + capture.assert_contains(attributes::EVENT_EXPIRED); + } +} diff --git a/crates/cachet/src/lib.rs
b/crates/cachet/src/lib.rs index d227d3965..999c7f85a 100644 --- a/crates/cachet/src/lib.rs +++ b/crates/cachet/src/lib.rs @@ -253,11 +253,13 @@ //! | Level | Events | //! |-------|--------| //! | ERROR | `cache.get_error`, `cache.insert_error`, `cache.invalidate_error`, `cache.clear_error` | -//! | INFO | `cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback` | +//! | INFO | `cache.expired`, `cache.refresh_miss`, `cache.inserted`, `cache.insert_rejected`, `cache.invalidated`, `cache.fallback`, `cache.eviction` | //! | DEBUG | `cache.hit`, `cache.miss`, `cache.refresh_hit`, `cache.cleared` | mod builder; mod cache; +#[cfg(feature = "memory")] +mod eviction; mod fallback; mod policy; mod refresh; diff --git a/crates/cachet/src/telemetry/attributes.rs b/crates/cachet/src/telemetry/attributes.rs index f7f7733b7..24946216b 100644 --- a/crates/cachet/src/telemetry/attributes.rs +++ b/crates/cachet/src/telemetry/attributes.rs @@ -85,6 +85,10 @@ pub const EVENT_REFRESH_HIT: &str = "cache.refresh_hit"; /// A background refresh did not find data in the fallback tier. pub const EVENT_REFRESH_MISS: &str = "cache.refresh_miss"; +/// An entry was evicted/removed due to cache size constraints. +/// Only emitted when eviction telemetry is enabled. +pub const EVENT_EVICTION: &str = "cache.eviction"; + #[cfg(test)] mod tests { use super::*; @@ -114,6 +118,7 @@ mod tests { EVENT_CLEAR_ERROR, EVENT_REFRESH_HIT, EVENT_REFRESH_MISS, + EVENT_EVICTION, ]; for (i, a) in events.iter().enumerate() { diff --git a/crates/cachet/src/telemetry/cache.rs b/crates/cachet/src/telemetry/cache.rs index 2179f6a09..c93fff32a 100644 --- a/crates/cachet/src/telemetry/cache.rs +++ b/crates/cachet/src/telemetry/cache.rs @@ -165,6 +165,14 @@ impl CacheTelemetry { self.inner.info(cache_name, attributes::EVENT_INSERTED, duration); } + /// Records that an entry was evicted from the cache because it reached + /// the configured capacity limit. 
+ #[cfg(any(feature = "memory", test))] + #[inline] + pub(crate) fn cache_eviction(&self, cache_name: CacheName, duration: Duration) { + self.inner.info(cache_name, attributes::EVENT_EVICTION, duration); + } + /// Records a cache insert that was rejected by the insert policy. #[inline] pub(crate) fn insert_rejected(&self, cache_name: CacheName, duration: Duration) { @@ -290,6 +298,7 @@ mod tests { assert_emits(|t| t.refresh_hit("c", Duration::ZERO), attributes::EVENT_REFRESH_HIT); assert_emits(|t| t.refresh_miss("c", Duration::ZERO), attributes::EVENT_REFRESH_MISS); assert_emits(|t| t.cache_inserted("c", Duration::ZERO), attributes::EVENT_INSERTED); + assert_emits(|t| t.cache_eviction("c", Duration::ZERO), attributes::EVENT_EVICTION); assert_emits(|t| t.insert_rejected("c", Duration::ZERO), attributes::EVENT_INSERT_REJECTED); assert_emits(|t| t.insert_error("c", Duration::ZERO), attributes::EVENT_INSERT_ERROR); assert_emits(|t| t.cache_invalidated("c", Duration::ZERO), attributes::EVENT_INVALIDATED); diff --git a/crates/cachet/tests/cache.rs b/crates/cachet/tests/cache.rs index 8b8b3770c..51a31dcf0 100644 --- a/crates/cachet/tests/cache.rs +++ b/crates/cachet/tests/cache.rs @@ -338,6 +338,194 @@ fn cache_with_memory_is_sync() { assert_sync::>(); } +// ============================================================================= +// get_or_insert_with tests +// ============================================================================= + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn get_or_insert_with_computes_and_caches() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().build(); + + let key = "key".to_string(); + + let entry = cache + .get_or_insert_with(&key, || async { + CacheEntry::expires_after(42, std::time::Duration::from_secs(300)) + }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); + assert_eq!(entry.ttl(), Some(std::time::Duration::from_secs(300))); + + // Second call returns cached value, not the new 
closure result + let entry = cache.get_or_insert_with(&key, || async { CacheEntry::new(100) }).await.unwrap(); + assert_eq!(*entry.value(), 42); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn get_or_insert_with_preserves_per_entry_ttl() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().build(); + + let key = "key".to_string(); + let ttl = std::time::Duration::from_secs(60); + + let entry = cache + .get_or_insert_with(&key, || async { CacheEntry::expires_after(7, ttl) }) + .await + .unwrap(); + + assert_eq!(*entry.value(), 7); + assert_eq!(entry.ttl(), Some(ttl)); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn or_insert_family_populates_cached_at_on_miss() { + let clock = Clock::new_frozen(); + let now = clock.system_time(); + let cache = Cache::builder::(clock).memory().build(); + + let entry = cache.get_or_insert(&"a".to_string(), || async { 1 }).await.unwrap(); + assert_eq!(entry.cached_at(), Some(now)); + + let entry = cache + .get_or_insert_with(&"b".to_string(), || async { CacheEntry::new(2) }) + .await + .unwrap(); + assert_eq!(entry.cached_at(), Some(now)); + + let entry = cache + .try_get_or_insert(&"c".to_string(), || async { Ok::<_, Error>(3) }) + .await + .unwrap(); + assert_eq!(entry.cached_at(), Some(now)); + + let entry = cache + .try_get_or_insert_with(&"d".to_string(), || async { Ok::<_, Error>(CacheEntry::new(4)) }) + .await + .unwrap(); + assert_eq!(entry.cached_at(), Some(now)); + + let entry = cache + .optionally_get_or_insert(&"e".to_string(), || async { Some(5) }) + .await + .unwrap() + .unwrap(); + assert_eq!(entry.cached_at(), Some(now)); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn stampede_protection_get_or_insert_with() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().stampede_protection().build(); + + let key = "key".to_string(); + + let entry = cache + .get_or_insert_with(&key, || async { + CacheEntry::expires_after(42, 
std::time::Duration::from_secs(120)) + }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); + + // Second call returns cached value + let entry = cache.get_or_insert_with(&key, || async { CacheEntry::new(100) }).await.unwrap(); + assert_eq!(*entry.value(), 42); +} + +// ============================================================================= +// try_get_or_insert_with tests +// ============================================================================= + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn try_get_or_insert_with_success() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().build(); + + let key = "key".to_string(); + let ttl = std::time::Duration::from_secs(600); + + let entry = cache + .try_get_or_insert_with(&key, || async { Ok::<_, Error>(CacheEntry::expires_after(42, ttl)) }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); + assert_eq!(entry.ttl(), Some(ttl)); + + // Cached on second call + let entry = cache + .try_get_or_insert_with(&key, || async { Ok::<_, Error>(CacheEntry::new(100)) }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn try_get_or_insert_with_error_not_cached() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().build(); + + let key = "key".to_string(); + + let result: Result, Error> = cache + .try_get_or_insert_with(&key, || async { Err(Error::from_message("computation failed")) }) + .await; + result.expect_err("error should propagate"); + + // Not cached — second call with success should work + let entry = cache + .try_get_or_insert_with(&key, || async { Ok::<_, Error>(CacheEntry::new(99)) }) + .await + .unwrap(); + assert_eq!(*entry.value(), 99); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn stampede_protection_try_get_or_insert_with_success() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().stampede_protection().build(); + + let 
key = "key".to_string(); + + let entry = cache + .try_get_or_insert_with(&key, || async { + Ok::<_, Error>(CacheEntry::expires_after(42, std::time::Duration::from_secs(60))) + }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); + + // Cached on second call + let entry = cache + .try_get_or_insert_with(&key, || async { Ok::<_, Error>(CacheEntry::new(100)) }) + .await + .unwrap(); + assert_eq!(*entry.value(), 42); +} + +#[cfg_attr(miri, ignore)] +#[tokio::test] +async fn stampede_protection_try_get_or_insert_with_error() { + let clock = Clock::new_frozen(); + let cache = Cache::builder::(clock).memory().stampede_protection().build(); + + let key = "key".to_string(); + + let result: Result, Error> = cache + .try_get_or_insert_with(&key, || async { Err(Error::from_message("test error")) }) + .await; + result.expect_err("error should propagate through stampede protection"); +} + /// Verifies that `CacheEntry` is Send. #[test] fn cache_entry_is_send() { diff --git a/crates/cachet/tests/eviction.rs b/crates/cachet/tests/eviction.rs new file mode 100644 index 000000000..aa0f3ff33 --- /dev/null +++ b/crates/cachet/tests/eviction.rs @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Integration tests for eviction telemetry emitted by the in-memory tier's +//! eviction listener. + +#![cfg(feature = "memory")] + +use std::time::Duration; + +use cachet::{Cache, CacheEntry}; +use testing_aids::{LogCapture, TEST_TIMEOUT}; +use tick::Clock; +use tracing_subscriber::Registry; +use tracing_subscriber::layer::SubscriberExt; + +/// Inserting past the configured `max_capacity` of the underlying moka cache +/// must eventually emit a `cache.eviction` event for the size-based removals. 
+#[cfg_attr(miri, ignore)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn memory_size_eviction_emits_telemetry() { + let capture = LogCapture::new(); + // moka runs the eviction listener on its own background task / worker thread, + // so a thread-local subscriber (`set_default`) won't see those events. This + // test owns its own test binary, so we can safely install a process-global + // subscriber. + let subscriber = Registry::default().with(tracing_subscriber::fmt::layer().with_writer(capture.clone()).with_ansi(false)); + tracing::subscriber::set_global_default(subscriber).expect("no other global subscriber should be installed in this test binary"); + + let clock = Clock::new_tokio(); + let cache: Cache = Cache::builder::(clock) + .name("eviction-test") + .enable_logs() + .memory_with(|b| b.max_capacity(2).with_eviction_telemetry()) + .build(); + + // Drive enough churn to force size-based evictions. Moka's housekeeping + // runs periodically (and as a side effect of cache operations), so we keep + // exercising the cache while waiting for an eviction event to surface. 
+ let deadline = std::time::Instant::now() + TEST_TIMEOUT; + let mut i: i32 = 0; + while std::time::Instant::now() < deadline { + for _ in 0..256 { + cache.insert(format!("k{i}"), CacheEntry::new(i)).await.unwrap(); + i += 1; + } + if capture.output().contains(cachet::telemetry::attributes::EVENT_EVICTION) { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!( + "expected `{}` event after exceeding max_capacity; captured output:\n{}", + cachet::telemetry::attributes::EVENT_EVICTION, + capture.output() + ); +} diff --git a/crates/cachet_memory/CHANGELOG.md b/crates/cachet_memory/CHANGELOG.md index 3c21ff451..a9b87b941 100644 --- a/crates/cachet_memory/CHANGELOG.md +++ b/crates/cachet_memory/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.2.1] - 2026-05-21 + +- ✨ Features + + - Add `InMemoryCacheBuilder::on_eviction` for observing entry removals, along with the new public `RemovalCause` enum. + - Add `InMemoryCacheBuilder::with_eviction_telemetry` as a marker for the `cachet` host crate to install built-in eviction telemetry via `CacheBuilder::memory_with`. + ## [0.2.0] - 2026-05-19 - ✔️ Tasks diff --git a/crates/cachet_memory/Cargo.toml b/crates/cachet_memory/Cargo.toml index 56aaff6dd..83e4e206f 100644 --- a/crates/cachet_memory/Cargo.toml +++ b/crates/cachet_memory/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "cachet_memory" description = "In-memory cache tier backed by Moka for the cachet caching library."
-version = "0.2.0" +version = "0.2.1" readme = "README.md" keywords = ["oxidizer", "caching", "concurrency"] categories = ["caching", "concurrency"] diff --git a/crates/cachet_memory/README.md b/crates/cachet_memory/README.md index 7383c256c..0cac02680 100644 --- a/crates/cachet_memory/README.md +++ b/crates/cachet_memory/README.md @@ -50,17 +50,35 @@ assert_eq!(*value.unwrap().value(), 42); * **TTL/TTI**: Configure time-to-live and time-to-idle expiration * **Per-entry TTL**: Honors [`CacheEntry::expires_after`][__link3] for per-entry expiration +* **Eviction notifications**: Observe removals via + [`InMemoryCacheBuilder::on_eviction`][__link4] or opt into host-side telemetry with + [`InMemoryCacheBuilder::with_eviction_telemetry`][__link5] * **Thread-safe**: Safe for concurrent access from multiple tasks * **Zero external types**: Builder API keeps implementation details private +## Eviction Notifications + +Two complementary hooks are available for observing entry removals: + +* [`InMemoryCacheBuilder::on_eviction`][__link6] takes a closure invoked with a + [`RemovalCause`][__link7] for every removal (capacity, expiry, explicit, or replace). + Use this for custom side effects. +* [`InMemoryCacheBuilder::with_eviction_telemetry`][__link8] is a marker that the host + crate (`cachet`) recognizes via `CacheBuilder::memory_with` and uses to + install a built-in listener that emits `cache.eviction` for capacity + removals and `cache.expired` for background TTL/TTI expiry. Explicit and + replaced removals are intentionally not surfaced — they are already covered + by the host’s `cache.invalidated` and `cache.inserted` events. The marker + has no effect when [`InMemoryCache`][__link9] is built directly without a host. + ## Expiration Behavior This tier supports three independent expiration mechanisms. When multiple are active, the **shortest duration wins** - an entry is evicted at the earliest of: -1. The per-entry TTL from [`CacheEntry::expires_after`][__link4] -1. 
The cache-wide TTL from [`InMemoryCacheBuilder::time_to_live`][__link5] -1. The cache-wide TTI from [`InMemoryCacheBuilder::time_to_idle`][__link6] +1. The per-entry TTL from [`CacheEntry::expires_after`][__link10] +1. The cache-wide TTL from [`InMemoryCacheBuilder::time_to_live`][__link11] +1. The cache-wide TTI from [`InMemoryCacheBuilder::time_to_idle`][__link12] This means the builder-level TTL/TTI acts as an **upper bound** on per-entry TTL. A per-entry TTL longer than the builder TTL will be silently clamped to the @@ -73,11 +91,17 @@ TTL/TTI unset or set them to a sufficiently high ceiling. This crate was developed as part of The Oxidizer Project. Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEGx97UkpE8tyEG0w3jevrQF8SG5D28UVlbZVEG3A-UY200y_0YWSCgm1jYWNoZXRfbWVtb3J5ZTAuMi4wgmtjYWNoZXRfdGllcmUwLjEuMA - [__link0]: https://docs.rs/cachet_memory/0.2.0/cachet_memory/?search=InMemoryCache - [__link1]: https://docs.rs/cachet_memory/0.2.0/cachet_memory/?search=InMemoryCacheBuilder - [__link2]: https://docs.rs/cachet_memory/0.2.0/cachet_memory/?search=policy::EvictionPolicy + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbN0kpRlU_G9QbWC713oa4KjsbRG6BIsW3BU8bzI21NivEBVphZIKCbWNhY2hldF9tZW1vcnllMC4yLjGCa2NhY2hldF90aWVyZTAuMS4w + [__link0]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCache + [__link1]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder + [__link10]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheEntry::expires_after + [__link11]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::time_to_live + [__link12]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::time_to_idle + [__link2]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=policy::EvictionPolicy [__link3]: 
https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheEntry::expires_after - [__link4]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheEntry::expires_after - [__link5]: https://docs.rs/cachet_memory/0.2.0/cachet_memory/?search=InMemoryCacheBuilder::time_to_live - [__link6]: https://docs.rs/cachet_memory/0.2.0/cachet_memory/?search=InMemoryCacheBuilder::time_to_idle + [__link4]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::on_eviction + [__link5]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::with_eviction_telemetry + [__link6]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::on_eviction + [__link7]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=notification::RemovalCause + [__link8]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCacheBuilder::with_eviction_telemetry + [__link9]: https://docs.rs/cachet_memory/0.2.1/cachet_memory/?search=InMemoryCache diff --git a/crates/cachet_memory/src/builder.rs b/crates/cachet_memory/src/builder.rs index 2edfb1e66..0e1e8684b 100644 --- a/crates/cachet_memory/src/builder.rs +++ b/crates/cachet_memory/src/builder.rs @@ -7,15 +7,22 @@ //! the underlying cache configuration, providing a stable API surface //! without exposing implementation details. +use std::fmt; use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::Arc; use std::time::Duration; use foldhash::fast::RandomState; +use crate::notification::RemovalCause; use crate::policy::EvictionPolicy; use crate::tier::InMemoryCache; +/// Type-erased eviction listener. +pub(crate) type EvictionListener = Arc; + /// Builder for configuring an `InMemoryCache`. 
/// /// This builder provides a stable API for common cache configuration @@ -36,7 +43,6 @@ use crate::tier::InMemoryCache; /// .name("my-cache") /// .build(); /// ``` -#[derive(Debug)] pub struct InMemoryCacheBuilder { pub(crate) max_capacity: Option, pub(crate) initial_capacity: Option, @@ -44,10 +50,35 @@ pub struct InMemoryCacheBuilder { pub(crate) time_to_idle: Option, pub(crate) name: Option<&'static str>, pub(crate) eviction_policy: EvictionPolicy, + pub(crate) eviction_listener: Option, + pub(crate) eviction_telemetry: bool, pub(crate) hasher: H, _phantom: PhantomData<(K, V)>, } +impl fmt::Debug for InMemoryCacheBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InMemoryCacheBuilder") + .field("max_capacity", &self.max_capacity) + .field("initial_capacity", &self.initial_capacity) + .field("time_to_live", &self.time_to_live) + .field("time_to_idle", &self.time_to_idle) + .field("name", &self.name) + .field("eviction_policy", &self.eviction_policy) + .field("eviction_listener", &self.eviction_listener.as_ref().map(|_| "")) + .field("eviction_telemetry", &self.eviction_telemetry) + .field("hasher", &self.hasher) + .finish() + } +} + +// `eviction_listener` holds a `dyn Fn`, which is not auto-`UnwindSafe`/`RefUnwindSafe`. +// Assert both explicitly so adding the listener doesn't break downstream code that +// relied on the auto impls. The closure is invoked by moka as a fire-and-forget +// callback; a panic inside it cannot leave observable state in the builder. 
+impl UnwindSafe for InMemoryCacheBuilder {} +impl RefUnwindSafe for InMemoryCacheBuilder {} + impl Default for InMemoryCacheBuilder { fn default() -> Self { Self::new() @@ -68,6 +99,8 @@ impl InMemoryCacheBuilder { time_to_idle: None, name: None, eviction_policy: EvictionPolicy::default(), + eviction_listener: None, + eviction_telemetry: false, hasher: RandomState::default(), _phantom: PhantomData, } @@ -218,6 +251,83 @@ impl InMemoryCacheBuilder { self } + /// Registers a listener that is called when an entry is removed from the cache. + /// + /// The listener receives a [`RemovalCause`] indicating why the entry was removed: + /// `Size` for capacity-driven evictions, `Expired` for TTL/TTI expiration, + /// `Explicit` for [`invalidate`](cachet_tier::CacheTier::invalidate) or + /// [`clear`](cachet_tier::CacheTier::clear) calls, and `Replaced` for inserts + /// that overwrote an existing key. + /// + /// The listener runs on the cache's background maintenance task. Keep the + /// closure cheap; expensive work should be offloaded to a separate task. + /// + /// If a listener was already registered (for example via an earlier + /// `on_eviction` call, or by the host crate when + /// [`with_eviction_telemetry`](Self::with_eviction_telemetry) is enabled), + /// the new listener is chained — both run on every removal, in registration + /// order. 
+ /// + /// # Examples + /// + /// ```no_run + /// use std::sync::Arc; + /// use std::sync::atomic::{AtomicUsize, Ordering}; + /// + /// use cachet_memory::{InMemoryCache, RemovalCause}; + /// + /// let evictions = Arc::new(AtomicUsize::new(0)); + /// let counter = Arc::clone(&evictions); + /// + /// let cache = InMemoryCache::::builder() + /// .max_capacity(100) + /// .on_eviction(move |cause| { + /// if matches!(cause, RemovalCause::Size | RemovalCause::Expired) { + /// counter.fetch_add(1, Ordering::Relaxed); + /// } + /// }) + /// .build() + /// .expect("Failed to build cache"); + /// ``` + #[must_use] + pub fn on_eviction(mut self, listener: F) -> Self + where + F: Fn(RemovalCause) + Send + Sync + 'static, + { + self.eviction_listener = Some(match self.eviction_listener.take() { + Some(previous) => Arc::new(move |cause| { + previous(cause); + listener(cause); + }), + None => Arc::new(listener), + }); + self + } + + /// Requests that the host crate install eviction telemetry for this cache. + /// + /// This is a marker for `cachet::CacheBuilder::memory_with` to recognize: + /// when set, the host installs a listener that emits `cache.eviction` on + /// capacity evictions ([`RemovalCause::Size`]) and `cache.expired` on + /// background TTL/TTI expiry ([`RemovalCause::Expired`]). + /// [`RemovalCause::Explicit`] and [`RemovalCause::Replaced`] are + /// intentionally not surfaced, as they are already covered by the host's + /// `cache.invalidated` and `cache.inserted` events. + /// + /// When `InMemoryCache` is constructed directly via [`Self::build`] without + /// a host, this flag has no effect — use [`Self::on_eviction`] instead. + #[must_use] + pub fn with_eviction_telemetry(mut self) -> Self { + self.eviction_telemetry = true; + self + } + + /// Returns whether [`Self::with_eviction_telemetry`] was called on this builder. 
+ #[must_use] + pub fn eviction_telemetry_enabled(&self) -> bool { + self.eviction_telemetry + } + /// Sets a custom hash builder for the cache. /// /// By default, the cache uses [`foldhash::fast::RandomState`] for high-performance @@ -244,6 +354,8 @@ impl InMemoryCacheBuilder { time_to_idle: self.time_to_idle, name: self.name, eviction_policy: self.eviction_policy, + eviction_listener: self.eviction_listener, + eviction_telemetry: self.eviction_telemetry, hasher, _phantom: PhantomData, } @@ -351,6 +463,79 @@ mod tests { assert_eq!(builder.name, Some("test")); } + #[test] + fn eviction_telemetry_defaults_false() { + let builder = InMemoryCacheBuilder::::new(); + assert!(!builder.eviction_telemetry_enabled()); + } + + #[test] + fn with_eviction_telemetry_sets_flag() { + let builder = InMemoryCacheBuilder::::new().with_eviction_telemetry(); + assert!(builder.eviction_telemetry_enabled()); + } + + #[test] + fn with_hasher_preserves_eviction_telemetry_flag() { + let builder = InMemoryCacheBuilder::::new() + .with_eviction_telemetry() + .with_hasher(std::collections::hash_map::RandomState::new()); + assert!(builder.eviction_telemetry_enabled()); + } + + #[test] + fn on_eviction_chains_existing_listener() { + use std::sync::Mutex; + use std::sync::atomic::{AtomicUsize, Ordering}; + + let first_count = Arc::new(AtomicUsize::new(0)); + let second_count = Arc::new(AtomicUsize::new(0)); + let order: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let first_count_cb = Arc::clone(&first_count); + let second_count_cb = Arc::clone(&second_count); + let order_first = Arc::clone(&order); + let order_second = Arc::clone(&order); + + let builder = InMemoryCacheBuilder::::new() + .on_eviction(move |_| { + first_count_cb.fetch_add(1, Ordering::Relaxed); + order_first.lock().unwrap().push("first"); + }) + .on_eviction(move |_| { + second_count_cb.fetch_add(1, Ordering::Relaxed); + order_second.lock().unwrap().push("second"); + }); + + let listener = 
builder.eviction_listener.expect("listener should be installed"); + listener(RemovalCause::Size); + + assert_eq!(first_count.load(Ordering::Relaxed), 1); + assert_eq!(second_count.load(Ordering::Relaxed), 1); + assert_eq!(*order.lock().unwrap(), vec!["first", "second"]); + } + + #[test] + fn debug_impl_renders_all_fields() { + let builder = InMemoryCacheBuilder::::new() + .max_capacity(100) + .initial_capacity(10) + .time_to_live(Duration::from_secs(60)) + .time_to_idle(Duration::from_secs(30)) + .name("my_cache") + .with_eviction_telemetry() + .on_eviction(|_| {}); + let rendered = format!("{builder:?}"); + assert!(rendered.contains("InMemoryCacheBuilder")); + assert!(rendered.contains("max_capacity: Some(100)")); + assert!(rendered.contains("initial_capacity: Some(10)")); + assert!(rendered.contains("time_to_live: Some(60s)")); + assert!(rendered.contains("time_to_idle: Some(30s)")); + assert!(rendered.contains("name: Some(\"my_cache\")")); + assert!(rendered.contains("eviction_telemetry: true")); + assert!(rendered.contains("eviction_listener: Some(\"\")")); + } + #[test] fn build_max_capacity_lt_initial_capacity_returns_validation_error() { let result = InMemoryCacheBuilder::::new() diff --git a/crates/cachet_memory/src/lib.rs b/crates/cachet_memory/src/lib.rs index 4cb87b460..cf26119ef 100644 --- a/crates/cachet_memory/src/lib.rs +++ b/crates/cachet_memory/src/lib.rs @@ -42,9 +42,27 @@ //! - **TTL/TTI**: Configure time-to-live and time-to-idle expiration //! - **Per-entry TTL**: Honors [`CacheEntry::expires_after`][cachet_tier::CacheEntry::expires_after] //! for per-entry expiration +//! - **Eviction notifications**: Observe removals via +//! [`InMemoryCacheBuilder::on_eviction`] or opt into host-side telemetry with +//! [`InMemoryCacheBuilder::with_eviction_telemetry`] //! - **Thread-safe**: Safe for concurrent access from multiple tasks //! - **Zero external types**: Builder API keeps implementation details private //! +//! # Eviction Notifications +//! +//! 
Two complementary hooks are available for observing entry removals: +//! +//! - [`InMemoryCacheBuilder::on_eviction`] takes a closure invoked with a +//! [`RemovalCause`] for every removal (capacity, expiry, explicit, or replace). +//! Use this for custom side effects. +//! - [`InMemoryCacheBuilder::with_eviction_telemetry`] is a marker that the host +//! crate (`cachet`) recognizes via `CacheBuilder::memory_with` and uses to +//! install a built-in listener that emits `cache.eviction` for capacity +//! removals and `cache.expired` for background TTL/TTI expiry. Explicit and +//! replaced removals are intentionally not surfaced — they are already covered +//! by the host's `cache.invalidated` and `cache.inserted` events. The marker +//! has no effect when [`InMemoryCache`] is built directly without a host. +//! //! # Expiration Behavior //! //! This tier supports three independent expiration mechanisms. When multiple are @@ -60,10 +78,13 @@ //! TTL/TTI unset or set them to a sufficiently high ceiling. mod builder; +pub mod notification; pub mod policy; mod tier; #[doc(inline)] pub use builder::InMemoryCacheBuilder; #[doc(inline)] +pub use notification::RemovalCause; +#[doc(inline)] pub use tier::InMemoryCache; diff --git a/crates/cachet_memory/src/notification.rs b/crates/cachet_memory/src/notification.rs new file mode 100644 index 000000000..3762ec8fb --- /dev/null +++ b/crates/cachet_memory/src/notification.rs @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Notifications emitted by the in-memory cache. + +/// The reason an entry was removed from the cache. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum RemovalCause { + /// The entry's TTL or TTI passed and the cache's background maintenance + /// reaped it. This is distinct from a get-time expiration check. + Expired, + + /// The cache was at capacity and the eviction policy selected this entry + /// for removal. 
+ Size, + + /// The entry was removed by an explicit + /// [`invalidate`](cachet_tier::CacheTier::invalidate) or + /// [`clear`](cachet_tier::CacheTier::clear) call. + Explicit, + + /// The entry's value was replaced by a subsequent + /// [`insert`](cachet_tier::CacheTier::insert) with the same key. + Replaced, +} + +pub(crate) fn from_moka(cause: moka::notification::RemovalCause) -> RemovalCause { + match cause { + moka::notification::RemovalCause::Expired => RemovalCause::Expired, + moka::notification::RemovalCause::Size => RemovalCause::Size, + moka::notification::RemovalCause::Explicit => RemovalCause::Explicit, + moka::notification::RemovalCause::Replaced => RemovalCause::Replaced, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_moka_maps_all_variants() { + assert_eq!(from_moka(moka::notification::RemovalCause::Expired), RemovalCause::Expired); + assert_eq!(from_moka(moka::notification::RemovalCause::Size), RemovalCause::Size); + assert_eq!(from_moka(moka::notification::RemovalCause::Explicit), RemovalCause::Explicit); + assert_eq!(from_moka(moka::notification::RemovalCause::Replaced), RemovalCause::Replaced); + } +} diff --git a/crates/cachet_memory/src/tier.rs b/crates/cachet_memory/src/tier.rs index 9ab8042c6..1af08e731 100644 --- a/crates/cachet_memory/src/tier.rs +++ b/crates/cachet_memory/src/tier.rs @@ -163,6 +163,12 @@ where moka_builder = moka_builder.name(name); } + if let Some(listener) = builder.eviction_listener { + moka_builder = moka_builder.eviction_listener(move |_key, _value, cause| { + listener(crate::notification::from_moka(cause)); + }); + } + Self { inner: Arc::from_unaware( moka_builder diff --git a/crates/cachet_service/README.md b/crates/cachet_service/README.md index 36412f5d9..feae87876 100644 --- a/crates/cachet_service/README.md +++ b/crates/cachet_service/README.md @@ -45,7 +45,7 @@ let tier = ServiceAdapter::new(my_service); This crate was developed as part of The Oxidizer Project. 
Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG3K5S_LB5wBuG9aH2I-oE91BG6p757n6ShIyG2QJsgO5MU4kYWSCgm5jYWNoZXRfc2VydmljZWUwLjEuMIJrY2FjaGV0X3RpZXJlMC4xLjA + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbcrlL8sHnAG4b1ofYj6gT3UEbqnvnufpKEjIbZAmyA7kxTiRhZIKCbmNhY2hldF9zZXJ2aWNlZTAuMS4wgmtjYWNoZXRfdGllcmUwLjEuMA [__link0]: https://docs.rs/cachet_service/0.1.0/cachet_service/?search=ServiceAdapter [__link1]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheTier [__link2]: https://docs.rs/cachet_service/0.1.0/cachet_service/?search=ServiceAdapter diff --git a/crates/cachet_tier/README.md b/crates/cachet_tier/README.md index 92a8caa71..b88b74933 100644 --- a/crates/cachet_tier/README.md +++ b/crates/cachet_tier/README.md @@ -74,7 +74,7 @@ for multi-tier caches with heterogeneous storage backends. This crate was developed as part of The Oxidizer Project. Browse this crate's source code. - [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG0hRqDfWg1oDG5BT1ZI-3omTG5WE4GB0Mg57G-G4ebzGeSk5YWSBgmtjYWNoZXRfdGllcmUwLjEuMA + [__cargo_doc2readme_dependencies_info]: ggGmYW0CYXZlMC43LjJhdIQbLiTyV0MU86EbZU15e0PmecoboQ9jo59bnAEbyDXw04U13GlhYvRhcoQbSFGoN9aDWgMbkFPVkj7eiZMblYTgYHQyDnsb4bh5vMZ5KTlhZIGCa2NhY2hldF90aWVyZTAuMS4w [__link0]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheTier [__link1]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=CacheEntry [__link2]: https://docs.rs/cachet_tier/0.1.0/cachet_tier/?search=Error