Job retry feature. Docs WIP (rq#1299)
* Initial implementation of Retry class

* Fixes job.refresh() under Python 3.5

* Remove the use of text_type in job.py

* Retry can be scheduled

* monitor_work_horse() should call handle_job_failure() with queue argument.

* Flake8 fixes

* Added docs for job retries
selwin authored Jul 23, 2020
1 parent 8a0d9f9 commit 49b156e
Showing 13 changed files with 303 additions and 59 deletions.
6 changes: 3 additions & 3 deletions docs/_config.yml
@@ -16,16 +16,16 @@ navigation:
url: /docs/results/
- text: Jobs
url: /docs/jobs/
- text: Exceptions
url: /docs/exceptions/
- text: Scheduling Jobs
url: /docs/scheduling/
- text: Monitoring
url: /docs/monitoring/
- text: Job Registries
url: /docs/job_registries/
- text: Connections
url: /docs/connections/
- text: Exceptions
url: /docs/exceptions/
url: /docs/connections/
- text: Testing
url: /docs/testing/
- text: Patterns
57 changes: 52 additions & 5 deletions docs/docs/exceptions.md
@@ -1,17 +1,64 @@
---
title: "RQ: Exceptions"
title: "RQ: Exceptions & Retries"
layout: docs
---

Jobs can fail due to exceptions occurring. When your RQ workers run in the
Jobs can fail due to exceptions occurring. When your RQ workers run in the
background, how do you get notified of these exceptions?

## Default: the `FailedJobRegistry`
## Default: FailedJobRegistry

The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't
execute successfully is stored here, along with its exception information (type,
value, traceback). While this makes sure no failing jobs "get lost", this is
of no use to get notified pro-actively about job failure.
value, traceback).

```python
from redis import Redis
from rq import Queue
from rq.job import Job
from rq.registry import FailedJobRegistry

redis = Redis()
queue = Queue(connection=redis)
registry = FailedJobRegistry(queue=queue)

# Show all failed job IDs and the exceptions they caused during runtime
for job_id in registry.get_job_ids():
    job = Job.fetch(job_id, connection=redis)
    print(job_id, job.exc_info)
```
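
Failed jobs can also be pushed back onto their queue by hand. A minimal sketch, reusing `registry` from the example above and assuming `FailedJobRegistry.requeue()` is available in your RQ version:

```python
# Requeue everything currently in the failed job registry
for job_id in registry.get_job_ids():
    registry.requeue(job_id)
```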

## Retrying Failed Jobs

_New in version 1.5.0_

RQ lets you easily retry failed jobs. To configure retries, use RQ's
`Retry` object, which accepts `max` and `interval` arguments. For example:

```python
from redis import Redis
from rq import Retry, Queue

from somewhere import my_func

redis = Redis()
queue = Queue(connection=redis)

# Retry up to 3 times; the failed job will be requeued immediately
queue.enqueue(my_func, retry=Retry(max=3))

# Retry up to 3 times, with a 60 second interval between retries
queue.enqueue(my_func, retry=Retry(max=3, interval=60))

# Retry up to 3 times, with progressively longer intervals between retries
queue.enqueue(my_func, retry=Retry(max=3, interval=[10, 30, 60]))
```
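
The retry configuration is stored on the job itself: `Queue.create_job()` copies `max` into `job.retries_left` and `interval` into `job.retry_intervals`. A minimal sketch, reusing `queue` and `my_func` from the example above, to sanity-check the configuration:

```python
job = queue.enqueue(my_func, retry=Retry(max=3, interval=[10, 30, 60]))

# Set by Queue.create_job() when a Retry object is passed
print(job.retries_left)     # 3
print(job.retry_intervals)  # [10, 30, 60]
```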

<div class="warning">
<img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
<strong>Note:</strong>
<p>
If you use the `interval` argument with `Retry`, don't forget to run your workers with
the `--with-scheduler` argument; retries with an interval are scheduled rather than requeued immediately.
</p>
</div>
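
A worker can also enable the scheduler programmatically. This is a minimal sketch, assuming the `with_scheduler` keyword to `Worker.work()` that ships with RQ's scheduler component (the CLI equivalent is `rq worker --with-scheduler`):

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue(connection=redis)

# Run the worker with the scheduler enabled so that scheduled retries
# are moved back onto the queue when they become due.
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
```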


## Custom Exception Handlers
4 changes: 2 additions & 2 deletions docs/docs/scheduling.md
@@ -20,7 +20,7 @@ RQ to have job scheduling capabilities without:
2. Worrying about a separate `Scheduler` class.


# Scheduling Jobs for Execution
## Scheduling Jobs for Execution

There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.

@@ -76,7 +76,7 @@ registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry
```

# Running the Scheduler
## Running the Scheduler

If you use RQ's scheduling features, you need to run RQ workers with the
scheduler component enabled.
2 changes: 1 addition & 1 deletion rq/__init__.py
@@ -5,7 +5,7 @@

from .connections import (Connection, get_current_connection, pop_connection,
push_connection, use_connection)
from .job import cancel_job, get_current_job, requeue_job
from .job import cancel_job, get_current_job, requeue_job, Retry
from .queue import Queue
from .version import VERSION
from .worker import SimpleWorker, Worker
66 changes: 56 additions & 10 deletions rq/job.py
@@ -3,15 +3,17 @@
unicode_literals)

import inspect
import json
import pickle
import warnings
import zlib

from collections.abc import Iterable
from distutils.version import StrictVersion
from functools import partial
from uuid import uuid4

from rq.compat import as_text, decode_redis_hash, string_types, text_type
from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@@ -342,9 +344,12 @@ def __init__(self, id=None, connection=None, serializer=None):
self.failure_ttl = None
self.ttl = None
self._status = None
self._dependency_ids = []
self._dependency_ids = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.redis_server_version = None

def __repr__(self): # noqa # pragma: no cover
@@ -370,7 +375,7 @@ def get_id(self): # noqa
first time the ID is requested.
"""
if self._id is None:
self._id = text_type(uuid4())
self._id = str(uuid4())
return self._id

def set_id(self, value):
@@ -481,13 +486,17 @@ def restore(self, raw_data):
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status')) if obj.get('status') else None
self._status = obj.get('status').decode() if obj.get('status') else None

dependency_id = obj.get('dependency_id', None)
self._dependency_ids = [as_text(dependency_id)] if dependency_id else []

self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}

self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None
if obj.get('retry_intervals'):
self.retry_intervals = json.loads(obj.get('retry_intervals').decode())

raw_exc_info = obj.get('exc_info')
if raw_exc_info:
@@ -516,19 +525,24 @@ def to_dict(self, include_meta=True):
You can exclude serializing the `meta` dictionary by setting
`include_meta=False`.
"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
obj['data'] = zlib.compress(self.data)

obj = {
'created_at': utcformat(self.created_at or utcnow()),
'data': zlib.compress(self.data),
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
}

if self.retries_left is not None:
obj['retries_left'] = self.retries_left
if self.retry_intervals is not None:
obj['retry_intervals'] = json.dumps(self.retry_intervals)
if self.origin is not None:
obj['origin'] = self.origin
if self.description is not None:
obj['description'] = self.description
if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at)

obj['started_at'] = utcformat(self.started_at) if self.started_at else ''
obj['ended_at'] = utcformat(self.ended_at) if self.ended_at else ''
if self._result is not None:
try:
obj['result'] = self.serializer.dumps(self._result)
@@ -732,6 +746,17 @@ def failed_job_registry(self):
from .registry import FailedJobRegistry
return FailedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__)

def get_retry_interval(self):
"""Returns the desired retry interval.
If number of retries is bigger than length of intervals, the first
value in the list will be used multiple times.
"""
if self.retry_intervals is None:
return 0
number_of_intervals = len(self.retry_intervals)
index = max(number_of_intervals - self.retries_left, 0)
return self.retry_intervals[index]

def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they
@@ -800,3 +825,24 @@ def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
)

_job_stack = LocalStack()


class Retry(object):
def __init__(self, max, interval=0):
"""`interval` can be a positive number or a list of ints"""
super().__init__()
if max < 1:
raise ValueError('max: please enter a value greater than 0')

if isinstance(interval, int):
if interval < 0:
raise ValueError('interval: negative numbers are not allowed')
intervals = [interval]
elif isinstance(interval, Iterable):
for i in interval:
if i < 0:
raise ValueError('interval: negative numbers are not allowed')
intervals = interval

self.max = max
self.intervals = intervals
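
For illustration, a minimal standalone sketch of the index arithmetic used by `Job.get_retry_interval()` above, assuming `retry_intervals=[10, 30, 60]` and `retries_left` counting down from 3:

```python
# Sketch of how get_retry_interval() picks a wait time per attempt
retry_intervals = [10, 30, 60]

for retries_left in (3, 2, 1):
    index = max(len(retry_intervals) - retries_left, 0)
    print(retries_left, retry_intervals[index])
# Prints 10, 30, 60 for the three attempts. If retries_left exceeds the
# number of intervals, index clamps to 0 and the first interval is reused.
```
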
38 changes: 25 additions & 13 deletions rq/queue.py
@@ -282,7 +282,7 @@ def push_job_id(self, job_id, pipeline=None, at_front=False):
def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED):
meta=None, status=JobStatus.QUEUED, retry=None):
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout)

@@ -306,12 +306,16 @@ def create_job(self, func, args=None, kwargs=None, timeout=None,
origin=self.name, meta=meta, serializer=self.serializer
)

if retry:
job.retries_left = retry.max
job.retry_intervals = retry.intervals

return job

def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
at_front=False, meta=None):
at_front=False, meta=None, retry=None):
"""Creates a job to represent the delayed function call and enqueues
it.
nd
@@ -324,6 +328,7 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, depends_on=depends_on,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
retry=retry
)

# If a _dependent_ job depends on any unfinished job, register all the
@@ -397,47 +402,54 @@ def parse_args(cls, f, *args, **kwargs):
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
meta = kwargs.pop('meta', None)
retry = kwargs.pop('retry', None)

if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa
args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)

return (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs)
depends_on, job_id, at_front, meta, retry, args, kwargs)

def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it."""

(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, job_id=job_id,
at_front=at_front, meta=meta
at_front=at_front, meta=meta, retry=retry
)

def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry

(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta)

return self.schedule_job(job, datetime)

def schedule_job(self, job, datetime, pipeline=None):
"""Puts job on ScheduledJobRegistry"""
from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
# Add Queue key set
pipeline.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute()

pipe = pipeline if pipeline is not None else self.connection.pipeline()

# Add Queue key set
pipe.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipe)
registry.schedule(job, datetime, pipeline=pipe)
if pipeline is None:
pipe.execute()
return job

def enqueue_in(self, time_delta, func, *args, **kwargs):
9 changes: 4 additions & 5 deletions rq/scheduler.py
@@ -51,7 +51,7 @@ def __init__(self, queues, connection, interval=1):
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None

@property
def connection(self):
if self._connection:
@@ -87,8 +87,7 @@ def acquire_locks(self, auto_start=False):

# Always reset _scheduled_job_registries when acquiring locks
self._scheduled_job_registries = []
self._acquired_locks = self._acquired_locks.union(successful_locks)

self._acquired_locks = self._acquired_locks.union(successful_locks)
self.lock_acquisition_time = datetime.now()

# If auto_start is requested and scheduler is not started,
@@ -110,7 +109,7 @@ def prepare_registries(self, queue_names=None):
)

@classmethod
def get_locking_key(self, name):
def get_locking_key(cls, name):
"""Returns scheduler key for a given queue name"""
return SCHEDULER_LOCKING_KEY_TEMPLATE % name

@@ -172,7 +171,7 @@ def stop(self):
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
self._status = self.Status.STOPPED

def start(self):
self._status = self.Status.STARTED
# Redis instance can't be pickled across processes so we need to
2 changes: 1 addition & 1 deletion rq/utils.py
@@ -229,7 +229,7 @@ def str_to_date(date_str):
if not date_str:
return
else:
return utcparse(as_text(date_str))
return utcparse(date_str.decode())


def parse_timeout(timeout):