Skip to content

Commit

Permalink
Simplify FailedQueue.quarantine and ensure that a deferred job's stat…
Browse files Browse the repository at this point in the history
…us is set to Queued when enqueued.
  • Loading branch information
selwin committed Jan 23, 2015
1 parent 7fd2ac8 commit 9320496
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions rq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,24 +248,25 @@ def enqueue(self, f, *args, **kwargs):
description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front)

def enqueue_job(self, job, set_meta_data=True, at_front=False):
def enqueue_job(self, job, at_front=False):
"""Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`.
If Queue is instantiated with async=False, job is executed immediately.
"""
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)

with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)

if set_meta_data:
job.origin = self.name
job.enqueued_at = utcnow()

if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save()
if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save(pipeline=pipeline)

pipeline.execute()

if self._async:
self.push_job_id(job.id, at_front=at_front)
Expand Down Expand Up @@ -401,14 +402,20 @@ def __init__(self, connection=None):
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
queue).
This is different from normal job enqueueing, since certain meta data
must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
data must be inserted (`ended_at` and `exc_info`).
"""
job.ended_at = utcnow()
job.exc_info = exc_info
return self.enqueue_job(job, set_meta_data=False)

with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)

job.ended_at = utcnow()
job.exc_info = exc_info
job.save(pipeline=pipeline)

self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute()

return job

def requeue(self, job_id):
"""Requeues the job with the given job ID."""
Expand Down

0 comments on commit 9320496

Please sign in to comment.