Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,20 @@ workers will sleep for 5 seconds.

```ruby
# The max number of jobs a worker may lock at a time (also the size of the thread pool):
# NOTE: This should be less than your ActiveRecord connection pool size. The worker process
# itself needs one connection for polling and locking, so if max_claims >= pool_size, a job
# thread that cannot check out a connection within the checkout timeout may raise
# ActiveRecord::ConnectionTimeoutError. The worker will log a warning at startup if this
# is detected.
#
# The general rule is: if each job thread needs N connections and max_claims is M,
# your pool should have at least N * (M + 1) connections. The most common case is N=1
# (one connection per thread), so pool_size should be at least max_claims + 1.
#
# For example, if your pool size is 5, set max_claims to 4:
#
# # config/initializers/delayed_job.rb
# Delayed::Worker.max_claims = ActiveRecord::Base.connection_pool.size - 1
Delayed::Worker.max_claims = 5

# The number of jobs to which a worker may "read ahead" when locking jobs (mysql only!):
Expand Down
20 changes: 20 additions & 0 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def name
# Setting the name to nil will reset the default worker name
attr_writer :name

def start
check_connection_pool_config!
super
end

def run!
@realtime = Benchmark.realtime do
@result = work_off
Expand Down Expand Up @@ -235,6 +240,21 @@ def reload!
Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end

def check_connection_pool_config!
return unless Delayed::Job.respond_to?(:connection_pool)

pool_size = Delayed::Job.connection_pool.size
return unless pool_size
return if self.class.max_claims < pool_size

say "WARNING: Delayed::Worker.max_claims (#{self.class.max_claims}) >= ActiveRecord connection pool size (#{pool_size}). " \
"The worker process itself also needs a connection for polling and locking, so at least one job thread " \
"will likely fail with ActiveRecord::ConnectionTimeoutError. " \
"Set Delayed::Worker.max_claims to #{[pool_size - 1, 1].max} or less, or increase your connection pool size.", 'warn'
rescue StandardError
nil
end

def clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
Expand Down
77 changes: 77 additions & 0 deletions spec/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,83 @@
expect(performances).to eq [true, nil, nil]
expect(Delayed::Job).to have_received(:reserve)
end

describe 'connection pool config check' do
let(:connection_pool) { instance_double(ActiveRecord::ConnectionAdapters::ConnectionPool, size: pool_size) }

around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

before do
allow(Delayed.logger).to receive(:warn)
subject.send(:stop) # prevent start from running more than one loop
allow(Delayed::Job).to receive(:reserve).and_return([])
allow(Delayed::Job).to receive(:connection_pool).and_return(connection_pool)
allow(Delayed::Job).to receive(:clear_locks!)
end

context 'when max_claims equals pool size' do
let(:max_claims) { 5 }
let(:pool_size) { 5 }

it 'logs a warning at startup' do
subject.start
expect(Delayed.logger).to have_received(:warn).with(
include(
'WARNING: Delayed::Worker.max_claims (5) >= ActiveRecord connection pool size (5). ' \
'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \
'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \
'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.',
),
)
end
end

context 'when max_claims exceeds pool size' do
let(:max_claims) { 6 }
let(:pool_size) { 5 }

it 'logs a warning at startup' do
subject.start
expect(Delayed.logger).to have_received(:warn).with(
include(
'WARNING: Delayed::Worker.max_claims (6) >= ActiveRecord connection pool size (5). ' \
'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \
'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \
'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.',
),
)
end
end

context 'when max_claims is less than pool size' do
let(:max_claims) { 4 }
let(:pool_size) { 5 }

it 'does not log a warning' do
subject.start
expect(Delayed.logger).not_to have_received(:warn).with(/WARNING/i)
end
end

context 'when connection_pool introspection raises' do
let(:max_claims) { 5 }
let(:pool_size) { 5 }

before do
allow(Delayed::Job).to receive(:connection_pool).and_raise(StandardError, 'unavailable')
end

it 'starts without raising' do
expect { subject.start }.not_to raise_error
end
end
end
end

# rubocop:disable RSpec/SubjectStub
Expand Down