From 0101b39aeb27c4250925d74df937ebb357bfb0f6 Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Thu, 28 May 2026 09:26:21 +0545 Subject: [PATCH 1/2] wip --- lib/delayed/worker.rb | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index bc0baea..08bdfa5 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -92,10 +92,12 @@ def work_off(num = 100) break if jobs.empty? total += jobs.length - pool = Concurrent::FixedThreadPool.new(jobs.length) + pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) jobs.each do |job| pool.post do + thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do + thread_started = true success.increment if perform(job) rescue DeserializationError => e handle_unrecoverable_error(job, e) @@ -103,7 +105,7 @@ def work_off(num = 100) handle_erroring_job(job, e) end rescue Exception => e # rubocop:disable Lint/RescueException - say "Job thread crashed with #{e.class.name}: #{e.message}", 'error' + handle_thread_error(job, e, thread_started) end end @@ -215,6 +217,16 @@ def handle_unrecoverable_error(job, error) failed(job) end + def handle_thread_error(job, error, thread_started) + phase = thread_started ? 'after perform' : 'before perform' + job_say job, "thread crashed #{phase} with #{error.class.name}: #{error.message}", 'error' + return if thread_started + + handle_erroring_job(job, error) + rescue Exception => inner_error # rubocop:disable Lint/RescueException + job_say job, "could not record pre-perform thread crash: #{inner_error.class.name}: #{inner_error.message}", 'error' + end + # The backend adapter may return either a list or a single job # In some backends, this can be controlled with the `max_claims` config # Either way, we map this to an array of job instances @@ -235,6 +247,17 @@ def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + def thread_pool_size(job_count) + return job_count unless Delayed::Job.respond_to?(:connection_pool) + + pool_size = Delayed::Job.connection_pool.size + return job_count unless pool_size + + [job_count, [pool_size - 1, 1].max].min + rescue StandardError + job_count + end + def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end From 8e3a182d6614fe4f639f9a09b6833cf93e3a242d Mon Sep 17 00:00:00 2001 From: Bikash Pandey Date: Thu, 28 May 2026 10:24:25 +0545 Subject: [PATCH 2/2] verbose logging --- lib/delayed/worker.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 08bdfa5..123badc 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -88,12 +88,16 @@ def work_off(num = 100) while total < num start = clock_time + say "Attempting to reserve up to #{num - total} job(s)", 'debug' jobs = reserve_jobs + say 'No jobs reserved; exiting work_off loop', 'debug' if jobs.empty? break if jobs.empty? total += jobs.length + say "Reserved #{jobs.length} job(s); dispatching batch", 'debug' pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) jobs.each do |job| + job_say job, 'Queued for thread execution', 'debug' pool.post do thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do @@ -109,12 +113,15 @@ def work_off(num = 100) end end + say 'Waiting for worker threads to finish', 'debug' pool.shutdown pool.wait_for_termination + say "Batch finished with #{success.value} successful job(s) out of #{total} attempted so far", 'debug' break if stop? # leave if we're exiting elapsed = clock_time - start + say format('Batch elapsed %.4f seconds', elapsed), 'debug' interruptable_sleep(self.class.min_reserve_interval - elapsed) end