diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index bc0baea..123badc 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -88,14 +88,20 @@ def work_off(num = 100) while total < num start = clock_time + say "Attempting to reserve up to #{num - total} job(s)", 'debug' jobs = reserve_jobs + say 'No jobs reserved; exiting work_off loop', 'debug' if jobs.empty? break if jobs.empty? total += jobs.length - pool = Concurrent::FixedThreadPool.new(jobs.length) + say "Reserved #{jobs.length} job(s); dispatching batch", 'debug' + pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length)) jobs.each do |job| + job_say job, 'Queued for thread execution', 'debug' pool.post do + thread_started = false self.class.lifecycle.run_callbacks(:thread, self) do + thread_started = true success.increment if perform(job) rescue DeserializationError => e handle_unrecoverable_error(job, e) @@ -103,16 +109,19 @@ def work_off(num = 100) handle_erroring_job(job, e) end rescue Exception => e # rubocop:disable Lint/RescueException - say "Job thread crashed with #{e.class.name}: #{e.message}", 'error' + handle_thread_error(job, e, thread_started) end end + say 'Waiting for worker threads to finish', 'debug' pool.shutdown pool.wait_for_termination + say "Batch finished with #{success.value} successful job(s) out of #{total} attempted so far", 'debug' break if stop? # leave if we're exiting elapsed = clock_time - start + say format('Batch elapsed %.4f seconds', elapsed), 'debug' interruptable_sleep(self.class.min_reserve_interval - elapsed) end @@ -215,6 +224,16 @@ def handle_unrecoverable_error(job, error) failed(job) end + def handle_thread_error(job, error, thread_started) + phase = thread_started ? 'after perform' : 'before perform' + job_say job, "thread crashed #{phase} with #{error.class.name}: #{error.message}", 'error' + return if thread_started + + handle_erroring_job(job, error) + rescue Exception => inner_error # rubocop:disable Lint/RescueException + job_say job, "could not record pre-perform thread crash: #{inner_error.class.name}: #{inner_error.message}", 'error' + end + # The backend adapter may return either a list or a single job # In some backends, this can be controlled with the `max_claims` config # Either way, we map this to an array of job instances @@ -235,6 +254,17 @@ def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + def thread_pool_size(job_count) + return job_count unless Delayed::Job.respond_to?(:connection_pool) + + pool_size = Delayed::Job.connection_pool.size + return job_count unless pool_size + + [job_count, [pool_size - 1, 1].max].min + rescue StandardError + job_count + end + def clock_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end