Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 47 additions & 19 deletions lib/puma/plugin/solid_queue.rb
Original file line number Diff line number Diff line change
@@ -1,40 +1,68 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_solid_queue
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end
end

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
end
private
def start_forked(launcher)
in_background do
monitor_solid_queue
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
end
end

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
def start_async(launcher)
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
launcher.events.on_stopped { solid_queue_supervisor&.stop }
launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
else
launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
launcher.events.after_stopped { solid_queue_supervisor&.stop }
launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
end
end
end

private
def stop_solid_queue
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
Expand Down
98 changes: 98 additions & 0 deletions lib/solid_queue/async_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# frozen_string_literal: true

module SolidQueue
class AsyncSupervisor < Supervisor
private
attr_reader :threads

def start_processes
@threads = {}

configuration.configured_processes.each { |configured_process| start_process(configured_process) }
end

def start_process(configured_process)
process_instance = configured_process.instantiate.tap do |instance|
instance.supervised_by process
instance.mode = :async
end

thread = Thread.new do
begin
process_instance.start
rescue Exception => e
puts "Error in thread: #{e.message}"
puts e.backtrace
end
end
threads[thread] = [ process_instance, configured_process ]
end

def terminate_gracefully
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
processes.each(&:stop)

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do
# No-op, we just wait
end

unless all_threads_terminated?
payload[:shutdown_timeout_exceeded] = true
terminate_immediately
end
end
end

def terminate_immediately
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
threads.keys.each(&:kill)
end
end

def supervised_processes
processes.map(&:to_s)
end

def reap_and_replace_terminated_forks
# No-op in async mode, we'll check for dead threads in the supervise loop
end

def all_threads_terminated?
threads.keys.all? { |thread| !thread.alive? }
end

def supervise
loop do
break if stopped?

set_procline
process_signal_queue

unless stopped?
check_and_replace_terminated_threads
interruptible_sleep(1.second)
end
end
ensure
shutdown
end

def check_and_replace_terminated_threads
terminated_threads = {}
threads.each do |thread, (process, configured_process)|
unless thread.alive?
terminated_threads[thread] = configured_process
end
end

terminated_threads.each do |thread, configured_process|
threads.delete(thread)
start_process(configured_process)
end
end

def processes
threads.values.map(&:first)
end
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class Cli < Thor
desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).",
banner: "SOLID_QUEUE_CONFIG"

class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)"

class_option :recurring_schedule_file, type: :string,
desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
Expand Down
11 changes: 10 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ def instantiate
DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

attr_reader :mode

def initialize(**options)
@options = options.with_defaults(default_options)
@mode = @options[:mode].to_s.inquiry
end

def configured_processes
Expand Down Expand Up @@ -84,6 +87,7 @@ def ensure_correctly_sized_thread_pool

def default_options
{
mode: :fork,
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
only_work: false,
Expand All @@ -110,7 +114,12 @@ def skip_recurring_tasks?

def workers
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
1
end

processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end
Expand Down
111 changes: 111 additions & 0 deletions lib/solid_queue/fork_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# frozen_string_literal: true

module SolidQueue
class ForkSupervisor < Supervisor
private
attr_reader :forks, :configured_processes

def start_processes
@forks = {}
@configured_processes = {}

configuration.configured_processes.each { |configured_process| start_process(configured_process) }
end

def start_process(configured_process)
process_instance = configured_process.instantiate.tap do |instance|
instance.supervised_by process
instance.mode = :fork
end

pid = fork do
process_instance.start
end

configured_processes[pid] = configured_process
forks[pid] = process_instance
end

def terminate_gracefully
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
term_forks

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
reap_terminated_forks
end

unless all_forks_terminated?
payload[:shutdown_timeout_exceeded] = true
terminate_immediately
end
end
end

def terminate_immediately
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
quit_forks
end
end

def supervised_processes
forks.keys
end

def term_forks
signal_processes(forks.keys, :TERM)
end

def quit_forks
signal_processes(forks.keys, :QUIT)
end

def reap_and_replace_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

replace_fork(pid, status)
end
end

def reap_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
handle_claimed_jobs_by(terminated_fork, status)
end

configured_processes.delete(pid)
end
rescue SystemCallError
# All children already reaped
end

def replace_fork(pid, status)
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if terminated_fork = forks.delete(pid)
payload[:fork] = terminated_fork
handle_claimed_jobs_by(terminated_fork, status)

start_process(configured_processes.delete(pid))
end
end
end

# When a supervised fork crashes or exits we need to mark all the
# executions it had claimed as failed so that they can be retried
# by some other worker.
def handle_claimed_jobs_by(terminated_fork, status)
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
error = Processes::ProcessExitError.new(status)
registered_process.fail_all_claimed_executions_with(error)
end
end

def all_forks_terminated?
forks.empty?
end
end
end
6 changes: 1 addition & 5 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ module Runnable
def start
boot

if running_async?
@thread = create_thread { run }
else
run
end
run
end

def stop
Expand Down
Loading
Loading