diff --git a/README.md b/README.md index 40cea400..1da184ca 100644 --- a/README.md +++ b/README.md @@ -426,11 +426,11 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c ## Concurrency controls -Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked. +Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can be configured to either be discarded or blocked. ```ruby class MyJob < ApplicationJob - limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group + limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour # ... ``` @@ -438,6 +438,9 @@ class MyJob < ApplicationJob - `to` is `1` by default. - `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well. - `group` is used to control the concurrency of different job classes together. It defaults to the job class name. +- `on_conflict` controls behaviour when enqueuing a job which is above the max concurrent executions for your configuration. + - (default) `:block`; the job is blocked and is dispatched until another job completes and unblocks it + - `:discard`; the job is discarded When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping). @@ -480,6 +483,31 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past. +### Discarding conflicting jobs + +When configuring `on_conflict` with `:discard`, jobs enqueued above the concurrent execution limit are discarded and failed to be enqueued. + +```ruby +class ConcurrentJob < ApplicationJob + limits_concurrency key: ->(record) { record }, on_conflict: :discard + + def perform(user); end +end + +enqueued_job = ConcurrentJob.perform_later(record) +# => instance of ConcurrentJob +enqueued_job.successfully_enqueued? +# => true + +second_enqueued_job = ConcurrentJob.perform_later(record) do |job| + job.successfully_enqueued? + # => false +end + +second_enqueued_job +# => false +``` + ### Performance considerations Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example: @@ -503,6 +531,10 @@ production: Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues). +### Discarding concurrent jobs + + + ## Failed jobs and retries Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as: diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..df4ab405 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -2,7 +2,7 @@ module SolidQueue class Job < Record - class EnqueueError < StandardError; end + class EnqueueError < ActiveJob::EnqueueError; end include Executable, Clearable, Recurrable @@ -10,19 +10,24 @@ class EnqueueError < StandardError; end class << self def enqueue_all(active_jobs) - active_jobs_by_job_id = active_jobs.index_by(&:job_id) + enqueued_jobs_count = 0 transaction do jobs = create_all_from_active_jobs(active_jobs) - prepare_all_for_execution(jobs).tap do |enqueued_jobs| - enqueued_jobs.each do |enqueued_job| - active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id - active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true - end + prepare_all_for_execution(jobs) + jobs_by_active_job_id = jobs.index_by(&:active_job_id) + + active_jobs.each do |active_job| + job = jobs_by_active_job_id[active_job.job_id] + + active_job.provider_job_id = job&.id + active_job.enqueue_error = job&.enqueue_error + active_job.successfully_enqueued = job.present? && job.enqueue_error.nil? + enqueued_jobs_count += 1 if active_job.successfully_enqueued? end end - active_jobs.count(&:successfully_enqueued?) + enqueued_jobs_count end def enqueue(active_job, scheduled_at: Time.current) @@ -49,7 +54,7 @@ def create_from_active_job(active_job) def create_all_from_active_jobs(active_jobs) job_rows = active_jobs.map { |job| attributes_from_active_job(job) } insert_all(job_rows) - where(active_job_id: active_jobs.map(&:job_id)) + where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc) end def attributes_from_active_job(active_job) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6ae12e28..6c37cab4 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_on_conflict, :concurrency_duration, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -34,6 +34,10 @@ def blocked? end private + def discard_concurrent? + concurrency_on_conflict == :discard + end + def acquire_concurrency_lock return true unless concurrency_limited? diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..28c1ac35 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -13,6 +13,8 @@ module Executable after_create :prepare_for_execution + attr_accessor :enqueue_error + scope :finished, -> { where.not(finished_at: nil) } end @@ -37,7 +39,13 @@ def dispatch_all_at_once(jobs) end def dispatch_all_one_by_one(jobs) - jobs.each(&:dispatch) + jobs.each do |job| + begin + job.dispatch + rescue EnqueueError => e + job.enqueue_error = e + end + end end def successfully_dispatched(jobs) @@ -66,6 +74,9 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready + elsif discard_concurrent? + discard + raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.") else block end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 0ea290f6..ceb03e7e 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -11,15 +11,17 @@ module ConcurrencyControls class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit + class_attribute :concurrency_on_conflict class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration + self.concurrency_on_conflict = on_conflict end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 486756ab..cb2fb377 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -10,6 +10,14 @@ def perform(job_result) end end + class DiscardedNonOverlappingJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard + end + + class DiscardedOverlappingJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard + end + class NonOverlappingGroupedJob1 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end @@ -18,8 +26,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end + class DiscardedNonOverlappingGroupedJob1 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + + class DiscardedNonOverlappingGroupedJob2 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + setup do @result = JobResult.create!(queue_name: "default") + @discarded_concurrent_error = SolidQueue::Job::EnqueueError.new( + "Dispatched job discarded due to concurrent configuration." + ) end test "enqueue active job to be executed right away" do @@ -98,6 +117,78 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal active_job.concurrency_key, job.concurrency_key end + test "enqueue jobs with discarding concurrency controls" do + assert_ready do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + + assert_not DiscardedNonOverlappingJob.perform_later(@result, name: "B") do |overlapping_active_job| + assert_not overlapping_active_job.successfully_enqueued? + assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error + end + end + end + + test "enqueue scheduled job with discarding concurrency controls" do + assert_ready do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + end + + scheduled_job_id = nil + + assert_scheduled do + scheduled_active_job = DiscardedNonOverlappingJob.set(wait: 0.5.seconds).perform_later(@result, name: "B") + assert scheduled_active_job.successfully_enqueued? + assert_nil scheduled_active_job.enqueue_error + + scheduled_job_id = scheduled_active_job.provider_job_id + end + + scheduled_job = SolidQueue::Job.find(scheduled_job_id) + wait_for { scheduled_job.due? } + + dispatched = SolidQueue::ScheduledExecution.dispatch_next_batch(10) + assert_equal 0, dispatched + assert_raises(ActiveRecord::RecordNotFound) { scheduled_job.reload } + end + + test "enqueues jobs in bulk with discarding concurrency controls" do + jobs = [ + job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"), + job_2 = DiscardedNonOverlappingJob.new(@result, name: "B") + ] + + assert_job_counts(ready: 1, discarded: 1) do + enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs) + assert_equal enqueued_jobs_count, 1 + end + + assert job_1.successfully_enqueued? + assert_not job_2.successfully_enqueued? + assert_equal SolidQueue::Job::EnqueueError, job_2.enqueue_error.class + assert_equal @discarded_concurrent_error.message, job_2.enqueue_error.message + end + + test "enqueue jobs with discarding concurrency controls when below limit" do + assert_job_counts(ready: 2) do + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + end + + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "B") + assert active_job.successfully_enqueued? + end + + assert_not DiscardedOverlappingJob.perform_later(@result, name: "C") do |overlapping_active_job| + assert_not overlapping_active_job.successfully_enqueued? + assert_equal @discarded_concurrent_error, overlapping_active_job.enqueue_error + end + end + end + test "enqueue jobs with concurrency controls in the same concurrency group" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") @@ -112,6 +203,23 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end end + test "enqueue jobs with discarding concurrency controls in the same concurrency group" do + assert_job_counts(ready: 1) do + assert_ready do + active_job = DiscardedNonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert active_job.successfully_enqueued? + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_not DiscardedNonOverlappingGroupedJob2.perform_later(@result, name: "B") do |blocked_active_job| + assert_not blocked_active_job.successfully_enqueued? + assert_equal 1, blocked_active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", blocked_active_job.concurrency_key + end + end + end + test "enqueue multiple jobs" do active_jobs = [ AddToBufferJob.new(2), @@ -249,13 +357,15 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "raise EnqueueError when there's an ActiveRecordError" do SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked) - active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") assert_raises SolidQueue::Job::EnqueueError do + active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") SolidQueue::Job.enqueue(active_job) end - assert_raises SolidQueue::Job::EnqueueError do - AddToBufferJob.perform_later(1) + # #perform_later doesn't raise ActiveJob::EnqueueError, and instead set's successfully_enqueued? to false + assert_not AddToBufferJob.perform_later(1) do |active_job| + assert_not active_job.successfully_enqueued? + assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class end end @@ -291,8 +401,12 @@ def assert_blocked(&block) assert SolidQueue::Job.last.blocked? end - def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block) - assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do + def assert_discarded(&block) + assert_job_counts(discarded: 1, &block) + end + + def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block) + assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block