forked from bensheldon/good_job
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch.rb
156 lines (130 loc) · 3.99 KB
/
batch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# frozen_string_literal: true
module GoodJob
  # Public wrapper around a {GoodJob::BatchRecord}: exposes the record's
  # timestamps, state predicates, callback settings and custom properties, and
  # provides helpers for enqueueing jobs into the batch.
  #
  # NOTE: This class delegates to {GoodJob::BatchRecord} and is intended to be the public interface for Batches.
  class Batch
    include GlobalID::Identification

    # Per-thread values; {.within_thread} sets them for the duration of a block.
    thread_cattr_accessor :current_batch_id
    thread_cattr_accessor :current_batch_callback_id

    # Keys that {#assign_properties} writes to record attributes instead of
    # merging them into the free-form +properties+ hash.
    PROTECTED_PROPERTIES = %i[
      on_finish
      on_success
      on_discard
      callback_queue_name
      callback_priority
      description
      properties
    ].freeze

    delegate(
      :id,
      :created_at,
      :updated_at,
      :persisted?,
      :enqueued_at,
      :finished_at,
      :discarded_at,
      :enqueued?,
      :finished?,
      :succeeded?,
      :discarded?,
      :description,
      :description=,
      :on_finish,
      :on_finish=,
      :on_success,
      :on_success=,
      :on_discard,
      :on_discard=,
      :callback_queue_name,
      :callback_queue_name=,
      :callback_priority,
      :callback_priority=,
      :properties,
      :properties=,
      :save,
      :reload,
      to: :record
    )

    # Create a new batch and enqueue it
    # @param active_jobs [Array<ActiveJob::Base>] Active jobs to add to the batch
    # @param properties [Hash] Additional properties to be stored on the batch
    # @param block [Proc] Enqueue jobs within the block to add them to the batch
    # @return [GoodJob::Batch] The newly created batch
    def self.enqueue(active_jobs = [], **properties, &block)
      new.tap do |batch|
        batch.enqueue(active_jobs, **properties, &block)
      end
    end

    # Name of the identifying attribute (used together with {.find} and GlobalID).
    # @return [Symbol] always +:id+
    def self.primary_key
      :id
    end

    # Look up a persisted batch by id.
    # @param id [Object] id passed through to +BatchRecord.find+
    # @return [GoodJob::Batch] a batch wrapping the found record
    def self.find(id)
      new _record: BatchRecord.find(id)
    end

    # Helper method to enqueue jobs and assign them to a batch.
    # Sets the thread-local batch ids while the block runs and restores the
    # previous values afterwards, even when the block raises.
    # @param batch_id [Object, nil] value for {.current_batch_id} inside the block
    # @param batch_callback_id [Object, nil] value for {.current_batch_callback_id} inside the block
    # @yield runs with the given ids in effect
    # @return [Object] the block's return value
    def self.within_thread(batch_id: nil, batch_callback_id: nil)
      original_batch_id = current_batch_id
      original_batch_callback_id = current_batch_callback_id

      self.current_batch_id = batch_id
      self.current_batch_callback_id = batch_callback_id

      yield
    ensure
      self.current_batch_id = original_batch_id
      self.current_batch_callback_id = original_batch_callback_id
    end

    # @param _record [GoodJob::BatchRecord, nil] existing record to wrap; a new, unsaved record is built when omitted
    # @param properties [Hash] initial properties, handled by {#assign_properties}
    def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName
      self.record = _record || BatchRecord.new
      assign_properties(properties)
    end

    # Persist the batch, add the given jobs to it and stamp +enqueued_at+.
    # When the batch was already persisted, +enqueued_at+ and +finished_at+ are
    # reset to nil first so the batch can be enqueued again.
    # @param active_jobs [Array<ActiveJob::Base>] Active jobs to add to the batch
    # @param properties [Hash] Additional properties to be stored on the batch
    # @param block [Proc] Enqueue jobs within the block to add them to the batch
    # @return [Array<ActiveJob::Base>] Active jobs added to the batch
    def enqueue(active_jobs = [], **properties, &block)
      assign_properties(properties)
      if record.new_record?
        record.save!
      else
        record.with_advisory_lock(function: "pg_advisory_lock") do
          # Mark both attributes dirty so the reset is written regardless of
          # the values currently loaded on this instance.
          record.enqueued_at_will_change!
          record.finished_at_will_change!
          record.update!(enqueued_at: nil, finished_at: nil)
        end
      end

      active_jobs = add(active_jobs, &block)

      Rails.application.executor.wrap do
        record.with_advisory_lock(function: "pg_advisory_lock") do
          record.update!(enqueued_at: Time.current)

          # During inline execution, this could enqueue and execute further jobs
          record._continue_discard_or_finish(lock: false)
        end
      end

      active_jobs
    end

    # Enqueue jobs and add them to the batch
    # @param active_jobs [Array<ActiveJob::Base>, nil] Active jobs to add to the batch
    # @param block [Proc] Enqueue jobs within the block to add them to the batch
    # @return [Array<ActiveJob::Base>] Active jobs added to the batch
    # @raise [ActiveRecord::RecordInvalid, ActiveRecord::RecordNotSaved] if a new batch record cannot be saved
    def add(active_jobs = nil, &block)
      # Jobs are enqueued under this batch's id, so the record has to exist
      # first. Raise on failure (as #enqueue does) instead of silently
      # enqueueing the jobs with a nil batch id.
      record.save! if record.new_record?

      buffer = Bulk::Buffer.new
      buffer.add(active_jobs)
      buffer.capture(&block) if block

      self.class.within_thread(batch_id: id) do
        buffer.enqueue
      end

      buffer.active_jobs
    end

    # @return [Array] the +active_job+ of every entry in the record's +jobs+
    def active_jobs
      record.jobs.map(&:active_job)
    end

    # @return [Array] the +active_job+ of every entry in the record's +callback_jobs+
    def callback_active_jobs
      record.callback_jobs.map(&:active_job)
    end

    # Split the given hash into record attributes and custom properties.
    # Keys listed in {PROTECTED_PROPERTIES} that carry a non-nil value are
    # assigned as attributes on the record (nil values are ignored); all
    # remaining keys are merged into the +properties+ hash. The argument itself
    # is not mutated.
    # @param properties [Hash]
    # @return [Hash] the +properties+ hash after merging
    def assign_properties(properties)
      properties = properties.dup
      batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact
      record.assign_attributes(batch_attrs)
      self.properties.merge!(properties)
    end

    # Public reader for the wrapped record (the plain +record+ accessor is private).
    # NOTE(review): the underscore prefix suggests this is meant for internal use - confirm before relying on it.
    # @return [GoodJob::BatchRecord]
    def _record
      record
    end

    private

    attr_accessor :record
  end
end