diff --git a/CHANGELOG.md b/CHANGELOG.md index bfc32a0..e077a5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,13 @@ from v1.0.0 onwards. Prior 0.x releases may include breaking changes between min ## [Unreleased] +### Added + +- `SafeMemoize::Stores::Multilevel` — multi-level (L1/L2/…) cache store that checks faster layers first and promotes values from deeper layers into shallower ones on a miss ("read-through promotion"). Reads walk the list from first (fastest) to last; writes go to every level simultaneously; deletes and clears apply to all levels. Accepts `promote_expires_in:` to control the TTL of promoted entries (default: no TTL, relying on the L1 store's own eviction). Raises `ArgumentError` if fewer than two stores are supplied. +- `store: [l1, l2]` shorthand on `memoize` — passing an `Array` of `Stores::Base` instances is automatically converted to `Stores::Multilevel.new(*stores)`, enabling multi-level caching without explicit wrapper construction. +- `SafeMemoize::Stores::XFetch` — wraps any `Stores::Base` adapter with probabilistic early expiry (the XFetch algorithm) to prevent cache stampedes. Values are stored with an envelope that includes `expires_at`; on read the XFetch formula `now − (delta × beta × log(rand)) ≥ expires_at` decides whether to return the value or `MISS` (triggering early recomputation). Configurable via `beta:` (aggressiveness scalar, default 1.0) and `delta:` (estimated computation time in seconds, default 0.1). Composes naturally with `Multilevel` and `CircuitBreaker`. +- `stampede_protection:` option on `memoize` — enables the XFetch algorithm for the per-instance in-process cache. Pass `true` (default beta 1.0) or a `Numeric` for a custom beta. Records actual computation time as `delta` on each miss so the XFetch probability adapts to real observed latency. Requires `ttl:`. Incompatible with `store:` (use `Stores::XFetch` for external stores) and `ractor_safe:`. 
Accepted by `safe_memoize_options` as a class-wide default. + ## [1.6.0] - 2026-06-02 ### Added diff --git a/README.md b/README.md index c1794a5..5e88325 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,8 @@ SafeMemoize uses Ruby's `prepend` mechanism. When you call `memoize :method_name - [Copy-on-read via `copy_on_read: true` — returns a `dup`/`deep_dup` on every cache read to protect shared cached state from caller mutation](#copy-on-read) - [Cache invalidation groups via `group:` — tag related methods with a group name and bust them all with a single `reset_memo_group` call](#cache-invalidation-groups) - [Circuit breaker for external stores — `Stores::CircuitBreaker` wraps any store adapter and falls back to the per-instance cache when the store is down; configurable error threshold and probe interval](#circuit-breaker-for-external-stores) +- [Multi-level (L1/L2) caching — `Stores::Multilevel` or `store: [l1, l2]` shorthand; reads from the fastest layer first and promotes on miss](#multi-level-caching) +- [Stampede protection — `stampede_protection:` option applies the XFetch algorithm to the per-instance cache; `Stores::XFetch` applies it to external stores](#stampede-protection) ## Installation @@ -1533,6 +1535,102 @@ end [↑ Back to features](#features) +## Multi-level caching + +`SafeMemoize::Stores::Multilevel` chains two or more store adapters from fastest (L1) to slowest. Reads walk the chain until a hit is found; on a miss in an earlier layer the value is fetched from the next layer and written back ("promoted") into all shallower layers. Writes and deletes reach every layer. + +```ruby +l1 = SafeMemoize::Stores::Memory.new # fast, in-process +l2 = MyRedisStore.new # slower, cross-process + +class ProductService + prepend SafeMemoize + + def catalog = fetch_catalog_from_db + memoize :catalog, store: [l1, l2], ttl: 300 # Array shorthand +end +``` + +The `store: [l1, l2]` shorthand is equivalent to `store: Stores::Multilevel.new(l1, l2)`. 
+ +### Promotion TTL + +By default promoted entries have no TTL (the L1 store's own eviction — e.g. LRU — handles memory bounds). Set `promote_expires_in:` to give L1 entries a shorter lifetime than L2: + +```ruby +store = SafeMemoize::Stores::Multilevel.new(l1, l2, promote_expires_in: 60) +memoize :catalog, store: store, ttl: 300 +``` + +### Composition + +`Multilevel` composes with `CircuitBreaker` and `XFetch`: + +```ruby +safe_l2 = SafeMemoize::Stores::CircuitBreaker.new(MyRedisStore.new) +store = SafeMemoize::Stores::Multilevel.new(SafeMemoize::Stores::Memory.new, safe_l2) +memoize :catalog, store: store, ttl: 300 +``` + +[↑ Back to features](#features) + +## Stampede protection + +Cache stampedes (a.k.a. thundering-herd) happen when a popular entry expires and many processes simultaneously recompute it. SafeMemoize offers two mechanisms: + +### `stampede_protection:` — per-instance cache + +The `stampede_protection:` option applies the **XFetch algorithm** to the per-instance in-process cache. Instead of expiring at a hard deadline, each cache read probabilistically triggers early recomputation as the entry approaches its TTL: + +```ruby +class ApiClient + prepend SafeMemoize + + def catalog = fetch_catalog + memoize :catalog, ttl: 300, stampede_protection: true # default beta=1.0 + # or + memoize :catalog, ttl: 300, stampede_protection: 2.0 # custom beta (more aggressive) +end +``` + +The measured computation time from each cache miss is stored as `delta` and used in subsequent reads, so the XFetch probability adapts to real observed latency automatically. + +**Requirements:** `ttl:` must be set. Incompatible with `store:` (see `Stores::XFetch` below) and `ractor_safe:`. + +### `Stores::XFetch` — external stores + +For external stores (Redis, Rails.cache, etc.) wrap the adapter with `Stores::XFetch`. 
Values are stored with an `expires_at` envelope so the wrapper can apply the formula even though the store's `read` returns only the plain value: + +```ruby +store = SafeMemoize::Stores::XFetch.new( + MyRedisStore.new, + delta: 0.2, # estimated typical computation time (seconds) + beta: 1.5 # aggressiveness scalar +) + +class CatalogService + prepend SafeMemoize + + def products = db_fetch + memoize :products, store: store, ttl: 300 +end +``` + +**XFetch formula:** `now − (delta × beta × log(rand)) ≥ expires_at` + +A higher `beta` triggers early recomputation more aggressively. A larger `delta` (longer computation) also increases the recomputation window. + +`Stores::XFetch` composes with `Multilevel` and `CircuitBreaker`: + +```ruby +store = SafeMemoize::Stores::XFetch.new( + SafeMemoize::Stores::CircuitBreaker.new(MyRedisStore.new), + delta: 0.1 +) +``` + +[↑ Back to features](#features) + ## Per-class default options (`safe_memoize_options`) `safe_memoize_options` sets option defaults for every `memoize` call on the class, eliminating repetition when many methods share the same TTL, LRU cap, or other option. Per-call options still take precedence; class defaults take precedence over global `SafeMemoize.configure` defaults. 
@@ -1772,6 +1870,7 @@ Anything **not** listed here — internal modules, private methods, `@__safe_mem | `copy_on_read:` | `Boolean` | `false` | Return a `dup`/`deep_dup` of the cached value on every read; protects shared state from caller mutation; nil and frozen values pass through; incompatible with `ractor_safe:` | | `group:` | `Symbol \| String \| nil` | `nil` | Assigns the method to a named invalidation group; call `reset_memo_group` / `reset_shared_memo_group` to bust all methods in the group at once; a method belongs to at most one group | | `circuit_breaker:` | `true \| Hash \| nil` | `nil` | Wraps the configured `store:` in a `Stores::CircuitBreaker`; `true` uses defaults (`error_threshold: 5`, `probe_interval: 30`); pass a Hash to customise; requires a store to be set; does not double-wrap | +| `stampede_protection:` | `true \| Numeric \| nil` | `nil` | Enables XFetch probabilistic early expiry on the per-instance cache; `true` uses beta=1.0; pass a `Numeric` for a custom beta; requires `ttl:`; incompatible with `store:` and `ractor_safe:` | | *(extension options)* | any | — | Unknown kwargs are validated against registered extensions; raise `ArgumentError` if unclaimed | ### `memoize_all` options (class method) @@ -1789,7 +1888,7 @@ All `memoize` option keys above, plus: | Option key | Type | Default | Notes | |---|---|---|---| -| any `memoize` key except mode-switches | — | — | Accepts `ttl:`, `max_size:`, `ttl_refresh:`, `if:`, `unless:`, `key:`, `cache_bust:`, `copy_on_read:`, `namespace:`, `store:`, `group:`, `circuit_breaker:`; raises `ArgumentError` for `shared:`, `fiber_local:`, `ractor_safe:`, `shared_cache:` | +| any `memoize` key except mode-switches | — | — | Accepts `ttl:`, `max_size:`, `ttl_refresh:`, `if:`, `unless:`, `key:`, `cache_bust:`, `copy_on_read:`, `namespace:`, `store:`, `group:`, `circuit_breaker:`, `stampede_protection:`; raises `ArgumentError` for `shared:`, `fiber_local:`, `ractor_safe:`, `shared_cache:` | ### Instance methods 
(public) diff --git a/ROADMAP.md b/ROADMAP.md index d67fad8..1d7b89e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -10,8 +10,8 @@ This document tracks the planned evolution of SafeMemoize through v1.0.0 and bey | Feature | Description | Status | |---|---|---| -| Multi-level (L1/L2) caching | `store: [memory_store, redis_store]` — check in-process first, fall back to the remote store on miss, and promote to L1 on read; each level can have independent TTL and eviction settings | Planned | -| Stampede protection | Probabilistic early expiry (XFetch algorithm) for external stores; recomputes slightly before a TTL expires to prevent multiple processes hitting a cold miss simultaneously | Planned | +| Multi-level (L1/L2) caching | `store: [memory_store, redis_store]` — check in-process first, fall back to the remote store on miss, and promote to L1 on read; each level can have independent TTL and eviction settings | Shipped | +| Stampede protection | Probabilistic early expiry (XFetch algorithm) for external stores; recomputes slightly before a TTL expires to prevent multiple processes hitting a cold miss simultaneously | Shipped | --- diff --git a/lib/safe_memoize.rb b/lib/safe_memoize.rb index 9974945..3746899 100644 --- a/lib/safe_memoize.rb +++ b/lib/safe_memoize.rb @@ -6,6 +6,8 @@ require_relative "safe_memoize/stores/base" require_relative "safe_memoize/stores/memory" require_relative "safe_memoize/stores/circuit_breaker" +require_relative "safe_memoize/stores/multilevel" +require_relative "safe_memoize/stores/xfetch" require_relative "safe_memoize/adapters/statsd" require_relative "safe_memoize/adapters/opentelemetry" require_relative "safe_memoize/adapters/concurrent_ruby" diff --git a/lib/safe_memoize/cache_record_methods.rb b/lib/safe_memoize/cache_record_methods.rb index aa1e0a9..41ed2fb 100644 --- a/lib/safe_memoize/cache_record_methods.rb +++ b/lib/safe_memoize/cache_record_methods.rb @@ -22,8 +22,10 @@ def memo_expires_at(ttl) 
Process.clock_gettime(Process::CLOCK_MONOTONIC) + ttl end - def memo_record(value, expires_at:) - {value: value, expires_at: expires_at, cached_at: Process.clock_gettime(Process::CLOCK_MONOTONIC)} + def memo_record(value, expires_at:, delta: nil) + rec = {value: value, expires_at: expires_at, cached_at: Process.clock_gettime(Process::CLOCK_MONOTONIC)} + rec[:delta] = delta if delta + rec end def memo_record_value(record) diff --git a/lib/safe_memoize/class_methods.rb b/lib/safe_memoize/class_methods.rb index 46edd88..0c5112a 100644 --- a/lib/safe_memoize/class_methods.rb +++ b/lib/safe_memoize/class_methods.rb @@ -74,6 +74,12 @@ module ClassMethods # +probe_interval: 30+), or a +Hash+ with +:error_threshold+ and/or +:probe_interval+ # keys to customise. Requires a +store:+ to be set (per-method, class-level, or # global default); raises +ArgumentError+ otherwise. + # @param stampede_protection [Boolean, Numeric, nil] enables probabilistic early + # expiry (XFetch algorithm) on the per-instance in-process cache. Pass +true+ to + # use the default beta scalar (1.0), or a +Numeric+ to set a custom beta. A higher + # beta causes early recomputation to trigger more aggressively. Requires +ttl:+ to + # be set. Incompatible with +store:+ (use {Stores::XFetch} instead) and + # +ractor_safe:+. For the store-backed path, wrap the store with {Stores::XFetch}. 
# @return [void] # @raise [ArgumentError] if the method does not exist, or option values are invalid # @@ -94,7 +100,7 @@ module ClassMethods # @example With a custom store # STORE = SafeMemoize::Stores::Memory.new # memoize :fetch, store: STORE, ttl: 300 - def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UNSET, unless: UNSET, shared: UNSET, key: UNSET, store: UNSET, fiber_local: UNSET, ractor_safe: UNSET, namespace: UNSET, shared_cache: UNSET, cache_bust: UNSET, copy_on_read: UNSET, group: UNSET, circuit_breaker: UNSET, **extension_options) + def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UNSET, unless: UNSET, shared: UNSET, key: UNSET, store: UNSET, fiber_local: UNSET, ractor_safe: UNSET, namespace: UNSET, shared_cache: UNSET, cache_bust: UNSET, copy_on_read: UNSET, group: UNSET, circuit_breaker: UNSET, stampede_protection: UNSET, **extension_options) method_name = method_name.to_sym unless method_defined?(method_name) || private_method_defined?(method_name) || protected_method_defined?(method_name) @@ -138,6 +144,7 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN store = cls_defaults[:store] if store.equal?(UNSET) && cls_defaults.key?(:store) group = cls_defaults[:group] if group.equal?(UNSET) && cls_defaults.key?(:group) circuit_breaker = cls_defaults[:circuit_breaker] if circuit_breaker.equal?(UNSET) && cls_defaults.key?(:circuit_breaker) + stampede_protection = cls_defaults[:stampede_protection] if stampede_protection.equal?(UNSET) && cls_defaults.key?(:stampede_protection) end # Normalize remaining UNSET to original per-call defaults @@ -155,6 +162,7 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN copy_on_read = false if copy_on_read.equal?(UNSET) group = nil if group.equal?(UNSET) circuit_breaker = nil if circuit_breaker.equal?(UNSET) + stampede_protection = nil if stampede_protection.equal?(UNSET) cond_if = nil if 
cond_if.equal?(UNSET) cond_unless = nil if cond_unless.equal?(UNSET) @@ -199,6 +207,24 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN raise ArgumentError, "cache_bust: and key: cannot be combined" if key end + if store.is_a?(Array) + store.each_with_index do |s, i| + unless s.is_a?(SafeMemoize::Stores::Base) + raise ArgumentError, "store: Array entry [#{i}] must be a Stores::Base instance (got #{s.class})" + end + end + store = SafeMemoize::Stores::Multilevel.new(*store) + end + + if stampede_protection + unless stampede_protection == true || stampede_protection.is_a?(Numeric) + raise ArgumentError, "stampede_protection: must be true or a Numeric beta value (got #{stampede_protection.class})" + end + raise ArgumentError, "stampede_protection: requires ttl: to be set" if ttl.nil? + raise ArgumentError, "stampede_protection: is incompatible with store: — use Stores::XFetch instead" if store + raise ArgumentError, "stampede_protection: is incompatible with ractor_safe:" if ractor_safe + end + if store raise ArgumentError, "store: must be a SafeMemoize::Stores::Base instance (got #{store.class})" unless store.is_a?(SafeMemoize::Stores::Base) raise ArgumentError, "max_size: is not supported with store: — use the store adapter's own eviction" if max_size @@ -354,7 +380,15 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN fiber_cache = fiber_memo_cache! record = fiber_cache[cache_key] - if memo_record_live?(record) + sp_early = stampede_protection && memo_record_live?(record) && record[:expires_at] && + (record[:delta].to_f > 0) && + begin + sp_beta = stampede_protection.is_a?(Numeric) ? 
stampede_protection.to_f : 1.0 + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + now - record[:delta].to_f * sp_beta * Math.log(rand) >= record[:expires_at] + end + + if memo_record_live?(record) && !sp_early if max_size lru = fiber_memo_lru![method_name] ||= {} lru.delete(cache_key) @@ -365,7 +399,7 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN call_memo_hooks(:on_hit, cache_key, record) dup_fn.call(memo_record_value(record)) else - call_memo_hooks(:on_expire, cache_key, record) if record + call_memo_hooks(:on_expire, cache_key, record) if record && (!memo_record_live?(record) || sp_early) start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) value = Adapters::OpenTelemetry.trace( @@ -373,7 +407,7 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN ) { super(*args, **kwargs) } elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time - new_record = memo_record(value, expires_at: memo_expires_at(ttl)) + new_record = memo_record(value, expires_at: memo_expires_at(ttl), delta: stampede_protection ? elapsed_time : nil) if !condition || condition.call(value) if max_size @@ -432,7 +466,14 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN now = Process.clock_gettime(Process::CLOCK_MONOTONIC) record_live = record && (record[:expires_at].nil? || record[:expires_at] > now) - if record_live + sp_early = record_live && stampede_protection && record[:expires_at] && + (record[:delta].to_f > 0) && + begin + sp_beta = stampede_protection.is_a?(Numeric) ? 
stampede_protection.to_f : 1.0 + now - record[:delta].to_f * sp_beta * Math.log(rand) >= record[:expires_at] + end + + if record_live && !sp_early if max_size lru = klass.send(:__safe_memo_shared_lru_order__)[method_name] ||= {} lru.delete(cache_key) @@ -443,13 +484,13 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN call_memo_hooks(:on_hit, cache_key, record) dup_fn.call(record[:value]) else - call_memo_hooks(:on_expire, cache_key, record) if record && !record_live + call_memo_hooks(:on_expire, cache_key, record) if record && (!record_live || sp_early) start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) value = Adapters::OpenTelemetry.trace(SafeMemoize.configuration.opentelemetry_tracer, method_name, klass.name) { super(*args, **kwargs) } elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time - new_record = memo_record(value, expires_at: memo_expires_at(ttl)) + new_record = memo_record(value, expires_at: memo_expires_at(ttl), delta: stampede_protection ? elapsed_time : nil) if !condition || condition.call(value) if max_size @@ -493,10 +534,22 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN cache_key = compute_cache_key(method_name, args, kwargs) - if max_size || condition || ttl_refresh - # Locked path: used when LRU tracking, conditional storage, or TTL refresh is needed. + if max_size || condition || ttl_refresh || stampede_protection + # Locked path: LRU, conditional storage, TTL refresh, or stampede protection. + sp_beta = stampede_protection.is_a?(Numeric) ? 
stampede_protection.to_f : 1.0 + memo_mutex!.synchronize do record = memo_cache_record(cache_key) + + if record && stampede_protection && record[:expires_at] + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delta = record[:delta].to_f + if delta > 0 && now - delta * sp_beta * Math.log(rand) >= record[:expires_at] + call_memo_hooks(:on_expire, cache_key, record) + record = nil + end + end + if record lru_touch(method_name, cache_key) if max_size record[:expires_at] = memo_expires_at(ttl) if ttl_refresh @@ -508,7 +561,7 @@ def memoize(method_name, ttl: UNSET, max_size: UNSET, ttl_refresh: UNSET, if: UN value = Adapters::OpenTelemetry.trace(SafeMemoize.configuration.opentelemetry_tracer, method_name, self.class.name) { super(*args, **kwargs) } elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time - new_record = memo_record(value, expires_at: memo_expires_at(ttl)) + new_record = memo_record(value, expires_at: memo_expires_at(ttl), delta: stampede_protection ? elapsed_time : nil) if !condition || condition.call(value) lru_evict_if_over_limit(method_name, max_size) if max_size @__safe_memo_cache__ ||= {} diff --git a/lib/safe_memoize/stores/multilevel.rb b/lib/safe_memoize/stores/multilevel.rb new file mode 100644 index 0000000..246256b --- /dev/null +++ b/lib/safe_memoize/stores/multilevel.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +module SafeMemoize + module Stores + # Multi-level (L1/L2/…) cache store that checks faster layers first and + # promotes values up on a miss, reducing latency and load on slower backends. + # + # Reads walk the store list from first (fastest) to last (slowest). On a + # miss at level N the value is read from level N+1 and written back into + # all preceding levels ("read-through promotion"). Writes always go to every + # level so all layers stay consistent. 
+    #
+    # @example In-process L1 + Redis L2
+    #   l1 = SafeMemoize::Stores::Memory.new
+    #   l2 = MyRedisStore.new
+    #
+    #   memoize :fetch, store: SafeMemoize::Stores::Multilevel.new(l1, l2)
+    #
+    # @example Via the store: Array shorthand
+    #   memoize :fetch, store: [l1, l2], ttl: 300
+    #
+    # @example With a short promote_expires_in for the L1 layer
+    #   memoize :fetch, store: SafeMemoize::Stores::Multilevel.new(l1, l2, promote_expires_in: 60)
+    class Multilevel < Base
+      # @return [Array] the ordered store layers (fastest first)
+      attr_reader :stores
+
+      # @param stores [Array] two or more store instances, ordered
+      #   from fastest (L1) to slowest (last)
+      # @param promote_expires_in [Numeric, nil] TTL applied when promoting a
+      #   value from a deeper layer into a shallower one; +nil+ means no expiry
+      #   on the promoted entry (the L1 store's own eviction — e.g. LRU — handles
+      #   memory bounds instead)
+      # @raise [ArgumentError] if fewer than two stores are supplied, or any
+      #   element is not a {Stores::Base} instance
+      def initialize(*stores, promote_expires_in: nil)
+        raise ArgumentError, "Multilevel requires at least 2 stores" if stores.size < 2
+
+        stores.each_with_index do |s, i|
+          unless s.is_a?(Base)
+            raise ArgumentError,
+              "Multilevel store[#{i}] must be a Stores::Base instance (got #{s.class})"
+          end
+        end
+
+        @stores = stores.freeze
+        @promote_expires_in = promote_expires_in ? Float(promote_expires_in) : nil
+      end
+
+      # Walk levels from fastest to slowest; return the first hit, promoting
+      # the value into all shallower layers.
+      def read(key)
+        @stores.each_with_index do |store, i|
+          result = store.read(key)
+          next if result.equal?(MISS)
+
+          # Promote into every shallower level
+          @stores.first(i).each { |s| s.write(key, result, expires_in: @promote_expires_in) }
+          return result
+        end
+
+        MISS
+      end
+
+      # Write to every level simultaneously.
+      def write(key, value, expires_in: nil)
+        @stores.each { |s| s.write(key, value, expires_in: expires_in) }
+      end
+
+      # Delete from every level.
+      def delete(key)
+        @stores.each { |s| s.delete(key) }
+      end
+
+      # Clear every level.
+      def clear
+        @stores.each(&:clear)
+      end
+
+      # Union of live keys across all levels.
+      def keys
+        @stores.flat_map(&:keys).uniq
+      end
+    end
+  end
+end
diff --git a/lib/safe_memoize/stores/xfetch.rb b/lib/safe_memoize/stores/xfetch.rb
new file mode 100644
index 0000000..b458ed9
--- /dev/null
+++ b/lib/safe_memoize/stores/xfetch.rb
@@ -0,0 +1,129 @@
+# frozen_string_literal: true
+
+module SafeMemoize
+  module Stores
+    # Wraps any {Base} store adapter with probabilistic early expiry (the
+    # XFetch algorithm) to prevent cache stampedes — the thundering-herd
+    # problem where many processes simultaneously recompute a value the moment
+    # it expires under high load.
+    #
+    # Instead of waiting until {#read} returns {MISS} at the hard expiry
+    # deadline, the wrapper stochastically returns {MISS} slightly before
+    # expiry, giving one process a head start on recomputation while everyone
+    # else still gets the cached value. The probability of early expiry rises
+    # as the entry approaches its deadline.
+    #
+    # === XFetch formula
+    #
+    #   early_expire = now − (delta × beta × log(rand)) ≥ expires_at
+    #
+    # * +delta+ — estimated computation time in seconds (default 0.1 s).
+    #   Configure this to the typical duration of the underlying computation.
+    # * +beta+ — aggressiveness scalar (default 1.0); higher values trigger
+    #   early recomputation more eagerly.
+    #
+    # Values are stored internally as an envelope +{value:, expires_at:}+ so
+    # the wrapper always knows the hard deadline regardless of what the inner
+    # store exposes on read. The envelope survives standard Ruby Marshal
+    # serialization (Redis via the +redis-store+ or +redis-client+ gems,
+    # Rails.cache, etc.). Values that cannot be serialized alongside a small
+    # hash are not supported.
+    #
+    # @example Wrap a Redis store
+    #   store = SafeMemoize::Stores::XFetch.new(
+    #     MyRedisStore.new,
+    #     delta: 0.2, # typical computation time in seconds
+    #     beta: 1.5 # slightly aggressive early expiry
+    #   )
+    #   memoize :fetch, store: store, ttl: 300
+    #
+    # @example Compose with CircuitBreaker
+    #   store = SafeMemoize::Stores::XFetch.new(
+    #     SafeMemoize::Stores::CircuitBreaker.new(MyRedisStore.new),
+    #     delta: 0.1
+    #   )
+    #   memoize :fetch, store: store, ttl: 60
+    class XFetch < Base
+      ENVELOPE_KEY = :__sm_xfetch_v1__
+
+      DEFAULT_BETA = 1.0
+      DEFAULT_DELTA = 0.1
+
+      # @return [Stores::Base] the wrapped inner store
+      attr_reader :wrapped_store
+      # @return [Float] aggressiveness scalar
+      attr_reader :beta
+      # @return [Float] estimated computation time in seconds
+      attr_reader :delta
+
+      # @param store [Stores::Base] the backing store to wrap
+      # @param beta [Numeric] aggressiveness scalar (default 1.0)
+      # @param delta [Numeric] estimated computation time in seconds (default 0.1)
+      # @raise [ArgumentError] if +store+ is not a {Stores::Base} instance, or
+      #   if +beta+/+delta+ are not positive numbers
+      def initialize(store, beta: DEFAULT_BETA, delta: DEFAULT_DELTA)
+        unless store.is_a?(Base)
+          raise ArgumentError, "XFetch requires a Stores::Base instance (got #{store.class})"
+        end
+
+        @wrapped_store = store
+        @beta = Float(beta)
+        @delta = Float(delta)
+
+        raise ArgumentError, "beta must be positive" unless @beta > 0
+        raise ArgumentError, "delta must be positive" unless @delta > 0
+      end
+
+      # Read from the wrapped store and apply the XFetch probabilistic check.
+      #
+      # Returns {MISS} when:
+      # * the inner store has no entry for +key+
+      # * the stored value is not an XFetch envelope (possibly written by an
+      #   older version or a different store wrapper)
+      # * the XFetch formula triggers early expiry
+      def read(key)
+        raw = @wrapped_store.read(key)
+        return MISS if raw.equal?(MISS)
+        return MISS unless envelope?(raw)
+
+        expires_at = raw[:expires_at]
+
+        if expires_at
+          now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+          early = now - @delta * @beta * Math.log(rand) >= expires_at
+          return MISS if early
+        end
+
+        raw[:value]
+      end
+
+      # Write the value to the wrapped store inside an XFetch envelope.
+      def write(key, value, expires_in: nil)
+        expires_at = expires_in ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + expires_in.to_f : nil
+        envelope = {ENVELOPE_KEY => true, :value => value, :expires_at => expires_at}
+        @wrapped_store.write(key, envelope, expires_in: expires_in)
+      end
+
+      # Delete from the wrapped store.
+      def delete(key)
+        @wrapped_store.delete(key)
+      end
+
+      # Clear the wrapped store.
+      def clear
+        @wrapped_store.clear
+      end
+
+      # Returns live keys from the wrapped store.
+ def keys + @wrapped_store.keys + end + + private + + def envelope?(value) + value.is_a?(Hash) && value[ENVELOPE_KEY] + end + end + end +end diff --git a/sig/safe_memoize.rbs b/sig/safe_memoize.rbs index 3c0ce6b..518c0e6 100644 --- a/sig/safe_memoize.rbs +++ b/sig/safe_memoize.rbs @@ -66,7 +66,7 @@ module SafeMemoize end module ClassMethods - def memoize: (Symbol | String method_name, ?ttl: Numeric?, ?max_size: Integer?, ?ttl_refresh: bool, ?if: (^(untyped result) -> boolish)?, ?unless: (^(untyped result) -> boolish)?, ?shared: bool, ?key: (^(*untyped args, **untyped kwargs) -> untyped)?, ?store: Stores::Base?, ?fiber_local: bool, ?ractor_safe: bool, ?namespace: String?, ?shared_cache: String?, ?cache_bust: (^() -> untyped) | Symbol | nil, ?copy_on_read: bool, ?group: Symbol | String | nil, ?circuit_breaker: bool | Hash[Symbol, untyped] | nil, **untyped extension_options) -> void + def memoize: (Symbol | String method_name, ?ttl: Numeric?, ?max_size: Integer?, ?ttl_refresh: bool, ?if: (^(untyped result) -> boolish)?, ?unless: (^(untyped result) -> boolish)?, ?shared: bool, ?key: (^(*untyped args, **untyped kwargs) -> untyped)?, ?store: Stores::Base | Array[Stores::Base] | nil, ?fiber_local: bool, ?ractor_safe: bool, ?namespace: String?, ?shared_cache: String?, ?cache_bust: (^() -> untyped) | Symbol | nil, ?copy_on_read: bool, ?group: Symbol | String | nil, ?circuit_breaker: bool | Hash[Symbol, untyped] | nil, ?stampede_protection: bool | Numeric | nil, **untyped extension_options) -> void def safe_memoize_store: () -> Stores::Base? def safe_memoize_store=: (Stores::Base?) -> Stores::Base? def safe_memoize_namespace: () -> String? @@ -148,7 +148,7 @@ module SafeMemoize def memo_ttl: (Numeric? ttl) -> Float? def memo_expires_at: (Float? ttl) -> Float? - def memo_record: (untyped value, expires_at: Float?) -> memo_record + def memo_record: (untyped value, expires_at: Float?, ?delta: Float?) 
-> memo_record def memo_record_value: (memo_record record) -> untyped def memo_record_live?: (memo_record? record) -> bool def memo_prune_expired_entries!: (Hash[memo_key, memo_record] cache) -> void @@ -301,6 +301,38 @@ module SafeMemoize def expired?: ({ expires_at: Float?, value: untyped, cached_at: Float }) -> bool end + class Multilevel < Base + attr_reader stores: Array[Base] + + def initialize: (*Base stores, ?promote_expires_in: Numeric?) -> void + def read: (untyped key) -> untyped + def write: (untyped key, untyped value, ?expires_in: Numeric?) -> void + def delete: (untyped key) -> void + def clear: () -> void + def keys: () -> Array[untyped] + end + + class XFetch < Base + ENVELOPE_KEY: Symbol + DEFAULT_BETA: Float + DEFAULT_DELTA: Float + + attr_reader wrapped_store: Base + attr_reader beta: Float + attr_reader delta: Float + + def initialize: (Base store, ?beta: Numeric, ?delta: Numeric) -> void + def read: (untyped key) -> untyped + def write: (untyped key, untyped value, ?expires_in: Numeric?) 
-> void + def delete: (untyped key) -> void + def clear: () -> void + def keys: () -> Array[untyped] + + private + + def envelope?: (untyped value) -> bool + end + class CircuitBreaker < Base DEFAULT_ERROR_THRESHOLD: Integer DEFAULT_PROBE_INTERVAL: Float diff --git a/spec/safe_memoize_spec.rb b/spec/safe_memoize_spec.rb index 4f489b0..30a2014 100644 --- a/spec/safe_memoize_spec.rb +++ b/spec/safe_memoize_spec.rb @@ -2160,6 +2160,45 @@ def make_store_raise end end + context "write failure" do + it "records the failure and does not re-raise" do + allow(inner_store).to receive(:write).and_raise(RuntimeError, "write error") + expect { cb.write(:k, "v") }.not_to raise_error + expect(cb.error_count).to eq(1) + end + + it "trips the circuit after threshold write failures" do + allow(inner_store).to receive(:write).and_raise(RuntimeError) + 3.times { cb.write(:k, "v") } + expect(cb.state).to eq(:open) + end + end + + context "delete failure" do + it "records the failure and does not re-raise" do + allow(inner_store).to receive(:delete).and_raise(RuntimeError, "delete error") + expect { cb.delete(:k) }.not_to raise_error + expect(cb.error_count).to eq(1) + end + end + + context "clear failure" do + it "records the failure and does not re-raise" do + allow(inner_store).to receive(:clear).and_raise(RuntimeError, "clear error") + expect { cb.clear }.not_to raise_error + expect(cb.error_count).to eq(1) + end + end + + context "keys failure" do + it "returns an empty array and records the failure without re-raising" do + allow(inner_store).to receive(:keys).and_raise(RuntimeError, "keys error") + result = cb.keys + expect(result).to eq([]) + expect(cb.error_count).to eq(1) + end + end + context "tripping to open state" do before { make_store_raise } @@ -2446,4 +2485,487 @@ def data = 1 end end end + + describe "SafeMemoize::Stores::Multilevel" do + let(:l1) { SafeMemoize::Stores::Memory.new } + let(:l2) { SafeMemoize::Stores::Memory.new } + let(:ml) { 
SafeMemoize::Stores::Multilevel.new(l1, l2) } + + context "basic read-through and promotion" do + it "reads from L1 when present" do + l1.write(:k, "l1_value") + l2.write(:k, "l2_value") + expect(ml.read(:k)).to eq("l1_value") + end + + it "falls back to L2 on L1 miss and promotes to L1" do + l2.write(:k, "l2_value") + expect(ml.read(:k)).to eq("l2_value") + expect(l1.read(:k)).to eq("l2_value") + end + + it "returns MISS when all levels miss" do + expect(ml.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + end + + it "does not promote when L1 already has the entry" do + l1.write(:k, "l1_value") + ml.read(:k) + # L2 was never written — promotion only goes to shallower layers + expect(l2.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + end + end + + context "write" do + it "writes to all levels" do + ml.write(:k, "v") + expect(l1.read(:k)).to eq("v") + expect(l2.read(:k)).to eq("v") + end + + it "passes expires_in to all levels" do + ml.write(:k, "v", expires_in: 60) + expect(l1.read(:k)).to eq("v") + expect(l2.read(:k)).to eq("v") + end + end + + context "delete" do + it "deletes from all levels" do + ml.write(:k, "v") + ml.delete(:k) + expect(l1.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + expect(l2.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + end + end + + context "clear" do + it "clears all levels" do + ml.write(:a, 1) + ml.write(:b, 2) + ml.clear + expect(ml.read(:a)).to eq(SafeMemoize::Stores::Base::MISS) + expect(ml.read(:b)).to eq(SafeMemoize::Stores::Base::MISS) + end + end + + context "keys" do + it "returns the union of live keys across all levels" do + l1.write(:a, 1) + l2.write(:b, 2) + expect(ml.keys).to match_array(%i[a b]) + end + end + + context "promote_expires_in" do + it "applies promote_expires_in when promoting from L2 to L1" do + ml_with_ttl = SafeMemoize::Stores::Multilevel.new(l1, l2, promote_expires_in: 60) + l2.write(:k, "v") + ml_with_ttl.read(:k) + # L1 entry should now exist — we can't easily inspect TTL but can verify 
the value + expect(l1.read(:k)).to eq("v") + end + end + + context "validation" do + it "raises ArgumentError with fewer than 2 stores" do + expect do + SafeMemoize::Stores::Multilevel.new(SafeMemoize::Stores::Memory.new) + end.to raise_error(ArgumentError, /at least 2 stores/) + end + + it "raises ArgumentError if an entry is not a Stores::Base instance" do + expect do + SafeMemoize::Stores::Multilevel.new(SafeMemoize::Stores::Memory.new, "not a store") + end.to raise_error(ArgumentError, /Stores::Base/) + end + end + + context "store: Array shorthand on memoize" do + it "auto-creates a Multilevel store from an array" do + l1_store = SafeMemoize::Stores::Memory.new + l2_store = SafeMemoize::Stores::Memory.new + + klass = Class.new do + prepend SafeMemoize + + attr_reader :calls + + def initialize + @calls = 0 + end + + define_method(:fetch) do |id| + @calls += 1 + "result_#{id}" + end + + memoize :fetch, store: [l1_store, l2_store] + end + + obj = klass.new + expect(obj.fetch(1)).to eq("result_1") + expect(obj.fetch(1)).to eq("result_1") + expect(obj.calls).to eq(1) + expect(l1_store.read([:fetch, [1], {}])).to eq("result_1") + end + + it "raises ArgumentError if Array entries are not Stores::Base instances" do + expect do + Class.new do + prepend SafeMemoize + + def data = 1 + memoize :data, store: [SafeMemoize::Stores::Memory.new, "bad"] + end + end.to raise_error(ArgumentError, /Array entry/) + end + end + end + + describe "SafeMemoize::Stores::XFetch" do + let(:inner) { SafeMemoize::Stores::Memory.new } + + context "basic read/write" do + it "stores and retrieves values" do + xf = SafeMemoize::Stores::XFetch.new(inner, beta: 1.0, delta: 0.001) + xf.write(:k, "value", expires_in: 60) + expect(xf.read(:k)).to eq("value") + end + + it "returns MISS for absent keys" do + xf = SafeMemoize::Stores::XFetch.new(inner) + expect(xf.read(:missing)).to eq(SafeMemoize::Stores::Base::MISS) + end + + it "returns MISS for expired entries" do + xf = 
SafeMemoize::Stores::XFetch.new(inner, delta: 0.001) + xf.write(:k, "v", expires_in: 0.001) + sleep(0.01) + expect(xf.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + end + + it "stores no-TTL values and reads them back" do + xf = SafeMemoize::Stores::XFetch.new(inner) + xf.write(:k, "v") + expect(xf.read(:k)).to eq("v") + end + end + + context "early expiry (XFetch probabilistic)" do + it "eventually triggers early expiry before the hard deadline" do + # Use a very high beta so the XFetch check fires almost every read + xf = SafeMemoize::Stores::XFetch.new(inner, beta: 1_000_000.0, delta: 1.0) + xf.write(:k, "v", expires_in: 300) + + # With a huge beta, log(rand) is always negative and the formula fires + results = 20.times.map { xf.read(:k) } + expect(results).to include(SafeMemoize::Stores::Base::MISS) + end + + it "never triggers early expiry when beta is effectively zero" do + xf = SafeMemoize::Stores::XFetch.new(inner, beta: Float::EPSILON, delta: Float::EPSILON) + xf.write(:k, "v", expires_in: 300) + + # With negligible beta, should never trigger early for a 5-minute TTL + results = 50.times.map { xf.read(:k) } + expect(results).to all(eq("v")) + end + end + + context "delete / clear / keys" do + it "delete removes the entry" do + xf = SafeMemoize::Stores::XFetch.new(inner) + xf.write(:k, "v") + xf.delete(:k) + expect(xf.read(:k)).to eq(SafeMemoize::Stores::Base::MISS) + end + + it "clear removes all entries" do + xf = SafeMemoize::Stores::XFetch.new(inner) + xf.write(:a, 1) + xf.write(:b, 2) + xf.clear + expect(xf.read(:a)).to eq(SafeMemoize::Stores::Base::MISS) + end + + it "keys delegates to the inner store" do + xf = SafeMemoize::Stores::XFetch.new(inner) + xf.write(:a, 1) + xf.write(:b, 2) + expect(xf.keys.size).to eq(2) + end + end + + context "validation" do + it "raises ArgumentError if wrapped object is not a Stores::Base" do + expect do + SafeMemoize::Stores::XFetch.new("not a store") + end.to raise_error(ArgumentError, /Stores::Base/) + end + + 
it "raises ArgumentError for non-positive beta" do + expect do + SafeMemoize::Stores::XFetch.new(inner, beta: 0) + end.to raise_error(ArgumentError, /beta/) + end + + it "raises ArgumentError for non-positive delta" do + expect do + SafeMemoize::Stores::XFetch.new(inner, delta: 0) + end.to raise_error(ArgumentError, /delta/) + end + end + end + + describe "stampede_protection: option" do + def make_klass(ttl:, beta: true) + Class.new do + prepend SafeMemoize + + attr_reader :calls + + def initialize + @calls = 0 + end + + def compute + @calls += 1 + "value" + end + + memoize :compute, ttl: ttl, stampede_protection: beta + end + end + + context "basic behaviour" do + it "caches normally (XFetch does not fire for fresh entries)" do + klass = make_klass(ttl: 300) + obj = klass.new + obj.compute + # Fresh entry: XFetch with default beta should not fire + result = obj.compute + expect(result).to eq("value") + expect(obj.calls).to be <= 2 + end + + it "stores delta in the cache record" do + klass = make_klass(ttl: 300) + obj = klass.new + obj.compute + # Peek at the internal record + cache = obj.instance_variable_get(:@__safe_memo_cache__) + record = cache&.values&.first + expect(record).not_to be_nil + expect(record[:delta]).to be_a(Float) + expect(record[:delta]).to be >= 0 + end + end + + context "probabilistic early expiry" do + it "eventually triggers early recomputation near the expiry deadline" do + # Use a very high beta to force early expiry almost every read + klass = Class.new do + prepend SafeMemoize + + attr_reader :calls + + def initialize + @calls = 0 + end + + def compute + @calls += 1 + "value" + end + + # Long TTL (300 s) with a huge beta; the large delta injected below makes early expiry near-certain + memoize :compute, ttl: 300, stampede_protection: 1_000_000.0 + end + + obj = klass.new + obj.compute # prime: stores delta + + # Manually set a large delta in the record so XFetch fires + cache = obj.instance_variable_get(:@__safe_memo_cache__) + cache&.each_value { |r| 
r[:delta] = 1000.0 } + + recomputed = 20.times.any? { + obj.compute + obj.calls > 1 + } + expect(recomputed).to be true + end + end + + context "validation" do + it "raises ArgumentError when ttl: is not set" do + expect do + Class.new do + prepend SafeMemoize + + def data = 1 + memoize :data, stampede_protection: true + end + end.to raise_error(ArgumentError, /requires ttl:/) + end + + it "raises ArgumentError when store: is also set" do + expect do + Class.new do + prepend SafeMemoize + + def data = 1 + memoize :data, ttl: 60, store: SafeMemoize::Stores::Memory.new, stampede_protection: true + end + end.to raise_error(ArgumentError, /incompatible with store:/) + end + + it "raises ArgumentError when ractor_safe: is also set" do + expect do + Class.new do + prepend SafeMemoize + + def data = 1 + memoize :data, ttl: 60, shared: true, ractor_safe: true, stampede_protection: true + end + end.to raise_error(ArgumentError, /incompatible with ractor_safe:/) + end + + it "raises ArgumentError for invalid stampede_protection value" do + expect do + Class.new do + prepend SafeMemoize + + def data = 1 + memoize :data, ttl: 60, stampede_protection: "bad" + end + end.to raise_error(ArgumentError, /must be true or a Numeric/) + end + end + + context "safe_memoize_options with stampede_protection:" do + it "applies to all subsequently memoized methods" do + klass = Class.new do + prepend SafeMemoize + + safe_memoize_options ttl: 300, stampede_protection: true + + def foo = 1 + def bar = 2 + memoize :foo + memoize :bar + end + + obj = klass.new + expect(obj.foo).to eq(1) + expect(obj.bar).to eq(2) + end + end + + context "fiber_local: true with stampede_protection:" do + it "caches normally for a fresh fiber-local entry" do + klass = Class.new do + prepend SafeMemoize + + attr_reader :calls + + def initialize + @calls = 0 + end + + def fetch + @calls += 1 + "value" + end + + memoize :fetch, ttl: 300, fiber_local: true, stampede_protection: true + end + + obj = klass.new + 
expect(obj.fetch).to eq("value") + expect(obj.fetch).to eq("value") + expect(obj.calls).to eq(1) + end + + it "triggers early recomputation when delta is large and beta is high" do + klass = Class.new do + prepend SafeMemoize + + attr_reader :calls + + def initialize + @calls = 0 + end + + def fetch + @calls += 1 + "value" + end + + memoize :fetch, ttl: 300, fiber_local: true, stampede_protection: 1_000_000.0 + end + + obj = klass.new + obj.fetch # prime with a delta + + # Force a large delta so XFetch fires + store = Fiber[:__safe_memoize__] + store&.dig(obj.object_id, :cache)&.each_value { |r| r[:delta] = 1000.0 } + + recomputed = 20.times.any? { + obj.fetch + obj.calls > 1 + } + expect(recomputed).to be true + end + end + + context "shared: true with stampede_protection:" do + it "caches normally for a fresh shared entry" do + klass = Class.new do + prepend SafeMemoize + + def fetch + "shared_value" + end + + memoize :fetch, ttl: 300, shared: true, stampede_protection: true + end + + obj = klass.new + expect(obj.fetch).to eq("shared_value") + expect(obj.fetch).to eq("shared_value") + end + + it "triggers early recomputation in shared cache when delta is large and beta is high" do + klass = Class.new do + prepend SafeMemoize + + @call_count = 0 + class << self + attr_accessor :call_count + end + + def fetch + self.class.call_count += 1 + "value" + end + + memoize :fetch, ttl: 300, shared: true, stampede_protection: 1_000_000.0 + end + + obj = klass.new + obj.fetch # prime + + # Force a large delta in the shared cache record + shared_cache = klass.send(:__safe_memo_shared_cache__) + shared_cache.each_value { |r| r[:delta] = 1000.0 } + + calls_before = klass.call_count + 20.times { obj.fetch } + expect(klass.call_count).to be > calls_before + end + end + end end