diff --git a/README.md b/README.md index 0895247..50a549f 100644 --- a/README.md +++ b/README.md @@ -442,6 +442,20 @@ workers will sleep for 5 seconds. ```ruby # The max number of jobs a worker may lock at a time (also the size of the thread pool): +# NOTE: This should be less than your ActiveRecord connection pool size. The worker process +# itself needs one connection for polling and locking, so if max_claims >= pool_size, a job +# thread that cannot check out a connection within the checkout timeout may raise +# ActiveRecord::ConnectionTimeoutError. The worker will log a warning at startup if this +# is detected. +# +# The general rule is: if each job thread needs N connections and max_claims is M, +# your pool should have at least N * (M + 1) connections. The most common case is N=1 +# (one connection per thread), so pool_size should be at least max_claims + 1. +# +# For example, if your pool size is 5, set max_claims to 4: +# +# # config/initializers/delayed_job.rb +# Delayed::Worker.max_claims = ActiveRecord::Base.connection_pool.size - 1 Delayed::Worker.max_claims = 5 # The number of jobs to which a worker may "read ahead" when locking jobs (mysql only!): diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index bc0baea..174eb54 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -63,6 +63,11 @@ def name # Setting the name to nil will reset the default worker name attr_writer :name + def start + check_connection_pool_config! + super + end + def run! @realtime = Benchmark.realtime do @result = work_off @@ -235,6 +240,21 @@ def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + def check_connection_pool_config! + return unless Delayed::Job.respond_to?(:connection_pool) + + pool_size = Delayed::Job.connection_pool.size + return unless pool_size + return if self.class.max_claims < pool_size + + say "WARNING: Delayed::Worker.max_claims (#{self.class.max_claims}) >= ActiveRecord connection pool size (#{pool_size}). " \ + "The worker process itself also needs a connection for polling and locking, so at least one job thread " \ + "will likely fail with ActiveRecord::ConnectionTimeoutError. " \ + "Set Delayed::Worker.max_claims to #{[pool_size - 1, 1].max} or less, or increase your connection pool size.", 'warn' + rescue StandardError + nil + end + def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index b79160a..2b6c481 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -19,6 +19,83 @@ expect(performances).to eq [true, nil, nil] expect(Delayed::Job).to have_received(:reserve) end + + describe 'connection pool config check' do + let(:connection_pool) { instance_double(ActiveRecord::ConnectionAdapters::ConnectionPool, size: pool_size) } + + around do |example| + max_claims_was = described_class.max_claims + described_class.max_claims = max_claims + example.run + ensure + described_class.max_claims = max_claims_was + end + + before do + allow(Delayed.logger).to receive(:warn) + subject.send(:stop) # prevent start from running more than one loop + allow(Delayed::Job).to receive(:reserve).and_return([]) + allow(Delayed::Job).to receive(:connection_pool).and_return(connection_pool) + allow(Delayed::Job).to receive(:clear_locks!) + end + + context 'when max_claims equals pool size' do + let(:max_claims) { 5 } + let(:pool_size) { 5 } + + it 'logs a warning at startup' do + subject.start + expect(Delayed.logger).to have_received(:warn).with( + include( + 'WARNING: Delayed::Worker.max_claims (5) >= ActiveRecord connection pool size (5). ' \ + 'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \ + 'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \ + 'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.', + ), + ) + end + end + + context 'when max_claims exceeds pool size' do + let(:max_claims) { 6 } + let(:pool_size) { 5 } + + it 'logs a warning at startup' do + subject.start + expect(Delayed.logger).to have_received(:warn).with( + include( + 'WARNING: Delayed::Worker.max_claims (6) >= ActiveRecord connection pool size (5). ' \ + 'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \ + 'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \ + 'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.', + ), + ) + end + end + + context 'when max_claims is less than pool size' do + let(:max_claims) { 4 } + let(:pool_size) { 5 } + + it 'does not log a warning' do + subject.start + expect(Delayed.logger).not_to have_received(:warn).with(/WARNING/i) + end + end + + context 'when connection_pool introspection raises' do + let(:max_claims) { 5 } + let(:pool_size) { 5 } + + before do + allow(Delayed::Job).to receive(:connection_pool).and_raise(StandardError, 'unavailable') + end + + it 'starts without raising' do + expect { subject.start }.not_to raise_error + end + end + end end # rubocop:disable RSpec/SubjectStub