Skip to content

Commit

Permalink
Fixes for race conditions in ActiveJob concurrency extension. (benshe…
Browse files Browse the repository at this point in the history
  • Loading branch information
codyrobbins authored Aug 13, 2021
1 parent 9c2b9f7 commit 655aa43
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ module Concurrency
included do
class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}

before_enqueue do |job|
around_enqueue do |job, block|
# Always allow jobs to be retried because the current job's execution will complete momentarily
next if CurrentExecution.active_job_id == job.job_id
next(block.call) if CurrentExecution.active_job_id == job.job_id

limit = job.class.good_job_concurrency_config.fetch(:enqueue_limit, Float::INFINITY)
next if limit.blank? || (0...Float::INFINITY).exclude?(limit)
next(block.call) if limit.blank? || (0...Float::INFINITY).exclude?(limit)

key = job.good_job_concurrency_key
next if key.blank?
next(block.call) if key.blank?

GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
# TODO: Why is `unscoped` necessary? Nested scope is bleeding into subsequent query?
enqueue_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).unfinished.count
# The job has not yet been enqueued, so check if adding it will go over the limit
throw :abort if enqueue_concurrency + 1 > limit
block.call unless enqueue_concurrency + 1 > limit
end
end

Expand All @@ -41,9 +41,9 @@ module Concurrency
next if key.blank?

GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
perform_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).advisory_locked.count
allowed_active_job_ids = GoodJob::Job.unscoped.where(concurrency_key: key).advisory_locked.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")).limit(limit).pluck(:active_job_id)
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError if perform_concurrency > limit
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include? job.job_id
end
end
end
Expand Down

0 comments on commit 655aa43

Please sign in to comment.