Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
olttwa committed Oct 12, 2023
1 parent efe27cd commit 5ebfb6a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
18 changes: 9 additions & 9 deletions src/goose/brokers/redis/batch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[goose.retry]
[goose.utils :as u]

[taoensso.carmine :as car]))
[taoensso.carmine :as car]
[clojure.tools.logging :as log]))

(defn batch-keys [id]
{:batch-hash (d/prefix-batch id)
Expand Down Expand Up @@ -73,7 +74,7 @@
[redis-conn
{:keys [id ready-queue linger-sec] :as batch}
completion-status]
(let [{:keys [batch-hash enqueued-set retrying-set success-set dead-set]} (batch-keys id)
(let [{:keys [batch-hash success-set dead-set]} (batch-keys id)
callback (batch/new-callback-job batch id completion-status)]
(redis-cmds/atomic
redis-conn
Expand All @@ -85,18 +86,17 @@
;; Terminal job execution marks completion of a batch, NOT callback execution.
;; Clean-up batch after enqueuing callback.
(car/expire batch-hash linger-sec "NX")
(car/expire enqueued-set linger-sec "NX")
(car/expire retrying-set linger-sec "NX")
(car/expire success-set linger-sec "NX")
(car/expire dead-set linger-sec "NX"))))

(defn- mark-batch-completion
[{:keys [redis-conn] :as opts} job batch-id completion-status]
;; If a job is executed after a batch has been deleted,
;; `when-let` guards against nil return from `get-batch`.
(when-let [batch (get-batch redis-conn batch-id)]
(enqueue-callback-and-cleanup-batch redis-conn batch completion-status)
(record-metrics opts job batch completion-status)))
;; If a batch-job is executed after a batch has been deleted,
;; `if-let` guards against nil return from `get-batch`.
(if-let [batch (get-batch redis-conn batch-id)]
(do (enqueue-callback-and-cleanup-batch redis-conn batch completion-status)
(record-metrics opts job batch completion-status))
(log/warnf "Job executed after batch-id: %s was deleted." batch-id)))

(defn- job-source-set
[job {:keys [enqueued-set retrying-set]}]
Expand Down
21 changes: 16 additions & 5 deletions test/goose/brokers/redis/integration_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns goose.brokers.redis.integration-test
(:require
[goose.api.batch :as batch-api]
[goose.batch :as batch]
[goose.brokers.redis.batch :as redis-batch]
[goose.brokers.redis.commands :as redis-cmds]
[goose.brokers.redis.consumer :as redis-consumer]
[goose.client :as c]
Expand All @@ -12,7 +12,8 @@
[goose.utils :as u]
[goose.worker :as w]

[clojure.test :refer [deftest is testing use-fixtures]])
[clojure.test :refer [deftest is testing use-fixtures]]
[taoensso.carmine :as car])
(:import
[clojure.lang ExceptionInfo]
[java.time Instant]
Expand Down Expand Up @@ -207,6 +208,14 @@
(def callback-fn-executed (atom (promise)))
(defn batch-callback [id status]
(deliver @callback-fn-executed {:id id :status status}))
(defmacro assert-batch-expiration [id]
`(let [foo# (redis-batch/batch-keys ~id)]
(is (= -2 (redis-cmds/wcar* tu/redis-conn (car/ttl (:enqueued-set foo#)))))
(is (= -2 (redis-cmds/wcar* tu/redis-conn (car/ttl (:retrying-set foo#)))))

(is (not= -1 (redis-cmds/wcar* tu/redis-conn (car/ttl (:batch-hash foo#)))))
(is (not= -1 (redis-cmds/wcar* tu/redis-conn (car/ttl (:success-set foo#)))))
(is (not= -1 (redis-cmds/wcar* tu/redis-conn (car/ttl (:dead-set foo#)))))))

(deftest perform-batch-test
(let [shared-args (-> []
Expand All @@ -225,10 +234,8 @@
(is (uuid? (UUID/fromString batch-id)))
(is (= (deref @callback-fn-executed 400 :n-jobs-batch-callback-timed-out)
{:id batch-id :status batch/status-success}))
(is (not-empty (batch-api/status tu/redis-producer batch-id)))
(u/sleep linger-sec)
(is (empty? (batch-api/status tu/redis-producer batch-id)))
(is (= (reduce + n-args) @n-jobs-batch-args-sum))
(assert-batch-expiration batch-id)
(w/stop worker)))

(testing "[redis][batch-jobs] Enqueued -> Retrying -> Success"
Expand All @@ -240,6 +247,7 @@
(is (= (deref @callback-fn-executed 2100 :fail-pass-batch-callback-timed-out)
{:id batch-id :status batch/status-success}))
(is (= 4 @batch-fail-pass-count))
(assert-batch-expiration batch-id)
(w/stop worker)))

(testing "[redis][batch-jobs] Enqueued -> Retrying -> Dead"
Expand All @@ -252,6 +260,7 @@
(is (= (deref @callback-fn-executed 2100 :dead-batch-callback-timed-out)
{:id batch-id :status batch/status-dead}))
(is (= 4 @dead-job-run-count))
(assert-batch-expiration batch-id)
(w/stop worker)))

(testing "[redis][batch-jobs] Enqueued -> Dead"
Expand All @@ -263,6 +272,7 @@
(is (= (deref @callback-fn-executed 400 :dead-batch-callback-timed-out)
{:id batch-id :status batch/status-dead}))
(is (= 2 @dead-job-run-count))
(assert-batch-expiration batch-id)
(w/stop worker)))

(testing "[redis][batch-jobs] Enqueued -> Success/Dead -> Partial Success"
Expand All @@ -272,4 +282,5 @@
worker (w/start tu/redis-worker-opts)]
(is (= (deref @callback-fn-executed 400 :partial-success-batch-callback-timed-out)
{:id batch-id :status batch/status-partial-success}))
(assert-batch-expiration batch-id)
(w/stop worker)))))

0 comments on commit 5ebfb6a

Please sign in to comment.