diff --git a/async-limiter.gemspec b/async-limiter.gemspec index 980991f..5ce94ff 100644 --- a/async-limiter.gemspec +++ b/async-limiter.gemspec @@ -25,4 +25,5 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 3.3" spec.add_dependency "async", ">= 2.31" + spec.add_dependency "async-utilization", "~> 0.4" end diff --git a/lib/async/limiter/generic.rb b/lib/async/limiter/generic.rb index e2e52f1..b4378f9 100644 --- a/lib/async/limiter/generic.rb +++ b/lib/async/limiter/generic.rb @@ -6,6 +6,7 @@ require "async/task" require "async/deadline" +require "async/utilization" require "json" require_relative "timing/none" require_relative "timing/sliding_window" @@ -24,10 +25,12 @@ class Generic # Initialize a new generic limiter. # @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints. # @parameter parent [Async::Task, nil] Parent task for creating child tasks. - def initialize(timing: Timing::None, parent: nil, tags: nil) + # @parameter utilization [#metric] Registry-like object for utilization metrics. + def initialize(timing: Timing::None, parent: nil, tags: nil, utilization: Async::Utilization::Registry.new) @timing = timing @parent = parent @tags = tags + @utilization = utilization @mutex = Mutex.new end @@ -35,6 +38,9 @@ def initialize(timing: Timing::None, parent: nil, tags: nil) # @attribute [Array(String)] Tags associated with this limiter for identification or categorization. attr :tags + # @attribute [#metric] Registry-like object for utilization metrics. + attr :utilization + # @returns [Boolean] Whether this limiter is currently limiting concurrency. def limited? false @@ -67,7 +73,7 @@ def sync end # Manually acquire a resource with timing and concurrency coordination. - # + # # This method provides the core acquisition logic with support for: # - Flexible timeout handling (blocking, non-blocking, timed) # - Cost-based consumption for timing strategies diff --git a/lib/async/limiter/limited.rb b/lib/async/limiter/limited.rb index 277016c..ee1138f 100644 --- a/lib/async/limiter/limited.rb +++ b/lib/async/limiter/limited.rb @@ -20,18 +20,22 @@ module Limiter class Limited < Generic # Initialize a limited concurrency limiter. # @parameter limit [Integer] Maximum concurrent tasks allowed. - # @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints. - # @parameter parent [Async::Task, nil] Parent task for creating child tasks. + # @parameter options [Hash] Options passed to {Generic#initialize}. # @raises [ArgumentError] If limit is not positive. - def initialize(limit = 1, timing: Timing::None, parent: nil) - super(timing: timing, parent: parent) + def initialize(limit = 1, **options) + super(**options) @limit = limit @count = 0 - @waiting_count = 0 - @reacquire_waiting_count = 0 @available = ConditionVariable.new + + @acquired_count_metric = @utilization.metric(:acquired_count) + @available_count_metric = @utilization.metric(:available_count) + @waiting_count_metric = @utilization.metric(:waiting_count) + @reacquire_waiting_count_metric = @utilization.metric(:reacquire_waiting_count) + + update_utilization_metrics end # @attribute [Integer] The maximum number of concurrent tasks. @@ -52,12 +56,12 @@ def available_count # @returns [Integer] Current count of tasks waiting for capacity. def waiting_count - @mutex.synchronize{@waiting_count} + @waiting_count_metric.value end # @returns [Integer] Current count of reacquiring tasks waiting for capacity. def reacquire_waiting_count - @mutex.synchronize{@reacquire_waiting_count} + @reacquire_waiting_count_metric.value end # Check if a new task can be acquired. @@ -73,6 +77,7 @@ def limit=(new_limit) @mutex.synchronize do old_limit = @limit @limit = new_limit + update_utilization_metrics # Wake up waiting tasks if limit increased: @available.broadcast if new_limit > old_limit @@ -88,8 +93,8 @@ def statistics count: @count, acquired_count: @count, available_count: @limit - @count, - waiting_count: @waiting_count, - reacquire_waiting_count: @reacquire_waiting_count, + waiting_count: @waiting_count_metric.value, + reacquire_waiting_count: @reacquire_waiting_count_metric.value, timing: @timing.statistics } end @@ -103,6 +108,7 @@ def acquire_resource(deadline, reacquire: false, **options) return nil if deadline&.expired? && @count >= @limit waiting = false + acquired = false # Wait for capacity with deadline tracking while @count >= @limit @@ -110,8 +116,8 @@ def acquire_resource(deadline, reacquire: false, **options) return nil if remaining && remaining <= 0 unless waiting - @waiting_count += 1 - @reacquire_waiting_count += 1 if reacquire + @waiting_count_metric.increment + @reacquire_waiting_count_metric.increment if reacquire waiting = true end @@ -121,22 +127,33 @@ def acquire_resource(deadline, reacquire: false, **options) end @count += 1 + acquired = true return true ensure if waiting - @waiting_count -= 1 - @reacquire_waiting_count -= 1 if reacquire + @waiting_count_metric.decrement + @reacquire_waiting_count_metric.decrement if reacquire end + + update_utilization_metrics if acquired end # Release resource. def release_resource(resource) @mutex.synchronize do @count -= 1 + update_utilization_metrics @available.signal end end + + private + + def update_utilization_metrics + @acquired_count_metric.set(@count) + @available_count_metric.set(@limit - @count) + end end end end diff --git a/lib/async/limiter/queued.rb b/lib/async/limiter/queued.rb index a8c490e..fd060b9 100644 --- a/lib/async/limiter/queued.rb +++ b/lib/async/limiter/queued.rb @@ -28,13 +28,17 @@ def self.default_queue # Initialize a queue-based limiter. # @parameter queue [#push, #pop, #empty?] Thread-safe queue containing pre-existing resources. - # @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints. - # @parameter parent [Async::Task, nil] Parent task for creating child tasks. - def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil) - super(timing: timing, parent: parent) + # @parameter options [Hash] Options passed to {Generic#initialize}. + def initialize(queue = self.class.default_queue, **options) + super(**options) @queue = queue - @acquired_count = 0 - @reacquire_waiting_count = 0 + + @acquired_count_metric = @utilization.metric(:acquired_count) + @available_count_metric = @utilization.metric(:available_count) + @waiting_count_metric = @utilization.metric(:waiting_count) + @reacquire_waiting_count_metric = @utilization.metric(:reacquire_waiting_count) + + update_utilization_metrics end # @attribute [Queue] The queue managing resources. @@ -42,7 +46,7 @@ def initialize(queue = self.class.default_queue, timing: Timing::None, parent: n # @returns [Integer] Current count of acquired resources. def acquired_count - @mutex.synchronize{@acquired_count} + @acquired_count_metric.value end # @returns [Integer] Current count of available resources. @@ -57,7 +61,7 @@ def waiting_count # @returns [Integer] Current count of reacquiring tasks waiting for resources. def reacquire_waiting_count - @mutex.synchronize{@reacquire_waiting_count} + @reacquire_waiting_count_metric.value end # Check if a new task can be acquired. @@ -73,6 +77,8 @@ def expand(count, value = true) count.times do @queue.push(value) end + + update_utilization_metrics end # Get current limiter statistics. @@ -82,10 +88,10 @@ def statistics { waiting: @queue.waiting_count, available: @queue.size, - acquired_count: @acquired_count, + acquired_count: @acquired_count_metric.value, available_count: @queue.size, waiting_count: @queue.waiting_count, - reacquire_waiting_count: @reacquire_waiting_count, + reacquire_waiting_count: @reacquire_waiting_count_metric.value, timing: @timing.statistics } end @@ -95,25 +101,36 @@ def statistics # Acquire a resource from the queue with optional deadline. def acquire_resource(deadline, reacquire: false, **options) - @reacquire_waiting_count += 1 if reacquire + @reacquire_waiting_count_metric.increment if reacquire + update_utilization_metrics if reacquire @mutex.unlock resource = @queue.pop(timeout: deadline&.remaining, **options) return resource ensure @mutex.lock - @reacquire_waiting_count -= 1 if reacquire - @acquired_count += 1 if resource + @reacquire_waiting_count_metric.decrement if reacquire + @acquired_count_metric.increment if resource + update_utilization_metrics if reacquire || resource end # Release a previously acquired resource back to the queue. def release_resource(value) @mutex.synchronize do - @acquired_count -= 1 if @acquired_count > 0 + @acquired_count_metric.decrement if @acquired_count_metric.value > 0 + update_utilization_metrics end # Return a default resource to the queue: @queue.push(value) + update_utilization_metrics + end + + private + + def update_utilization_metrics + @available_count_metric.set(@queue.size) + @waiting_count_metric.set(@queue.waiting_count) end end end diff --git a/releases.md b/releases.md index 621691d..5be3274 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Add `async-utilization` metrics for limiter telemetry counters. + ## v2.1.0 - Add telemetry counters to `Async::Limiter::Limited` and `Async::Limiter::Queued`: `acquired_count`, `available_count`, `waiting_count`, and `reacquire_waiting_count` for observability into limiter state. diff --git a/test/async/limiter/token.rb b/test/async/limiter/token.rb new file mode 100644 index 0000000..2679c13 --- /dev/null +++ b/test/async/limiter/token.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Shopify Inc. +# Copyright, 2026, by Samuel Williams. + +require "async/limiter" + +describe Async::Limiter::Token do + let(:limiter) {Async::Limiter::Generic.new} + + it "reports whether it has acquired a resource" do + token = Async::Limiter::Token.acquire(limiter) + + expect(token).to be(:acquired?) + expect(token).not.to be(:released?) + + token.release + + expect(token).not.to be(:acquired?) + expect(token).to be(:released?) + end +end diff --git a/test/async/limiter/utilization.rb b/test/async/limiter/utilization.rb new file mode 100644 index 0000000..c8220d1 --- /dev/null +++ b/test/async/limiter/utilization.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Shopify Inc. +# Copyright, 2026, by Samuel Williams. + +require "async/limiter" +require "async/queue" +require "async/utilization" +require "sus/fixtures/async/scheduler_context" + +describe "Async::Limiter utilization metrics" do + let(:registry) {Async::Utilization::Registry.new} + + with Async::Limiter::Limited do + let(:limiter) {Async::Limiter::Limited.new(2, utilization: registry.namespace(:limited))} + + it "initializes utilization metrics" do + limiter + + expect(limiter.acquired_count).to be == 0 + expect(limiter.available_count).to be == 2 + expect(limiter.waiting_count).to be == 0 + expect(limiter.reacquire_waiting_count).to be == 0 + + expect(registry.values).to have_keys( + limited_acquired_count: be == 0, + limited_available_count: be == 2, + limited_waiting_count: be == 0, + limited_reacquire_waiting_count: be == 0 + ) + end + + it "updates utilization metrics when resources are acquired and released" do + resource = limiter.acquire + + expect(limiter.acquired_count).to be == 1 + expect(limiter.available_count).to be == 1 + + expect(registry.values).to have_keys( + limited_acquired_count: be == 1, + limited_available_count: be == 1 + ) + + limiter.release(resource) + + expect(registry.values).to have_keys( + limited_acquired_count: be == 0, + limited_available_count: be == 2 + ) + end + + it "updates utilization metrics when the limit changes" do + limiter.limit = 3 + + expect(registry.values).to have_keys( + limited_acquired_count: be == 0, + limited_available_count: be == 3 + ) + end + + it "updates waiting utilization metrics" do + limiter.acquire + limiter.acquire + + thread = Thread.new do + limiter.acquire(reacquire: true) + end + + Thread.pass until registry.values[:limited_reacquire_waiting_count] == 1 + + expect(registry.values).to have_keys( + limited_waiting_count: be == 1, + limited_reacquire_waiting_count: be == 1 + ) + + limiter.release + expect(thread.value).to be == true + + expect(registry.values).to have_keys( + limited_waiting_count: be == 0, + limited_reacquire_waiting_count: be == 0 + ) + end + end + + with Async::Limiter::Queued do + include Sus::Fixtures::Async::SchedulerContext + + let(:queue) {Async::Queue.new} + let(:limiter) {Async::Limiter::Queued.new(queue, utilization: registry.namespace(:queued))} + + it "initializes utilization metrics" do + limiter + + expect(limiter.acquired_count).to be == 0 + expect(limiter.available_count).to be == 0 + expect(limiter.waiting_count).to be == 0 + expect(limiter.reacquire_waiting_count).to be == 0 + + expect(registry.values).to have_keys( + queued_acquired_count: be == 0, + queued_available_count: be == 0, + queued_waiting_count: be == 0, + queued_reacquire_waiting_count: be == 0 + ) + end + + it "updates utilization metrics when resources are acquired and released" do + limiter.release("resource") + + expect(limiter.available_count).to be == 1 + + expect(registry.values).to have_keys( + queued_acquired_count: be == 0, + queued_available_count: be == 1 + ) + + resource = limiter.acquire(timeout: 0) + expect(resource).to be == "resource" + + expect(limiter.acquired_count).to be == 1 + expect(limiter.available_count).to be == 0 + + expect(registry.values).to have_keys( + queued_acquired_count: be == 1, + queued_available_count: be == 0 + ) + + limiter.release(resource) + + expect(registry.values).to have_keys( + queued_acquired_count: be == 0, + queued_available_count: be == 1 + ) + end + + it "updates reacquire utilization metrics" do + task = reactor.async do + limiter.acquire(reacquire: true) + end + + sleep 0.01 until registry.values[:queued_reacquire_waiting_count] == 1 + + expect(limiter.waiting_count).to be == 1 + expect(limiter.reacquire_waiting_count).to be == 1 + + expect(registry.values).to have_keys( + queued_reacquire_waiting_count: be == 1 + ) + + limiter.release("resource") + + expect(task.wait).to be == "resource" + + expect(registry.values).to have_keys( + queued_acquired_count: be == 1, + queued_reacquire_waiting_count: be == 0 + ) + end + end +end