Create get_dependencies_statuses method on Job
This method shall be used in Queue#enqueue_dependents to determine whether all of a dependent's dependencies are _FINISHED_.
Thomas Matecki committed Apr 17, 2020
1 parent 0dd9ff0 commit ee215a1
Showing 3 changed files with 179 additions and 5 deletions.
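
As context for the diff below, here is a minimal usage sketch of the method this commit introduces. It is illustrative only and not part of the commit; it assumes a Job named dependent_job whose dependencies have already been registered.

# Hypothetical usage sketch (not part of this commit): decide whether every
# dependency of an existing `dependent_job` has reached the FINISHED status.
from rq.job import JobStatus

statuses = dependent_job.get_dependencies_statuses()  # [(job_id, status), ...]

all_finished = all(status == JobStatus.FINISHED for _, status in statuses)

# Because unfinished dependencies are sorted to the front of the list,
# checking only the first entry gives the same answer:
# all_finished = not statuses or statuses[0][1] == JobStatus.FINISHED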
42 changes: 40 additions & 2 deletions rq/job.py
@@ -124,7 +124,7 @@ def get_status(self, refresh=True):

    def set_status(self, status, pipeline=None):
        self._status = status
-       connection = pipeline or self.connection
+       connection = pipeline if pipeline is not None else self.connection
        connection.hset(self.key, 'status', self._status)

    @property
@@ -405,7 +405,6 @@ def fetch_dependencies(self, watch=False, pipeline=None):

        return jobs

-
    @property
    def result(self):
        """Returns the return value of the job.
@@ -725,4 +724,43 @@ def register_dependency(self, pipeline=None):
        connection.sadd(dependents_key, self.id)
        connection.sadd(self.dependencies_key, dependency_id)

+    def get_dependencies_statuses(
+            self,
+            watch=False,
+            pipeline=None
+    ):
+        """Returns a list of tuples containing the job ids and status of all
+        dependencies, e.g.:
+            [('14462606-09c4-41c2-8bf1-fbd109092318', 'started'),
+             ('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...]
+        As a minor optimization allowing callers to more quickly tell if all
+        dependencies are _FINISHED_, the returned list is sorted by the
+        `ended_at` timestamp, so those jobs which are not yet finished are at
+        the start of the list.
+        """
+
+        pipe = pipeline if pipeline is not None else self.connection
+
+        if watch:
+            pipe.watch(self.dependencies_key)
+            pipe.watch(*[self.redis_job_namespace_prefix + as_text(_id)
+                         for _id in pipe.smembers(self.dependencies_key)])
+
+        sort_by = self.redis_job_namespace_prefix + '*->ended_at'
+        get_field = self.redis_job_namespace_prefix + '*->status'
+
+        # Sorting lexicographically works here because these dates are stored
+        # in an ISO 8601 format, so lexicographic order is the same as
+        # chronological order.
+        dependencies_statuses = [
+            (as_text(_id), as_text(status))
+            for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
+                                         get=['#', get_field], alpha=True, groups=True)
+        ]
+
+        return dependencies_statuses
+

_job_stack = LocalStack()
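
The comment inside get_dependencies_statuses relies on ISO 8601 timestamps comparing correctly as plain strings, which is what Redis SORT with alpha=True does. A small standalone check of that claim, using only the standard library and rq.utils.utcformat (the helper RQ uses to serialise these timestamps):

# Lexicographic order of ISO 8601 strings matches chronological order.
from datetime import datetime, timedelta

from rq.utils import utcformat

earlier = utcformat(datetime(2020, 4, 17, 12, 0, 0))
later = utcformat(datetime(2020, 4, 17, 12, 0, 0) + timedelta(seconds=30))

assert earlier < later

# A job that has not finished has no ended_at; it is stored as an empty
# string, which sorts ahead of any timestamp, so unfinished jobs come first.
assert '' < earlier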
3 changes: 2 additions & 1 deletion rq/queue.py
@@ -466,7 +466,8 @@ def enqueue_dependents(self, job, pipeline=None):

                pipe.multi()

-                for dependent in dependent_jobs:
+                for dependent, dependents_dependencies in dependent_jobs:
+
                    registry = DeferredJobRegistry(dependent.origin,
                                                   self.connection,
                                                   job_class=self.job_class)
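
The rest of enqueue_dependents is outside the hunk shown above, so how the unpacked dependents_dependencies feeds into the new status check is not visible here. Purely as a hypothetical sketch of the intent stated in the commit message, and not code from this commit, the gating could look like this inside the loop:

# Hypothetical sketch only, not part of this diff: enqueue a dependent only
# once every one of its dependencies reports the FINISHED status.
from rq.job import JobStatus

for dependent, dependents_dependencies in dependent_jobs:
    statuses = dependent.get_dependencies_statuses()
    if not all(status == JobStatus.FINISHED for _, status in statuses):
        continue  # at least one dependency is still pending; keep it deferred
    # ... the existing registry bookkeeping and enqueue_job() call follow ...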
139 changes: 137 additions & 2 deletions tests/test_job.py
@@ -6,8 +6,9 @@
import time
import queue
import zlib
-from datetime import datetime
+from datetime import datetime, timedelta

+import pytest
from redis import WatchError

from rq.compat import PY2, as_text
@@ -17,7 +18,7 @@
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
                         FinishedJobRegistry, StartedJobRegistry,
                         ScheduledJobRegistry)
-from rq.utils import utcformat
+from rq.utils import utcformat, utcnow
from rq.worker import Worker
from tests import RQTestCase, fixtures

@@ -796,3 +797,137 @@ def test_fetch_dependencies_watches(self):
                self.testconn.set(dependency_job.id, 'somethingelsehappened')
                pipeline.touch(dependency_job.id)
                pipeline.execute()
+
+    def test_get_dependencies_statuses_returns_ids_and_statuses(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job_ids = [
+            queue.enqueue(fixtures.say_hello).id
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = dependency_job_ids
+        dependent_job.register_dependency()
+
+        dependencies_statuses = dependent_job.get_dependencies_statuses()
+
+        self.assertSetEqual(
+            set(dependencies_statuses),
+            {(_id, JobStatus.QUEUED) for _id in dependency_job_ids}
+        )
+
+    def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self):
+        queue = Queue(connection=self.testconn)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job.register_dependency()
+
+        dependencies_statuses = dependent_job.get_dependencies_statuses()
+
+        self.assertListEqual(
+            dependencies_statuses,
+            []
+        )
+
+    def test_get_dependencies_statuses_returns_ordered_by_end_time(self):
+        dependency_jobs = [
+            Job.create(fixtures.say_hello)
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        for i, job in enumerate(dependency_jobs):
+            job._status = JobStatus.FINISHED
+            job.ended_at = now - timedelta(seconds=i)
+            job.save()
+
+        dependencies_statuses = dependent_job.get_dependencies_statuses()
+
+        self.assertListEqual(
+            dependencies_statuses,
+            [(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)]
+        )
+
+    def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self):
+        dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
+
+        dependency_jobs[0]._status = JobStatus.FINISHED
+        dependency_jobs[0].ended_at = utcnow()
+        dependency_jobs[0].save()
+
+        dependency_jobs[1]._status = JobStatus.STARTED
+        dependency_jobs[1].ended_at = None
+        dependency_jobs[1].save()
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        dependencies_statuses = dependent_job.get_dependencies_statuses()
+
+        self.assertEqual(
+            dependencies_statuses[0],
+            (dependency_jobs[1].id, JobStatus.STARTED)
+        )
+
+        self.assertEqual(
+            dependencies_statuses[1],
+            (dependency_jobs[0].id, JobStatus.FINISHED)
+        )
+
+    def test_get_dependencies_statuses_watches_job(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+
+            dependent_job.get_dependencies_statuses(
+                pipeline=pipeline,
+                watch=True
+            )
+
+            dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()
+
+    def test_get_dependencies_statuses_watches_dependency_set(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+
+            dependent_job.get_dependencies_statuses(
+                pipeline=pipeline,
+                watch=True
+            )
+
+            self.testconn.sadd(
+                dependent_job.dependencies_key,
+                queue.enqueue(fixtures.say_hello).id,
+            )
+
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()
