Skip to content

Commit

Permalink
Safe fetching a deleted job removes the deleted job from queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
selwin committed Feb 16, 2013
1 parent 501a387 commit c987569
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
10 changes: 8 additions & 2 deletions rq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def safe_fetch(job_id):
try:
job = Job.safe_fetch(job_id, connection=self.connection)
except NoSuchJobError:
self.remove(job_id)
return None
except UnpickleError:
return None
Expand All @@ -88,6 +89,11 @@ def count(self):
"""Returns a count of all messages in the queue."""
return self.connection.llen(self.key)

def remove(self, job_or_id):
"""Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, Job) else job_or_id
return self.connection._lrem(self.key, 0, job_id)

def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
Expand Down Expand Up @@ -316,11 +322,11 @@ def requeue(self, job_id):
job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing)
self.connection._lrem(self.key, 0, job_id)
self.remove(job_id)
return

# Delete it from the failed queue (raise an error if that failed)
if self.connection._lrem(self.key, 0, job.id) == 0:
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')

job.status = Status.QUEUED
Expand Down
26 changes: 26 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,32 @@ def test_queue_is_empty(self):
self.testconn.rpush('rq:queue:example', 'sentinel message')
self.assertEquals(q.is_empty(), False)

def test_remove(self):
"""Ensure queue.remove properly removes Job from queue."""
q = Queue('example')
job = q.enqueue(say_hello)
self.assertIn(job.id, q.job_ids)
q.remove(job)
self.assertNotIn(job.id, q.job_ids)

job = q.enqueue(say_hello)
self.assertIn(job.id, q.job_ids)
q.remove(job.id)
self.assertNotIn(job.id, q.job_ids)

def test_jobs(self):
"""Getting jobs out of a queue."""
q = Queue('example')
self.assertEqual(q.jobs, [])
job = q.enqueue(say_hello)
self.assertEqual(q.jobs, [job])

# Fetching a deleted removes it from queue
job.delete()
self.assertEqual(q.job_ids, [job.id])
q.jobs
self.assertEqual(q.job_ids, [])

def test_compact(self):
"""Compacting queueus."""
q = Queue()
Expand Down

0 comments on commit c987569

Please sign in to comment.