From 0101b39aeb27c4250925d74df937ebb357bfb0f6 Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Thu, 28 May 2026 09:26:21 +0545 Subject: [PATCH 1/7] wip --- lib/delayed/worker.rb | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index bc0baea..08bdfa5 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -92,10 +92,12 @@ def work_off(num = 100) break if jobs.empty? total += jobs.length - pool = Concurrent::FixedThreadPool.new(jobs.length) + pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) jobs.each do |job| pool.post do + thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do + thread_started = true success.increment if perform(job) rescue DeserializationError => e handle_unrecoverable_error(job, e) @@ -103,7 +105,7 @@ def work_off(num = 100) handle_erroring_job(job, e) end rescue Exception => e # rubocop:disable Lint/RescueException - say "Job thread crashed with #{e.class.name}: #{e.message}", 'error' + handle_thread_error(job, e, thread_started) end end @@ -215,6 +217,16 @@ def handle_unrecoverable_error(job, error) failed(job) end + def handle_thread_error(job, error, thread_started) + phase = thread_started ? 'after perform' : 'before perform' + job_say job, "thread crashed #{phase} with #{error.class.name}: #{error.message}", 'error' + return if thread_started + + handle_erroring_job(job, error) + rescue Exception => inner_error # rubocop:disable Lint/RescueException + job_say job, "could not record pre-perform thread crash: #{inner_error.class.name}: #{inner_error.message}", 'error' + end + # The backend adapter may return either a list or a single job # In some backends, this can be controlled with the `max_claims` config # Either way, we map this to an array of job instances @@ -235,6 +247,17 @@ def reload! Rails.application.reloader.reload! 
if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + def thread_pool_size(job_count) + return job_count unless Delayed::Job.respond_to?(:connection_pool) + + pool_size = Delayed::Job.connection_pool.size + return job_count unless pool_size + + [job_count, [pool_size - 1, 1].max].min + rescue StandardError + job_count + end + def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end From 8e3a182d6614fe4f639f9a09b6833cf93e3a242d Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Thu, 28 May 2026 10:24:25 +0545 Subject: [PATCH 2/7] verbose logging --- lib/delayed/worker.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 08bdfa5..123badc 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -88,12 +88,16 @@ def work_off(num = 100) while total < num start = clock_time + say "Attempting to reserve up to #{num - total} job(s)", 'debug' jobs = reserve_jobs + say 'No jobs reserved; exiting work_off loop', 'debug' if jobs.empty? break if jobs.empty? total += jobs.length + say "Reserved #{jobs.length} job(s); dispatching batch", 'debug' pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) jobs.each do |job| + job_say job, 'Queued for thread execution', 'debug' pool.post do thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do @@ -109,12 +113,15 @@ def work_off(num = 100) end end + say 'Waiting for worker threads to finish', 'debug' pool.shutdown pool.wait_for_termination + say "Batch finished with #{success.value} successful job(s) out of #{total} attempted so far", 'debug' break if stop? 
# leave if we're exiting elapsed = clock_time - start + say format('Batch elapsed %.4f seconds', elapsed), 'debug' interruptable_sleep(self.class.min_reserve_interval - elapsed) end From 2ec85e1a90ed75f219c4382c301be4c42679c6f7 Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Tue, 9 Jun 2026 12:56:51 +0545 Subject: [PATCH 3/7] revert explicit raise of thread error --- lib/delayed/worker.rb | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 123badc..c9031f7 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -224,16 +224,6 @@ def handle_unrecoverable_error(job, error) failed(job) end - def handle_thread_error(job, error, thread_started) - phase = thread_started ? 'after perform' : 'before perform' - job_say job, "thread crashed #{phase} with #{error.class.name}: #{error.message}", 'error' - return if thread_started - - handle_erroring_job(job, error) - rescue Exception => inner_error # rubocop:disable Lint/RescueException - job_say job, "could not record pre-perform thread crash: #{inner_error.class.name}: #{inner_error.message}", 'error' - end - # The backend adapter may return either a list or a single job # In some backends, this can be controlled with the `max_claims` config # Either way, we map this to an array of job instances From f472191b6ea2a6b0b44cf015eeb062c63a65113f Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Tue, 9 Jun 2026 13:27:09 +0545 Subject: [PATCH 4/7] chore: revert unnecessary changes from PR (debug logs, thread_pool_size, handle_thread_error) --- lib/delayed/worker.rb | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index c9031f7..bc0baea 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -88,20 +88,14 @@ def work_off(num = 100) while total < num start = clock_time - say "Attempting to reserve up to #{num - total} job(s)", 'debug' jobs = reserve_jobs - say 
'No jobs reserved; exiting work_off loop', 'debug' if jobs.empty? break if jobs.empty? total += jobs.length - say "Reserved #{jobs.length} job(s); dispatching batch", 'debug' - pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) + pool = Concurrent::FixedThreadPool.new(jobs.length) jobs.each do |job| - job_say job, 'Queued for thread execution', 'debug' pool.post do - thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do - thread_started = true success.increment if perform(job) rescue DeserializationError => e handle_unrecoverable_error(job, e) @@ -109,19 +103,16 @@ def work_off(num = 100) handle_erroring_job(job, e) end rescue Exception => e # rubocop:disable Lint/RescueException - handle_thread_error(job, e, thread_started) + say "Job thread crashed with #{e.class.name}: #{e.message}", 'error' end end - say 'Waiting for worker threads to finish', 'debug' pool.shutdown pool.wait_for_termination - say "Batch finished with #{success.value} successful job(s) out of #{total} attempted so far", 'debug' break if stop? # leave if we're exiting elapsed = clock_time - start - say format('Batch elapsed %.4f seconds', elapsed), 'debug' interruptable_sleep(self.class.min_reserve_interval - elapsed) end @@ -244,17 +235,6 @@ def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! 
end - def thread_pool_size(job_count) - return job_count unless Delayed::Job.respond_to?(:connection_pool) - - pool_size = Delayed::Job.connection_pool.size - return job_count unless pool_size - - [job_count, [pool_size - 1, 1].max].min - rescue StandardError - job_count - end - def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end From bc6c96f6abfe1a53e5a5fecc06aae5d38b23b64d Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Tue, 9 Jun 2026 13:28:05 +0545 Subject: [PATCH 5/7] feat: warn at startup when max_claims >= DB connection pool size --- lib/delayed/worker.rb | 19 +++++++++++ spec/worker_spec.rb | 73 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index bc0baea..9212b9d 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -63,6 +63,11 @@ def name # Setting the name to nil will reset the default worker name attr_writer :name + def start + check_connection_pool_config! + super + end + def run! @realtime = Benchmark.realtime do @result = work_off @@ -235,6 +240,20 @@ def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + def check_connection_pool_config! + return unless Delayed::Job.respond_to?(:connection_pool) + + pool_size = Delayed::Job.connection_pool.size + return unless pool_size + return if self.class.max_claims < pool_size + + say "WARNING: max_claims (#{self.class.max_claims}) >= DB connection pool size (#{pool_size}). " \ + "The worker process needs at least 1 connection for its own housekeeping, so job threads may " \ + "starve waiting for a connection. 
Set Delayed::Worker.max_claims to at most #{[pool_size - 1, 1].max}.", 'warn' + rescue StandardError + nil + end + def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index b79160a..1a5eb28 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -19,6 +19,79 @@ expect(performances).to eq [true, nil, nil] expect(Delayed::Job).to have_received(:reserve) end + + describe 'connection pool config check' do + let(:connection_pool) { instance_double(ActiveRecord::ConnectionAdapters::ConnectionPool, size: pool_size) } + + before do + subject.send(:stop) # prevent start from running more than one loop + allow(Delayed::Job).to receive(:reserve).and_return([]) + allow(Delayed::Job).to receive(:connection_pool).and_return(connection_pool) + allow(Delayed::Job).to receive(:clear_locks!) + allow(subject).to receive(:say).and_call_original + end + + around do |example| + max_claims_was = described_class.max_claims + described_class.max_claims = max_claims + example.run + ensure + described_class.max_claims = max_claims_was + end + + context 'when max_claims equals pool size' do + let(:max_claims) { 5 } + let(:pool_size) { 5 } + + it 'logs a warning at startup' do + subject.start + expect(subject).to have_received(:say).with( + 'WARNING: max_claims (5) >= DB connection pool size (5). ' \ + 'The worker process needs at least 1 connection for its own housekeeping, so job threads may ' \ + 'starve waiting for a connection. Set Delayed::Worker.max_claims to at most 4.', + 'warn', + ) + end + end + + context 'when max_claims exceeds pool size' do + let(:max_claims) { 6 } + let(:pool_size) { 5 } + + it 'logs a warning at startup' do + subject.start + expect(subject).to have_received(:say).with( + 'WARNING: max_claims (6) >= DB connection pool size (5). ' \ + 'The worker process needs at least 1 connection for its own housekeeping, so job threads may ' \ + 'starve waiting for a connection. 
Set Delayed::Worker.max_claims to at most 4.', + 'warn', + ) + end + end + + context 'when max_claims is less than pool size' do + let(:max_claims) { 4 } + let(:pool_size) { 5 } + + it 'does not log a warning' do + subject.start + expect(subject).not_to have_received(:say).with(/WARNING/i, 'warn') + end + end + + context 'when connection_pool introspection raises' do + let(:max_claims) { 5 } + let(:pool_size) { 5 } + + before do + allow(Delayed::Job).to receive(:connection_pool).and_raise(StandardError, 'unavailable') + end + + it 'starts without raising' do + expect { subject.start }.not_to raise_error + end + end + end end # rubocop:disable RSpec/SubjectStub From dea0914ef999c1f0cc926efb5fef27b43777f32c Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Tue, 9 Jun 2026 13:55:57 +0545 Subject: [PATCH 6/7] fix: resolve RSpec/SubjectStub offense in connection pool config check spec --- lib/delayed/worker.rb | 7 ++++--- spec/worker_spec.rb | 42 +++++++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 9212b9d..174eb54 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -247,9 +247,10 @@ def check_connection_pool_config! return unless pool_size return if self.class.max_claims < pool_size - say "WARNING: max_claims (#{self.class.max_claims}) >= DB connection pool size (#{pool_size}). " \ - "The worker process needs at least 1 connection for its own housekeeping, so job threads may " \ - "starve waiting for a connection. Set Delayed::Worker.max_claims to at most #{[pool_size - 1, 1].max}.", 'warn' + say "WARNING: Delayed::Worker.max_claims (#{self.class.max_claims}) >= ActiveRecord connection pool size (#{pool_size}). " \ + "The worker process itself also needs a connection for polling and locking, so at least one job thread " \ + "will likely fail with ActiveRecord::ConnectionTimeoutError. 
" \ + "Set Delayed::Worker.max_claims to #{[pool_size - 1, 1].max} or less, or increase your connection pool size.", 'warn' rescue StandardError nil end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index 1a5eb28..2b6c481 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -23,14 +23,6 @@ describe 'connection pool config check' do let(:connection_pool) { instance_double(ActiveRecord::ConnectionAdapters::ConnectionPool, size: pool_size) } - before do - subject.send(:stop) # prevent start from running more than one loop - allow(Delayed::Job).to receive(:reserve).and_return([]) - allow(Delayed::Job).to receive(:connection_pool).and_return(connection_pool) - allow(Delayed::Job).to receive(:clear_locks!) - allow(subject).to receive(:say).and_call_original - end - around do |example| max_claims_was = described_class.max_claims described_class.max_claims = max_claims @@ -39,17 +31,27 @@ described_class.max_claims = max_claims_was end + before do + allow(Delayed.logger).to receive(:warn) + subject.send(:stop) # prevent start from running more than one loop + allow(Delayed::Job).to receive(:reserve).and_return([]) + allow(Delayed::Job).to receive(:connection_pool).and_return(connection_pool) + allow(Delayed::Job).to receive(:clear_locks!) + end + context 'when max_claims equals pool size' do let(:max_claims) { 5 } let(:pool_size) { 5 } it 'logs a warning at startup' do subject.start - expect(subject).to have_received(:say).with( - 'WARNING: max_claims (5) >= DB connection pool size (5). ' \ - 'The worker process needs at least 1 connection for its own housekeeping, so job threads may ' \ - 'starve waiting for a connection. Set Delayed::Worker.max_claims to at most 4.', - 'warn', + expect(Delayed.logger).to have_received(:warn).with( + include( + 'WARNING: Delayed::Worker.max_claims (5) >= ActiveRecord connection pool size (5). 
' \ + 'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \ + 'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \ + 'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.', + ), ) end end @@ -60,11 +62,13 @@ it 'logs a warning at startup' do subject.start - expect(subject).to have_received(:say).with( - 'WARNING: max_claims (6) >= DB connection pool size (5). ' \ - 'The worker process needs at least 1 connection for its own housekeeping, so job threads may ' \ - 'starve waiting for a connection. Set Delayed::Worker.max_claims to at most 4.', - 'warn', + expect(Delayed.logger).to have_received(:warn).with( + include( + 'WARNING: Delayed::Worker.max_claims (6) >= ActiveRecord connection pool size (5). ' \ + 'The worker process itself also needs a connection for polling and locking, so at least one job thread ' \ + 'will likely fail with ActiveRecord::ConnectionTimeoutError. ' \ + 'Set Delayed::Worker.max_claims to 4 or less, or increase your connection pool size.', + ), ) end end @@ -75,7 +79,7 @@ it 'does not log a warning' do subject.start - expect(subject).not_to have_received(:say).with(/WARNING/i, 'warn') + expect(Delayed.logger).not_to have_received(:warn).with(/WARNING/i) end end From bcc42122c4739241478548b1e8486289782e65ed Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Tue, 9 Jun 2026 14:25:59 +0545 Subject: [PATCH 7/7] document connection pool size constraint for max_claims --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 0895247..50a549f 100644 --- a/README.md +++ b/README.md @@ -442,6 +442,20 @@ workers will sleep for 5 seconds. ```ruby # The max number of jobs a worker may lock at a time (also the size of the thread pool): +# NOTE: This should be less than your ActiveRecord connection pool size. 
The worker process +# itself needs one connection for polling and locking, so if max_claims >= pool_size, a job +# thread that cannot check out a connection within the checkout timeout may raise +# ActiveRecord::ConnectionTimeoutError. The worker will log a warning at startup if this +# is detected. +# +# The general rule is: if each job thread needs N connections and max_claims is M, +# your pool should have at least N * M + 1 connections. The most common case is N=1 +# (one connection per thread), so pool_size should be at least max_claims + 1. +# +# For example, if your pool size is 5, set max_claims to 4: +# +# # config/initializers/delayed_job.rb +# Delayed::Worker.max_claims = ActiveRecord::Base.connection_pool.size - 1 Delayed::Worker.max_claims = 5 # The number of jobs to which a worker may "read ahead" when locking jobs (mysql only!):