From d4db54d4014ed56d1cace3e5ba3ca5841f7f544b Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:20:25 -0300 Subject: [PATCH 1/8] async mode --- lib/puma/plugin/solid_queue.rb | 70 +++++++++++----- lib/solid_queue/async_supervisor.rb | 125 ++++++++++++++++++++++++++++ lib/solid_queue/cli.rb | 2 + lib/solid_queue/configuration.rb | 11 ++- lib/solid_queue/fork_supervisor.rb | 111 ++++++++++++++++++++++++ lib/solid_queue/supervisor.rb | 111 +++--------------------- 6 files changed, 307 insertions(+), 123 deletions(-) create mode 100644 lib/solid_queue/async_supervisor.rb create mode 100644 lib/solid_queue/fork_supervisor.rb diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 434b8f65..3acfa3b4 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -1,40 +1,68 @@ require "puma/plugin" +module Puma + class DSL + def solid_queue_mode(mode = :fork) + @options[:solid_queue_mode] = mode.to_sym + end + end +end + Puma::Plugin.create do attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor def start(launcher) @log_writer = launcher.log_writer - @puma_pid = $$ + @puma_pid = $ - in_background do - monitor_solid_queue + if launcher.options[:solid_queue_mode] == :async + start_async(launcher) + else + start_forked(launcher) end + end - if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") - launcher.events.on_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start - end + private + def start_forked(launcher) + in_background do + monitor_solid_queue end - launcher.events.on_stopped { stop_solid_queue } - launcher.events.on_restart { stop_solid_queue } - else - launcher.events.after_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted do + @solid_queue_pid = fork do + Thread.new { 
monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end + end + + launcher.events.on_stopped { stop_solid_queue } + launcher.events.on_restart { stop_solid_queue } + else + launcher.events.after_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end end + + launcher.events.after_stopped { stop_solid_queue } + launcher.events.before_restart { stop_solid_queue } end + end - launcher.events.after_stopped { stop_solid_queue } - launcher.events.before_restart { stop_solid_queue } + def start_async(launcher) + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.on_stopped { solid_queue_supervisor&.stop } + launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + else + launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.after_stopped { solid_queue_supervisor&.stop } + launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + end end - end - private def stop_solid_queue Process.waitpid(solid_queue_pid, Process::WNOHANG) log "Stopping Solid Queue..." 
@@ -55,7 +83,7 @@ def monitor(process_dead, message) loop do if send(process_dead) log message - Process.kill(:INT, $$) + Process.kill(:INT, $) break end sleep 2 diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb new file mode 100644 index 00000000..483c9f22 --- /dev/null +++ b/lib/solid_queue/async_supervisor.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +module SolidQueue + class AsyncSupervisor < Supervisor + private + attr_reader :threads + + def start_processes + @threads = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :async + end + + thread = Thread.new { process_instance.start } + threads[thread] = configured_process + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + processes.each(&:stop) + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do + # No-op, we just wait + end + + unless all_threads_terminated? 
+ payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + threads.keys.each(&:kill) + end + end + + attr_reader :threads + + def start_processes + @threads = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :async + end + + thread = Thread.new { process_instance.start } + threads[thread] = [ process_instance, configured_process ] + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + processes.each(&:stop) + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do + # No-op, we just wait + end + + unless all_threads_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + threads.keys.each(&:kill) + end + end + + def supervised_processes + processes.map(&:to_s) + end + + def reap_and_replace_terminated_forks + # No-op in async mode, we'll check for dead threads in the supervise loop + end + + def all_threads_terminated? + threads.keys.all? { |thread| !thread.alive? } + end + + def supervise + loop do + break if stopped? + + set_procline + process_signal_queue + + unless stopped? 
+ check_and_replace_terminated_threads + interruptible_sleep(1.second) + end + end + ensure + shutdown + end + + def check_and_replace_terminated_threads + threads.each do |thread, (process, configured_process)| + unless thread.alive? + threads.delete(thread) + start_process(configured_process) + end + end + end + + def processes + threads.values.map(&:first) + end + end +end \ No newline at end of file diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 7bfe555b..930ddaab 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -8,6 +8,8 @@ class Cli < Thor desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).", banner: "SOLID_QUEUE_CONFIG" + class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)" + class_option :recurring_schedule_file, type: :string, desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).", banner: "SOLID_QUEUE_RECURRING_SCHEDULE" diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index a002b41d..a6916d4f 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -31,8 +31,11 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" + attr_reader :mode + def initialize(**options) @options = options.with_defaults(default_options) + @mode = @options[:mode].to_s.inquiry end def configured_processes @@ -84,6 +87,7 @@ def ensure_correctly_sized_thread_pool def default_options { + mode: :fork, config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), only_work: false, @@ -110,7 +114,12 @@ def skip_recurring_tasks? 
def workers workers_options.flat_map do |worker_options| - processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + processes = if mode.fork? + worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + else + 1 + end + processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) } end end diff --git a/lib/solid_queue/fork_supervisor.rb b/lib/solid_queue/fork_supervisor.rb new file mode 100644 index 00000000..23ab15ed --- /dev/null +++ b/lib/solid_queue/fork_supervisor.rb @@ -0,0 +1,111 @@ +# frozen_string_literal: true + +module SolidQueue + class ForkSupervisor < Supervisor + private + attr_reader :forks, :configured_processes + + def start_processes + @forks = {} + @configured_processes = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :fork + end + + pid = fork do + process_instance.start + end + + configured_processes[pid] = configured_process + forks[pid] = process_instance + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + term_forks + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do + reap_terminated_forks + end + + unless all_forks_terminated? 
+ payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + quit_forks + end + end + + def supervised_processes + forks.keys + end + + def term_forks + signal_processes(forks.keys, :TERM) + end + + def quit_forks + signal_processes(forks.keys, :QUIT) + end + + def reap_and_replace_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + replace_fork(pid, status) + end + end + + def reap_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) + handle_claimed_jobs_by(terminated_fork, status) + end + + configured_processes.delete(pid) + end + rescue SystemCallError + # All children already reaped + end + + def replace_fork(pid, status) + SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| + if terminated_fork = forks.delete(pid) + payload[:fork] = terminated_fork + handle_claimed_jobs_by(terminated_fork, status) + + start_process(configured_processes.delete(pid)) + end + end + end + + # When a supervised fork crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. + def handle_claimed_jobs_by(terminated_fork, status) + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) + error = Processes::ProcessExitError.new(status) + registered_process.fail_all_claimed_executions_with(error) + end + end + + def all_forks_terminated? + forks.empty? 
+ end + end +end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 7d010593..53dae005 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -1,5 +1,8 @@ # frozen_string_literal: true +require "solid_queue/fork_supervisor" +require "solid_queue/async_supervisor" + module SolidQueue class Supervisor < Processes::Base include LifecycleHooks @@ -13,7 +16,8 @@ def start(**options) configuration = Configuration.new(**options) if configuration.valid? - new(configuration).tap(&:start) + klass = configuration.mode.fork? ? ForkSupervisor : AsyncSupervisor + klass.new(configuration).tap(&:start) else abort configuration.errors.full_messages.join("\n") + "\nExiting..." end @@ -22,9 +26,6 @@ def start(**options) def initialize(configuration) @configuration = configuration - @forks = {} - @configured_processes = {} - super end @@ -44,7 +45,7 @@ def stop end private - attr_reader :configuration, :forks, :configured_processes + attr_reader :configuration def boot SolidQueue.instrument(:start_process, process: self) do @@ -54,10 +55,6 @@ def boot end end - def start_processes - configuration.configured_processes.each { |configured_process| start_process(configured_process) } - end - def supervise loop do break if stopped? 
@@ -74,45 +71,14 @@ def supervise shutdown end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :fork - end - - pid = fork do - process_instance.start - end - - configured_processes[pid] = configured_process - forks[pid] = process_instance + def start_processes + raise NotImplementedError end def set_procline procline "supervising #{supervised_processes.join(", ")}" end - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - term_forks - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do - reap_terminated_forks - end - - unless all_forks_terminated? - payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end - - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - quit_forks - end - end - def shutdown SolidQueue.instrument(:shutdown_process, process: self) do run_callbacks(:shutdown) do @@ -125,65 +91,8 @@ def sync_std_streams STDOUT.sync = STDERR.sync = true end - def supervised_processes - forks.keys - end - - def term_forks - signal_processes(forks.keys, :TERM) - end - - def quit_forks - signal_processes(forks.keys, :QUIT) - end - def reap_and_replace_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - replace_fork(pid, status) - end - end - - def reap_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - if (terminated_fork = forks.delete(pid)) && (!status.exited? 
|| status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end - - configured_processes.delete(pid) - end - rescue SystemCallError - # All children already reaped - end - - def replace_fork(pid, status) - SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| - if terminated_fork = forks.delete(pid) - payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) - - start_process(configured_processes.delete(pid)) - end - end - end - - # When a supervised fork crashes or exits we need to mark all the - # executions it had claimed as failed so that they can be retried - # by some other worker. - def handle_claimed_jobs_by(terminated_fork, status) - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) - end - end - - def all_forks_terminated? - forks.empty? + # No-op by default, implemented in ForkSupervisor end end -end +end \ No newline at end of file From 8037cdea5db5ac7df34e9ec932d48c0ae3023edb Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:28:47 -0300 Subject: [PATCH 2/8] refactor: remove unused supervisor require statements --- lib/solid_queue/supervisor.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 53dae005..5c999a84 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -1,5 +1,8 @@ # frozen_string_literal: true -require "solid_queue/fork_supervisor" -require "solid_queue/async_supervisor" - module SolidQueue class Supervisor < Processes::Base include LifecycleHooks From 0876c75b1c3d29a4052b9fcfd1063ec5ec2fe19f Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:36:03 -0300 Subject: [PATCH 3/8] fix: use $$ instead of $ to get correct process ID in solid_queue plugin --- 
lib/puma/plugin/solid_queue.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 3acfa3b4..4130a0b8 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -13,7 +13,7 @@ def solid_queue_mode(mode = :fork) def start(launcher) @log_writer = launcher.log_writer - @puma_pid = $ + @puma_pid = $$ if launcher.options[:solid_queue_mode] == :async start_async(launcher) From 8b0a71658e35f106eb3ca907d83b0ba60c423751 Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:41:02 -0300 Subject: [PATCH 4/8] typo --- lib/puma/plugin/solid_queue.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 4130a0b8..f67d5709 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -83,7 +83,7 @@ def monitor(process_dead, message) loop do if send(process_dead) log message - Process.kill(:INT, $) + Process.kill(:INT, $$) break end sleep 2 From f7b253d9be550c315e176563bf7059db701b38cc Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:47:00 -0300 Subject: [PATCH 5/8] fix: don't modify array during iteration --- lib/solid_queue/async_supervisor.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb index 483c9f22..ec697044 100644 --- a/lib/solid_queue/async_supervisor.rb +++ b/lib/solid_queue/async_supervisor.rb @@ -110,12 +110,17 @@ def supervise end def check_and_replace_terminated_threads + terminated_threads = {} threads.each do |thread, (process, configured_process)| unless thread.alive? 
- threads.delete(thread) - start_process(configured_process) + terminated_threads[thread] = configured_process end end + + terminated_threads.each do |thread, configured_process| + threads.delete(thread) + start_process(configured_process) + end end def processes From 295d02f667a05c8440075516a36a83bc5de2366c Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:56:58 -0300 Subject: [PATCH 6/8] chore: remove duplicate code --- lib/solid_queue/async_supervisor.rb | 39 ----------------------------- 1 file changed, 39 deletions(-) diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb index ec697044..7f0af9d6 100644 --- a/lib/solid_queue/async_supervisor.rb +++ b/lib/solid_queue/async_supervisor.rb @@ -11,45 +11,6 @@ def start_processes configuration.configured_processes.each { |configured_process| start_process(configured_process) } end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :async - end - - thread = Thread.new { process_instance.start } - threads[thread] = configured_process - end - - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - processes.each(&:stop) - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do - # No-op, we just wait - end - - unless all_threads_terminated? 
- payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end - - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - threads.keys.each(&:kill) - end - end - - attr_reader :threads - - def start_processes - @threads = {} - - configuration.configured_processes.each { |configured_process| start_process(configured_process) } - end - def start_process(configured_process) process_instance = configured_process.instantiate.tap do |instance| instance.supervised_by process From c64dc518cdb380e9623b3120b6149d6837869abb Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 17:00:19 -0300 Subject: [PATCH 7/8] chore: logging --- lib/solid_queue/async_supervisor.rb | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb index 7f0af9d6..329c1c07 100644 --- a/lib/solid_queue/async_supervisor.rb +++ b/lib/solid_queue/async_supervisor.rb @@ -17,7 +17,14 @@ def start_process(configured_process) instance.mode = :async end - thread = Thread.new { process_instance.start } + thread = Thread.new do + begin + process_instance.start + rescue Exception => e + puts "Error in thread: #{e.message}" + puts e.backtrace + end + end threads[thread] = [ process_instance, configured_process ] end From 66ded2de0f5981d91390e44c6d17e3ea971aa0b7 Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 17:02:21 -0300 Subject: [PATCH 8/8] refactor: remove async thread creation option from runnable process --- lib/solid_queue/processes/runnable.rb | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 33b441f6..aa5bb8f6 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -9,11 +9,7 @@ module 
Runnable def start boot - if running_async? - @thread = create_thread { run } - else - run - end + run end def stop