Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,40 @@ def work_off(num = 100)

while total < num
start = clock_time
say "Attempting to reserve up to #{num - total} job(s)", 'debug'
jobs = reserve_jobs
say 'No jobs reserved; exiting work_off loop', 'debug' if jobs.empty?
break if jobs.empty?

total += jobs.length
pool = Concurrent::FixedThreadPool.new(jobs.length)
say "Reserved #{jobs.length} job(s); dispatching batch", 'debug'
pool = Concurrent::FixedThreadPool.new(thread_pool_size(jobs.length))
jobs.each do |job|
job_say job, 'Queued for thread execution', 'debug'
pool.post do
thread_started = false
self.class.lifecycle.run_callbacks(:thread, self) do
thread_started = true
success.increment if perform(job)
rescue DeserializationError => e
handle_unrecoverable_error(job, e)
rescue Exception => e # rubocop:disable Lint/RescueException
handle_erroring_job(job, e)
end
rescue Exception => e # rubocop:disable Lint/RescueException
say "Job thread crashed with #{e.class.name}: #{e.message}", 'error'
handle_thread_error(job, e, thread_started)
end
end

say 'Waiting for worker threads to finish', 'debug'
pool.shutdown
pool.wait_for_termination
say "Batch finished with #{success.value} successful job(s) out of #{total} attempted so far", 'debug'

break if stop? # leave if we're exiting

elapsed = clock_time - start
say format('Batch elapsed %.4f seconds', elapsed), 'debug'
interruptable_sleep(self.class.min_reserve_interval - elapsed)
end

Expand Down Expand Up @@ -215,6 +224,16 @@ def handle_unrecoverable_error(job, error)
failed(job)
end

def handle_thread_error(job, error, thread_started)
phase = thread_started ? 'after perform' : 'before perform'
job_say job, "thread crashed #{phase} with #{error.class.name}: #{error.message}", 'error'
return if thread_started

handle_erroring_job(job, error)
rescue Exception => inner_error # rubocop:disable Lint/RescueException
job_say job, "could not record pre-perform thread crash: #{inner_error.class.name}: #{inner_error.message}", 'error'
end

# The backend adapter may return either a list or a single job
# In some backends, this can be controlled with the `max_claims` config
# Either way, we map this to an array of job instances
Expand All @@ -235,6 +254,17 @@ def reload!
Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end

def thread_pool_size(job_count)
return job_count unless Delayed::Job.respond_to?(:connection_pool)

pool_size = Delayed::Job.connection_pool.size
return job_count unless pool_size

[job_count, [pool_size - 1, 1].max].min
rescue StandardError
job_count
end

def clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
Expand Down