diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 434b8f65..f67d5709 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -1,5 +1,13 @@ require "puma/plugin" +module Puma + class DSL + def solid_queue_mode(mode = :fork) + @options[:solid_queue_mode] = mode.to_sym + end + end +end + Puma::Plugin.create do attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor @@ -7,34 +15,54 @@ def start(launcher) @log_writer = launcher.log_writer @puma_pid = $$ - in_background do - monitor_solid_queue + if launcher.options[:solid_queue_mode] == :async + start_async(launcher) + else + start_forked(launcher) end + end - if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") - launcher.events.on_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start - end + private + def start_forked(launcher) + in_background do + monitor_solid_queue end - launcher.events.on_stopped { stop_solid_queue } - launcher.events.on_restart { stop_solid_queue } - else - launcher.events.after_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end + end + + launcher.events.on_stopped { stop_solid_queue } + launcher.events.on_restart { stop_solid_queue } + else + launcher.events.after_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end end + + launcher.events.after_stopped { stop_solid_queue } + launcher.events.before_restart { stop_solid_queue } end + end - launcher.events.after_stopped { stop_solid_queue } - launcher.events.before_restart { stop_solid_queue } + def start_async(launcher) + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.on_stopped { solid_queue_supervisor&.stop } + launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + else + launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.after_stopped { solid_queue_supervisor&.stop } + launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + end end - end - private def stop_solid_queue Process.waitpid(solid_queue_pid, Process::WNOHANG) log "Stopping Solid Queue..." diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb new file mode 100644 index 00000000..329c1c07 --- /dev/null +++ b/lib/solid_queue/async_supervisor.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module SolidQueue + class AsyncSupervisor < Supervisor + private + attr_reader :threads + + def start_processes + @threads = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :async + end + + thread = Thread.new do + begin + process_instance.start + rescue Exception => e + puts "Error in thread: #{e.message}" + puts e.backtrace + end + end + threads[thread] = [ process_instance, configured_process ] + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + processes.each(&:stop) + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do + # No-op, we just wait + end + + unless all_threads_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + threads.keys.each(&:kill) + end + end + + def supervised_processes + processes.map(&:to_s) + end + + def reap_and_replace_terminated_forks + # No-op in async mode, we'll check for dead threads in the supervise loop + end + + def all_threads_terminated? + threads.keys.all? { |thread| !thread.alive? } + end + + def supervise + loop do + break if stopped? + + set_procline + process_signal_queue + + unless stopped? + check_and_replace_terminated_threads + interruptible_sleep(1.second) + end + end + ensure + shutdown + end + + def check_and_replace_terminated_threads + terminated_threads = {} + threads.each do |thread, (process, configured_process)| + unless thread.alive? + terminated_threads[thread] = configured_process + end + end + + terminated_threads.each do |thread, configured_process| + threads.delete(thread) + start_process(configured_process) + end + end + + def processes + threads.values.map(&:first) + end + end +end \ No newline at end of file diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 7bfe555b..930ddaab 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -8,6 +8,8 @@ class Cli < Thor desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).", banner: "SOLID_QUEUE_CONFIG" + class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)" + class_option :recurring_schedule_file, type: :string, desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).", banner: "SOLID_QUEUE_RECURRING_SCHEDULE" diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index a002b41d..a6916d4f 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -31,8 +31,11 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" + attr_reader :mode + def initialize(**options) @options = options.with_defaults(default_options) + @mode = @options[:mode].to_s.inquiry end def configured_processes @@ -84,6 +87,7 @@ def ensure_correctly_sized_thread_pool def default_options { + mode: :fork, config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), only_work: false, @@ -110,7 +114,12 @@ def skip_recurring_tasks? def workers workers_options.flat_map do |worker_options| - processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + processes = if mode.fork? + worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + else + 1 + end + processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) } end end diff --git a/lib/solid_queue/fork_supervisor.rb b/lib/solid_queue/fork_supervisor.rb new file mode 100644 index 00000000..23ab15ed --- /dev/null +++ b/lib/solid_queue/fork_supervisor.rb @@ -0,0 +1,111 @@ +# frozen_string_literal: true + +module SolidQueue + class ForkSupervisor < Supervisor + private + attr_reader :forks, :configured_processes + + def start_processes + @forks = {} + @configured_processes = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :fork + end + + pid = fork do + process_instance.start + end + + configured_processes[pid] = configured_process + forks[pid] = process_instance + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + term_forks + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do + reap_terminated_forks + end + + unless all_forks_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + quit_forks + end + end + + def supervised_processes + forks.keys + end + + def term_forks + signal_processes(forks.keys, :TERM) + end + + def quit_forks + signal_processes(forks.keys, :QUIT) + end + + def reap_and_replace_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + replace_fork(pid, status) + end + end + + def reap_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) + handle_claimed_jobs_by(terminated_fork, status) + end + + configured_processes.delete(pid) + end + rescue SystemCallError + # All children already reaped + end + + def replace_fork(pid, status) + SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| + if terminated_fork = forks.delete(pid) + payload[:fork] = terminated_fork + handle_claimed_jobs_by(terminated_fork, status) + + start_process(configured_processes.delete(pid)) + end + end + end + + # When a supervised fork crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. + def handle_claimed_jobs_by(terminated_fork, status) + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) + error = Processes::ProcessExitError.new(status) + registered_process.fail_all_claimed_executions_with(error) + end + end + + def all_forks_terminated? + forks.empty? + end + end +end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 33b441f6..aa5bb8f6 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -9,11 +9,7 @@ module Runnable def start boot - if running_async? - @thread = create_thread { run } - else - run - end + run end def stop diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 7d010593..5c999a84 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -13,7 +13,8 @@ def start(**options) configuration = Configuration.new(**options) if configuration.valid? - new(configuration).tap(&:start) + klass = configuration.mode.fork? ? ForkSupervisor : AsyncSupervisor + klass.new(configuration).tap(&:start) else abort configuration.errors.full_messages.join("\n") + "\nExiting..." end @@ -22,9 +23,6 @@ def start(**options) def initialize(configuration) @configuration = configuration - @forks = {} - @configured_processes = {} - super end @@ -44,7 +42,7 @@ def stop end private - attr_reader :configuration, :forks, :configured_processes + attr_reader :configuration def boot SolidQueue.instrument(:start_process, process: self) do @@ -54,10 +52,6 @@ def boot end end - def start_processes - configuration.configured_processes.each { |configured_process| start_process(configured_process) } - end - def supervise loop do break if stopped? @@ -74,45 +68,14 @@ def supervise shutdown end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :fork - end - - pid = fork do - process_instance.start - end - - configured_processes[pid] = configured_process - forks[pid] = process_instance + def start_processes + raise NotImplementedError end def set_procline procline "supervising #{supervised_processes.join(", ")}" end - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - term_forks - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do - reap_terminated_forks - end - - unless all_forks_terminated? - payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end - - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - quit_forks - end - end - def shutdown SolidQueue.instrument(:shutdown_process, process: self) do run_callbacks(:shutdown) do @@ -125,65 +88,8 @@ def sync_std_streams STDOUT.sync = STDERR.sync = true end - def supervised_processes - forks.keys - end - - def term_forks - signal_processes(forks.keys, :TERM) - end - - def quit_forks - signal_processes(forks.keys, :QUIT) - end - def reap_and_replace_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - replace_fork(pid, status) - end - end - - def reap_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end - - configured_processes.delete(pid) - end - rescue SystemCallError - # All children already reaped - end - - def replace_fork(pid, status) - SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| - if terminated_fork = forks.delete(pid) - payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) - - start_process(configured_processes.delete(pid)) - end - end - end - - # When a supervised fork crashes or exits we need to mark all the - # executions it had claimed as failed so that they can be retried - # by some other worker. - def handle_claimed_jobs_by(terminated_fork, status) - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) - end - end - - def all_forks_terminated? - forks.empty? + # No-op by default, implemented in ForkSupervisor end end -end +end \ No newline at end of file