diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 21b023bc..32dd5acf 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,18 +24,17 @@ jobs: fail-fast: false matrix: ruby-version: - - 3.1.6 - - 3.2.0 - - 3.2.4 - - 3.3.0 - - 3.3.1 - - 3.3.2 - - 3.3.4 - - 3.3.5 - - 3.3.6 - - 3.4.0 - - 3.4.1 + - 3.1 + - 3.2 + - 3.3 + - 3.4 database: [ mysql, postgres, sqlite ] + gemfile: [ rails_7_1, rails_7_2, rails_8_0, rails_main ] + exclude: + - ruby-version: "3.1" + gemfile: rails_8_0 + - ruby-version: "3.1" + gemfile: rails_main services: mysql: image: mysql:8.0.31 @@ -52,6 +51,7 @@ jobs: - 55432:5432 env: TARGET_DB: ${{ matrix.database }} + BUNDLE_GEMFILE: ${{ github.workspace }}/gemfiles/${{ matrix.gemfile }}.gemfile steps: - name: Checkout code uses: actions/checkout@v4 @@ -60,6 +60,9 @@ jobs: with: ruby-version: ${{ matrix.ruby-version }} bundler-cache: true + - name: Update to latest Rails + run: | + bundle update railties - name: Setup test database run: | bin/rails db:setup diff --git a/.gitignore b/.gitignore index 3cbf5eae..86382d5a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /.bundle/ /doc/ +/gemfiles/*.lock /log/*.log /pkg/ /tmp/ diff --git a/Appraisals b/Appraisals new file mode 100644 index 00000000..24860528 --- /dev/null +++ b/Appraisals @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +appraise "rails-7-1" do + # rdoc 6.14 is not compatible with Ruby 3.1 + gem 'rdoc', '6.13' + gem "railties", "~> 7.1.0" +end + +appraise "rails-7-2" do + gem 'rdoc', '6.13' + gem "railties", "~> 7.2.0" +end + +appraise "rails-8-0" do + gem "railties", "~> 8.0.0" +end + +appraise "rails-main" do + gem "railties", github: "rails/rails", branch: "main" +end diff --git a/Gemfile.lock b/Gemfile.lock index 42b81dcf..248adac7 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -50,6 +50,10 @@ GEM mutex_m securerandom (>= 0.3) tzinfo (~> 2.0) + appraisal (2.5.0) + bundler + rake + thor (>= 0.14.0) ast (2.4.2) base64 (0.2.0) benchmark (0.4.0) @@ -189,6 +193,7 @@ PLATFORMS x86_64-linux DEPENDENCIES + appraisal debug (~> 1.9) logger mocha diff --git a/gemfiles/rails_7_1.gemfile b/gemfiles/rails_7_1.gemfile new file mode 100644 index 00000000..fcb9a654 --- /dev/null +++ b/gemfiles/rails_7_1.gemfile @@ -0,0 +1,8 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "rdoc", "6.13" +gem "railties", "~> 7.1.0" + +gemspec path: "../" diff --git a/gemfiles/rails_7_2.gemfile b/gemfiles/rails_7_2.gemfile new file mode 100644 index 00000000..bfd04992 --- /dev/null +++ b/gemfiles/rails_7_2.gemfile @@ -0,0 +1,8 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "rdoc", "6.13" +gem "railties", "~> 7.2.0" + +gemspec path: "../" diff --git a/gemfiles/rails_8_0.gemfile b/gemfiles/rails_8_0.gemfile new file mode 100644 index 00000000..28bd9f00 --- /dev/null +++ b/gemfiles/rails_8_0.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "railties", "~> 8.0.0" + +gemspec path: "../" diff --git a/gemfiles/rails_main.gemfile b/gemfiles/rails_main.gemfile new file mode 100644 index 00000000..53b80cd9 --- /dev/null +++ b/gemfiles/rails_main.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "railties", branch: "main", git: "https://github.com/rails/rails.git" + +gemspec path: "../" diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index d3042194..fe556042 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -7,7 +7,10 @@ module QueueAdapters # To use it set the queue_adapter config to +:solid_queue+. # # Rails.application.config.active_job.queue_adapter = :solid_queue - class SolidQueueAdapter + class SolidQueueAdapter < (Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 1 ? Object : AbstractAdapter) + class_attribute :stopping, default: false, instance_writer: false + SolidQueue.on_worker_stop { self.stopping = true } + def enqueue_after_transaction_commit? true end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 17242ff9..5ffd1239 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -32,6 +32,7 @@ Gem::Specification.new do |spec| spec.add_dependency "fugit", "~> 1.11.0" spec.add_dependency "thor", "~> 1.3.1" + spec.add_development_dependency "appraisal" spec.add_development_dependency "debug", "~> 1.9" spec.add_development_dependency "mocha" spec.add_development_dependency "puma" diff --git a/test/dummy/app/jobs/continuable_job.rb b/test/dummy/app/jobs/continuable_job.rb new file mode 100644 index 00000000..00fdfdd2 --- /dev/null +++ b/test/dummy/app/jobs/continuable_job.rb @@ -0,0 +1,22 @@ +begin + require "active_job/continuation" +rescue LoadError + # Zeitwerk requires that we define the constant + class ContinuableJob; end + return +end + +class ContinuableJob < ApplicationJob + include ActiveJob::Continuable + + def perform(result, pause: 0) + step :step_one do + sleep pause if pause > 0 + result.update!(queue_name: queue_name, status: "stepped", value: "step_one") + end + step :step_two do + sleep pause if pause > 0 + result.update!(queue_name: queue_name, status: "stepped", value: "step_two") + end + end +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index dbce706d..f04c87fa 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -180,6 +180,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do + # Doesn't work with enqueue_after_transaction_commit? true on SolidQueueAdapter, but only Rails 7.2 uses this + skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 + ActiveRecord::Base.transaction do SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) SequentialUpdateResultJob.perform_later(@result, name: "B") diff --git a/test/integration/continuation_test.rb b/test/integration/continuation_test.rb new file mode 100644 index 00000000..4adb6941 --- /dev/null +++ b/test/integration/continuation_test.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require "test_helper" + +begin + require "active_job/continuation" +rescue LoadError + return +end + +class ContinuationTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + def setup + start_processes + @result = JobResult.create! + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "continuable job completes" do + ContinuableJob.perform_later(@result) + + wait_for_jobs_to_finish_for(5.seconds) + + assert_no_unfinished_jobs + assert_last_step :step_two + end + + test "continuable job can be interrupted and resumed" do + job = ContinuableJob.perform_later(@result, pause: 0.5.seconds) + + sleep 0.2.seconds + signal_process(@pid, :TERM) + + wait_for_jobs_to_be_released_for(2.seconds) + + assert_no_claimed_jobs + assert_unfinished_jobs job + assert_last_step :step_one + + ActiveJob::QueueAdapters::SolidQueueAdapter.stopping = false + start_processes + wait_for_jobs_to_finish_for(5.seconds) + + assert_no_unfinished_jobs + assert_last_step :step_two + end + + private + def assert_last_step(step) + @result.reload + assert_equal "stepped", @result.status + assert_equal step.to_s, @result.value + end + + def start_processes + default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } + dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } + @pid = run_supervisor_as_fork(workers: [ default_worker ], dispatchers: [ dispatcher ]) + wait_for_registered_processes(5, timeout: 5.second) # 3 workers working the default queue + dispatcher + supervisor + end +end diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index b2fd50da..da7feedc 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -6,121 +6,140 @@ class LifecycleHooksTest < ActiveSupport::TestCase self.use_transactional_tests = false test "run lifecycle hooks" do - SolidQueue.on_start do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_start") - end - - SolidQueue.on_stop do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_stop") - end - - SolidQueue.on_exit do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_exit") - end - - SolidQueue.on_worker_start do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_start") - end - - SolidQueue.on_worker_stop do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop") - end - - SolidQueue.on_worker_exit do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit") + resetting_hooks do + SolidQueue.on_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end + + SolidQueue.on_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end + + SolidQueue.on_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end + + SolidQueue.on_worker_start do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_start") + end + + SolidQueue.on_worker_stop do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop") + end + + SolidQueue.on_worker_exit do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit") + end + + SolidQueue.on_dispatcher_start do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start") + end + + SolidQueue.on_dispatcher_stop do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop") + end + + SolidQueue.on_dispatcher_exit do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit") + end + + SolidQueue.on_scheduler_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end + + SolidQueue.on_scheduler_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end + + SolidQueue.on_scheduler_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end + + pid = run_supervisor_as_fork( + workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ], + dispatchers: [ { batch_size: 100 } ], + skip_recurring: false + ) + + wait_for_registered_processes(5) + + terminate_process(pid) + wait_for_registered_processes(0) + + + results = skip_active_record_query_cache do + JobResult.where(status: :hook_called) + end + + assert_equal %w[ + supervisor_start supervisor_stop supervisor_exit + worker_first_queue_start worker_first_queue_stop worker_first_queue_exit + worker_second_queue_start worker_second_queue_stop worker_second_queue_exit + dispatcher_100_start dispatcher_100_stop dispatcher_100_exit + scheduler_start scheduler_stop scheduler_exit + ].sort, results.map(&:value).sort + + assert_equal({ "hook_called" => 15 }, results.map(&:status).tally) end + end - SolidQueue.on_dispatcher_start do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start") - end + test "handle errors on lifecycle hooks" do + resetting_hooks do + previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } + SolidQueue.on_start { raise RuntimeError, "everything is broken" } - SolidQueue.on_dispatcher_stop do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop") - end + pid = run_supervisor_as_fork + wait_for_registered_processes(4) - SolidQueue.on_dispatcher_exit do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit") - end + terminate_process(pid) + wait_for_registered_processes(0) - SolidQueue.on_scheduler_start do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_start") - end + result = skip_active_record_query_cache { JobResult.last } - SolidQueue.on_scheduler_stop do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_stop") + assert_equal "error", result.status + assert_equal "everything is broken", result.value end + end - SolidQueue.on_scheduler_exit do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_exit") + private + def resetting_hooks + reset_hooks(SolidQueue::Supervisor) do + reset_hooks(SolidQueue::Worker) do + reset_hooks(SolidQueue::Dispatcher) do + reset_hooks(SolidQueue::Scheduler) do + yield + end + end + end + end end - pid = run_supervisor_as_fork( - workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ], - dispatchers: [ { batch_size: 100 } ], - skip_recurring: false - ) - - wait_for_registered_processes(5) - - terminate_process(pid) - wait_for_registered_processes(0) - - - results = skip_active_record_query_cache do - job_results = JobResult.where(status: :hook_called) - assert_equal 15, job_results.count - job_results + def reset_hooks(process) + exit_hooks = process.lifecycle_hooks[:exit] + start_hooks = process.lifecycle_hooks[:start] + stop_hooks = process.lifecycle_hooks[:stop] + process.lifecycle_hooks[:exit] = [] + process.lifecycle_hooks[:start] = [] + process.lifecycle_hooks[:stop] = [] + yield + ensure + process.lifecycle_hooks[:exit] = exit_hooks + process.lifecycle_hooks[:start] = start_hooks + process.lifecycle_hooks[:stop] = stop_hooks end - - assert_equal({ "hook_called" => 15 }, results.map(&:status).tally) - assert_equal %w[ - supervisor_start supervisor_stop supervisor_exit - worker_first_queue_start worker_first_queue_stop worker_first_queue_exit - worker_second_queue_start worker_second_queue_stop worker_second_queue_exit - dispatcher_100_start dispatcher_100_stop dispatcher_100_exit - scheduler_start scheduler_stop scheduler_exit - ].sort, results.map(&:value).sort - ensure - SolidQueue::Supervisor.clear_hooks - SolidQueue::Worker.clear_hooks - SolidQueue::Dispatcher.clear_hooks - SolidQueue::Scheduler.clear_hooks - end - - test "handle errors on lifecycle hooks" do - previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } - SolidQueue.on_start { raise RuntimeError, "everything is broken" } - - pid = run_supervisor_as_fork - wait_for_registered_processes(4) - - terminate_process(pid) - wait_for_registered_processes(0) - - result = skip_active_record_query_cache { JobResult.last } - - assert_equal "error", result.status - assert_equal "everything is broken", result.value - ensure - SolidQueue.on_thread_error = previous_on_thread_error - SolidQueue::Supervisor.clear_hooks - SolidQueue::Worker.clear_hooks - SolidQueue::Dispatcher.clear_hooks - SolidQueue::Scheduler.clear_hooks - end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 17a658d7..486756ab 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -260,6 +260,8 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end test "enqueue successfully inside a rolled-back transaction in the app DB" do + # Doesn't work with enqueue_after_transaction_commit? true on SolidQueueAdapter, but only Rails 7.2 uses this + skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 assert_difference -> { SolidQueue::Job.count } do assert_no_difference -> { JobResult.count } do JobResult.transaction do diff --git a/test/test_helper.rb b/test/test_helper.rb index f54b73f2..2d80a35f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -33,6 +33,7 @@ class ActiveSupport::TestCase setup do @_on_thread_error = SolidQueue.on_thread_error SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError, @_on_thread_error) + ActiveJob::QueueAdapters::SolidQueueAdapter.stopping = false end teardown do diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..8b71e7f6 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -9,6 +9,20 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) end end + def wait_for_jobs_to_be_released_for(timeout = 1.second) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::ClaimedExecution.count > 0 + end + end + end + + def assert_unfinished_jobs(*jobs) + skip_active_record_query_cache do + assert_equal jobs.map(&:job_id).sort, SolidQueue::Job.where(finished_at: nil).map(&:active_job_id).sort + end + end + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none?