Skip to content

Commit

Permalink
Add Active Job extension for Labels (bensheldon#1188)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Dec 18, 2023
1 parent 5247637 commit a181f81
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 12 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,31 @@ The Dashboard can be set to automatically refresh by checking "Live Poll" in the
Higher priority numbers run first in all versions of GoodJob v3.x and below. GoodJob v4.x will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority (see #524). To opt-in to this behavior now, set `config.good_job.smaller_number_is_higher_priority = true` in your GoodJob initializer or `application.rb`.
### Labelled jobs
Labels are the recommended way to add context or metadata to specific jobs. For example, all jobs that have a dependency on an email service could be labeled `email`. Using labels requires adding the Active Job extension `GoodJob::ActiveJobExtensions::Labels` to your job class.
```ruby
class ApplicationRecord < ActiveJob::Base
include GoodJob::ActiveJobExtensions::Labels
end
# Add a default label to every job within the class
class WelcomeJob < ApplicationRecord
self.good_job_labels = ["email"]
def perform
# Labels can be inspected from within the job
puts good_job_labels # => ["email"]
end
end
# Or add to individual jobs when enqueued
WelcomeJob.set(good_job_labels: ["email"]).perform_later
```
Labels can be used to search jobs in the Dashboard. For example, to find all jobs labeled `email`, search for `email`.
### Concurrency controls
GoodJob can extend ActiveJob to provide limits on concurrently running jobs, either at time of _enqueue_ or at _perform_. Limiting concurrency can help prevent duplicate, double or unnecessary jobs from being enqueued, or race conditions when performing, for example when interacting with 3rd-party APIs.
Expand Down
2 changes: 1 addition & 1 deletion app/models/concerns/good_job/filterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module Filterable
query = query.to_s.strip
next if query.blank?

tsvector = "(to_tsvector('english', serialized_params) || to_tsvector('english', id::text) || to_tsvector('english', COALESCE(error, '')::text))"
tsvector = "(to_tsvector('english', id::text) || to_tsvector('english', COALESCE(active_job_id::text, '')) || to_tsvector('english', serialized_params) || to_tsvector('english', COALESCE(error, ''))#{" || to_tsvector('english', COALESCE(array_to_string(labels, ' '), ''))" if labels_migrated?})"
to_tsquery_function = database_supports_websearch_to_tsquery? ? 'websearch_to_tsquery' : 'plainto_tsquery'
where("#{tsvector} @@ #{to_tsquery_function}(?)", query)
.order(sanitize_sql_for_order([Arel.sql("ts_rank(#{tsvector}, #{to_tsquery_function}(?))"), query]) => 'DESC')
Expand Down
15 changes: 15 additions & 0 deletions app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ def cron_indices_migrated?
migration_pending_warning!
false
end

def labels_migrated?
return true if columns_hash["labels"].present?

migration_pending_warning!
false
end

def labels_indices_migrated?
return true if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels)

migration_pending_warning!
false
end
end

# The ActiveJob job class, as a string
Expand Down Expand Up @@ -88,6 +102,7 @@ def active_job_data
.tap do |job_data|
job_data["provider_job_id"] = id
job_data["good_job_concurrency_key"] = concurrency_key if concurrency_key
job_data["good_job_labels"] = Array(labels) if self.class.labels_migrated? && labels.present?
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ def self.enqueue_args(active_job, overrides = {})
execution_args[:scheduled_at] = Time.zone.at(active_job.scheduled_at) if active_job.scheduled_at
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? && labels_migrated?
labels = active_job.good_job_labels.dup
labels.map! { |label| label.to_s.strip.presence }
labels.tap(&:compact!).tap(&:uniq!)
execution_args[:labels] = labels
end

reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
current_execution = CurrentThread.execution

Expand Down
15 changes: 15 additions & 0 deletions demo/db/migrate/20231216183914_create_good_job_labels.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

class CreateGoodJobLabels < ActiveRecord::Migration[7.1]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :labels)
end
end

add_column :good_jobs, :labels, :text, array: true
end
end
22 changes: 22 additions & 0 deletions demo/db/migrate/20231216183915_create_good_job_labels_index.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

class CreateGoodJobLabelsIndex < ActiveRecord::Migration[7.1]
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels)
add_index :good_jobs, :labels, where: "(labels IS NOT NULL)",
using: :gin, name: :index_good_jobs_on_labels, algorithm: :concurrently
end
end

dir.down do
if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels)
remove_index :good_jobs, name: :index_good_jobs_on_labels
end
end
end
end
end
4 changes: 3 additions & 1 deletion demo/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2023_11_28_075428) do
ActiveRecord::Schema.define(version: 2023_12_16_183915) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
Expand Down Expand Up @@ -79,6 +79,7 @@
t.integer "executions_count"
t.text "job_class"
t.integer "error_event", limit: 2
t.text "labels", array: true
t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id"
t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"
Expand All @@ -87,6 +88,7 @@
t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at_cond", where: "(cron_key IS NOT NULL)"
t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at_cond", unique: true, where: "(cron_key IS NOT NULL)"
t.index ["finished_at"], name: "index_good_jobs_jobs_on_finished_at", where: "((retried_good_job_id IS NULL) AND (finished_at IS NOT NULL))"
t.index ["labels"], name: "index_good_jobs_on_labels", where: "(labels IS NOT NULL)", using: :gin
t.index ["priority", "created_at"], name: "index_good_jobs_jobs_on_priority_created_at_when_unfinished", order: { priority: "DESC NULLS LAST" }, where: "(finished_at IS NULL)"
t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)"
t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.integer :executions_count
t.text :job_class
t.integer :error_event, limit: 2
t.text :labels, array: true
end

create_table :good_job_batches, id: :uuid do |t|
Expand Down Expand Up @@ -82,6 +83,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished
add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL"
add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL"
add_index :good_jobs, :labels, using: :gin, where: "(labels IS NOT NULL)", name: :index_good_jobs_on_labels

add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

class CreateGoodJobLabels < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :labels)
end
end

add_column :good_jobs, :labels, :text, array: true
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

class CreateGoodJobLabelsIndex < ActiveRecord::Migration<%= migration_version %>
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels)
add_index :good_jobs, :labels, using: :gin, where: "(labels IS NOT NULL)",
name: :index_good_jobs_on_labels, algorithm: :concurrently
end
end

dir.down do
if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_labels)
remove_index :good_jobs, name: :index_good_jobs_on_labels
end
end
end
end
end
5 changes: 3 additions & 2 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require "good_job/active_job_extensions/concurrency"
require "good_job/interrupt_error"
require "good_job/active_job_extensions/interrupt_errors"
require "good_job/active_job_extensions/labels"
require "good_job/active_job_extensions/notify_options"

require "good_job/assignable_connection"
Expand All @@ -22,7 +23,7 @@
require "good_job/cli"
require "good_job/configuration"
require "good_job/cron_manager"
require 'good_job/current_thread'
require "good_job/current_thread"
require "good_job/daemon"
require "good_job/dependencies"
require "good_job/job_performer"
Expand Down Expand Up @@ -272,7 +273,7 @@ def self.deprecator
def self.migrated?
# Always update with the most recent migration check
GoodJob::Execution.reset_column_information
GoodJob::Execution.cron_indices_migrated?
GoodJob::Execution.labels_indices_migrated?
end

ActiveSupport.run_load_hooks(:good_job, self)
Expand Down
32 changes: 32 additions & 0 deletions lib/good_job/active_job_extensions/labels.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module GoodJob
module ActiveJobExtensions
module Labels
extend ActiveSupport::Concern

module Prepends
def initialize(*arguments)
super
self.good_job_labels = Array(self.class.good_job_labels)
end

def enqueue(options = {})
self.good_job_labels = Array(options[:good_job_labels]) if options.key?(:good_job_labels)
super
end

def deserialize(job_data)
super
self.good_job_labels = job_data.delete("good_job_labels")&.dup || []
end
end

included do
prepend Prepends
class_attribute :good_job_labels, instance_accessor: false, instance_predicate: false, default: []
attr_accessor :good_job_labels
end
end
end
end
32 changes: 25 additions & 7 deletions spec/app/models/concerns/good_job/filterable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,50 @@
require 'rails_helper'

RSpec.describe GoodJob::Filterable do
let(:model_class) { GoodJob::Execution }
let!(:execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default", serialized_params: { example_key: 'example_value' }, error: "ExampleJob::ExampleError: a message") }
let(:model_class) { GoodJob::Job }
let!(:job) do
model_class.create(
active_job_id: SecureRandom.uuid,
queue_name: "default",
serialized_params: { example_key: 'example_value' },
labels: %w[buffalo gopher],
error: "ExampleJob::ExampleError: a message"
)
end

describe '.search_test' do
it 'searches serialized params' do
expect(model_class.search_text('example_value')).to include(execution)
expect(model_class.search_text('example_value')).to include(job)
end

it 'searches record id' do
expect(model_class.search_text(execution.id)).to include(execution)
expect(model_class.search_text(job.id)).to include(job)
end

it 'searches active_job_id' do
expect(model_class.search_text(job.active_job_id)).to include(job)
end

it 'searches labels' do
expect(model_class.search_text('buffalo')).to include(job)
expect(model_class.search_text('gopher')).to include(job)
expect(model_class.search_text('hippo')).not_to include(job)
end

it 'searches errors' do
expect(model_class.search_text('ExampleError')).to include(execution)
expect(model_class.search_text('ExampleError')).to include(job)
end

it 'searches strings with colons' do
expect(model_class.search_text('ExampleJob::ExampleError')).to include(execution)
expect(model_class.search_text('ExampleJob::ExampleError')).to include(job)
end

it 'filters out non-matching records' do
expect(model_class.search_text('ghost')).to be_empty
end

it 'is chainable and reversible' do
expect(model_class.where.not(id: nil).search_text('example_value').reverse).to include(execution)
expect(model_class.where.not(id: nil).search_text('example_value').reverse).to include(job)
end
end
end
Loading

0 comments on commit a181f81

Please sign in to comment.