Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions async-limiter.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.3"

spec.add_dependency "async", ">= 2.31"
spec.add_dependency "async-utilization", "~> 0.4"
end
10 changes: 8 additions & 2 deletions lib/async/limiter/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require "async/task"
require "async/deadline"
require "async/utilization"
require "json"
require_relative "timing/none"
require_relative "timing/sliding_window"
Expand All @@ -24,17 +25,22 @@ class Generic
# Initialize a new generic limiter.
# @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints.
# @parameter parent [Async::Task, nil] Parent task for creating child tasks.
def initialize(timing: Timing::None, parent: nil, tags: nil)
# @parameter utilization [#metric] Registry-like object for utilization metrics.
def initialize(timing: Timing::None, parent: nil, tags: nil, utilization: Async::Utilization::Registry.new)
@timing = timing
@parent = parent
@tags = tags
@utilization = utilization

@mutex = Mutex.new
end

# @attribute [Array(String)] Tags associated with this limiter for identification or categorization.
attr :tags

# @attribute [#metric] Registry-like object for utilization metrics.
attr :utilization

# @returns [Boolean] Whether this limiter is currently limiting concurrency.
def limited?
false
Expand Down Expand Up @@ -67,7 +73,7 @@ def sync
end

# Manually acquire a resource with timing and concurrency coordination.
#
#
# This method provides the core acquisition logic with support for:
# - Flexible timeout handling (blocking, non-blocking, timed)
# - Cost-based consumption for timing strategies
Expand Down
45 changes: 31 additions & 14 deletions lib/async/limiter/limited.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ module Limiter
class Limited < Generic
# Initialize a limited concurrency limiter.
# @parameter limit [Integer] Maximum concurrent tasks allowed.
# @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints.
# @parameter parent [Async::Task, nil] Parent task for creating child tasks.
# @parameter options [Hash] Options passed to {Generic#initialize}.
# @raises [ArgumentError] If limit is not positive.
def initialize(limit = 1, timing: Timing::None, parent: nil)
super(timing: timing, parent: parent)
def initialize(limit = 1, **options)
super(**options)

@limit = limit
@count = 0
@waiting_count = 0
@reacquire_waiting_count = 0

@available = ConditionVariable.new

@acquired_count_metric = @utilization.metric(:acquired_count)
@available_count_metric = @utilization.metric(:available_count)
@waiting_count_metric = @utilization.metric(:waiting_count)
@reacquire_waiting_count_metric = @utilization.metric(:reacquire_waiting_count)

update_utilization_metrics
end

# @attribute [Integer] The maximum number of concurrent tasks.
Expand All @@ -52,12 +56,12 @@ def available_count

# @returns [Integer] Current count of tasks waiting for capacity.
def waiting_count
@mutex.synchronize{@waiting_count}
@waiting_count_metric.value
end

# @returns [Integer] Current count of reacquiring tasks waiting for capacity.
def reacquire_waiting_count
@mutex.synchronize{@reacquire_waiting_count}
@reacquire_waiting_count_metric.value
end

# Check if a new task can be acquired.
Expand All @@ -73,6 +77,7 @@ def limit=(new_limit)
@mutex.synchronize do
old_limit = @limit
@limit = new_limit
update_utilization_metrics

# Wake up waiting tasks if limit increased:
@available.broadcast if new_limit > old_limit
Expand All @@ -88,8 +93,8 @@ def statistics
count: @count,
acquired_count: @count,
available_count: @limit - @count,
waiting_count: @waiting_count,
reacquire_waiting_count: @reacquire_waiting_count,
waiting_count: @waiting_count_metric.value,
reacquire_waiting_count: @reacquire_waiting_count_metric.value,
timing: @timing.statistics
}
end
Expand All @@ -103,15 +108,16 @@ def acquire_resource(deadline, reacquire: false, **options)
return nil if deadline&.expired? && @count >= @limit

waiting = false
acquired = false

# Wait for capacity with deadline tracking
while @count >= @limit
remaining = deadline&.remaining
return nil if remaining && remaining <= 0

unless waiting
@waiting_count += 1
@reacquire_waiting_count += 1 if reacquire
@waiting_count_metric.increment
@reacquire_waiting_count_metric.increment if reacquire
waiting = true
end

Expand All @@ -121,22 +127,33 @@ def acquire_resource(deadline, reacquire: false, **options)
end

@count += 1
acquired = true

return true
ensure
if waiting
@waiting_count -= 1
@reacquire_waiting_count -= 1 if reacquire
@waiting_count_metric.decrement
@reacquire_waiting_count_metric.decrement if reacquire
end

update_utilization_metrics if acquired
end

# Release resource.
def release_resource(resource)
@mutex.synchronize do
@count -= 1
update_utilization_metrics
@available.signal
end
end

private

def update_utilization_metrics
@acquired_count_metric.set(@count)
@available_count_metric.set(@limit - @count)
end
end
end
end
45 changes: 31 additions & 14 deletions lib/async/limiter/queued.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,25 @@ def self.default_queue

# Initialize a queue-based limiter.
# @parameter queue [#push, #pop, #empty?] Thread-safe queue containing pre-existing resources.
# @parameter timing [#acquire, #wait, #maximum_cost] Strategy for timing constraints.
# @parameter parent [Async::Task, nil] Parent task for creating child tasks.
def initialize(queue = self.class.default_queue, timing: Timing::None, parent: nil)
super(timing: timing, parent: parent)
# @parameter options [Hash] Options passed to {Generic#initialize}.
def initialize(queue = self.class.default_queue, **options)
super(**options)
@queue = queue
@acquired_count = 0
@reacquire_waiting_count = 0

@acquired_count_metric = @utilization.metric(:acquired_count)
@available_count_metric = @utilization.metric(:available_count)
@waiting_count_metric = @utilization.metric(:waiting_count)
@reacquire_waiting_count_metric = @utilization.metric(:reacquire_waiting_count)

update_utilization_metrics
end

# @attribute [Queue] The queue managing resources.
attr_reader :queue

# @returns [Integer] Current count of acquired resources.
def acquired_count
@mutex.synchronize{@acquired_count}
@acquired_count_metric.value
end

# @returns [Integer] Current count of available resources.
Expand All @@ -57,7 +61,7 @@ def waiting_count

# @returns [Integer] Current count of reacquiring tasks waiting for resources.
def reacquire_waiting_count
@mutex.synchronize{@reacquire_waiting_count}
@reacquire_waiting_count_metric.value
end

# Check if a new task can be acquired.
Expand All @@ -73,6 +77,8 @@ def expand(count, value = true)
count.times do
@queue.push(value)
end

update_utilization_metrics
end

# Get current limiter statistics.
Expand All @@ -82,10 +88,10 @@ def statistics
{
waiting: @queue.waiting_count,
available: @queue.size,
acquired_count: @acquired_count,
acquired_count: @acquired_count_metric.value,
available_count: @queue.size,
waiting_count: @queue.waiting_count,
reacquire_waiting_count: @reacquire_waiting_count,
reacquire_waiting_count: @reacquire_waiting_count_metric.value,
timing: @timing.statistics
}
end
Expand All @@ -95,25 +101,36 @@ def statistics

# Acquire a resource from the queue with optional deadline.
def acquire_resource(deadline, reacquire: false, **options)
@reacquire_waiting_count += 1 if reacquire
@reacquire_waiting_count_metric.increment if reacquire
update_utilization_metrics if reacquire

@mutex.unlock
resource = @queue.pop(timeout: deadline&.remaining, **options)
return resource
ensure
@mutex.lock
@reacquire_waiting_count -= 1 if reacquire
@acquired_count += 1 if resource
@reacquire_waiting_count_metric.decrement if reacquire
@acquired_count_metric.increment if resource
update_utilization_metrics if reacquire || resource
end

# Release a previously acquired resource back to the queue.
def release_resource(value)
@mutex.synchronize do
@acquired_count -= 1 if @acquired_count > 0
@acquired_count_metric.decrement if @acquired_count_metric.value > 0
update_utilization_metrics
end

# Return a default resource to the queue:
@queue.push(value)
update_utilization_metrics
end

private

def update_utilization_metrics
@available_count_metric.set(@queue.size)
@waiting_count_metric.set(@queue.waiting_count)
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- Add `async-utilization` metrics for limiter telemetry counters.

## v2.1.0

- Add telemetry counters to `Async::Limiter::Limited` and `Async::Limiter::Queued`: `acquired_count`, `available_count`, `waiting_count`, and `reacquire_waiting_count` for observability into limiter state.
Expand Down
23 changes: 23 additions & 0 deletions test/async/limiter/token.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Shopify Inc.
# Copyright, 2026, by Samuel Williams.

require "async/limiter"

describe Async::Limiter::Token do
let(:limiter) {Async::Limiter::Generic.new}

it "reports whether it has acquired a resource" do
token = Async::Limiter::Token.acquire(limiter)

expect(token).to be(:acquired?)
expect(token).not.to be(:released?)

token.release

expect(token).not.to be(:acquired?)
expect(token).to be(:released?)
end
end
Loading
Loading