Skip to content

Commit

Permalink
Job compression (rq#907)
Browse files Browse the repository at this point in the history
job.exc_info and job.data are now stored in compressed format in Redis.

* job.data is now stored in compressed format.
  • Loading branch information
selwin authored Nov 23, 2017
1 parent 44a0a7b commit f500186
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 20 deletions.
23 changes: 19 additions & 4 deletions rq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4

Expand Down Expand Up @@ -416,25 +417,39 @@ def to_date(date_str):
return utcparse(as_text(date_str))

try:
self.data = obj['data']
raw_data = obj['data']
except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj))

try:
self.data = zlib.decompress(raw_data)
except zlib.error:
# Fallback to uncompressed string
self.data = raw_data

self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = as_text(obj.get('exc_info'))
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}

raw_exc_info = obj.get('exc_info')
if raw_exc_info:
try:
self.exc_info = as_text(zlib.decompress(raw_exc_info))
except zlib.error:
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)


def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
Expand All @@ -444,7 +459,7 @@ def to_dict(self, include_meta=True):
"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
obj['data'] = self.data
obj['data'] = zlib.compress(self.data)

if self.origin is not None:
obj['origin'] = self.origin
Expand All @@ -462,7 +477,7 @@ def to_dict(self, include_meta=True):
except:
obj['result'] = 'Unpickleable return value'
if self.exc_info is not None:
obj['exc_info'] = self.exc_info
obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
if self.timeout is not None:
obj['timeout'] = self.timeout
if self.result_ttl is not None:
Expand Down
61 changes: 53 additions & 8 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
unicode_literals)

from datetime import datetime
import time

import time
import sys
import zlib

is_py2 = sys.version[0] == '2'
if is_py2:
Expand All @@ -15,7 +16,7 @@

from tests import fixtures, RQTestCase

from rq.compat import PY2
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job
from rq.queue import Queue, get_failed_queue
Expand Down Expand Up @@ -160,7 +161,7 @@ def test_save(self): # noqa
self.assertEqual(self.testconn.type(job.key), b'hash')

# Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data'))
unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data')))
self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation')

def test_fetch(self):
Expand Down Expand Up @@ -236,7 +237,8 @@ def test_fetching_can_fail(self):
def test_fetching_unreadable_data(self):
"""Fetching succeeds on unreadable data, but lazy props fail."""
# Set up
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4),
kwargs=dict(z=2))
job.save()

# Just replace the data hkey with some random noise
Expand All @@ -255,14 +257,57 @@ def test_job_is_unimportable(self):
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace(b'say_hello', b'nay_hello')
self.testconn.hset(job.key, 'data', unimportable_data)
job_data = job.data
unimportable_data = job_data.replace(b'say_hello', b'nay_hello')

self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data))

job.refresh()
with self.assertRaises(AttributeError):
job.func # accessing the func property should fail

def test_compressed_exc_info_handling(self):
    """exc_info is readable whether Redis holds it compressed or as plain text."""
    expected = 'Some exception'

    job = Job.create(func=fixtures.say_hello, args=('Lionel',))
    job.exc_info = expected
    job.save()

    # The value written to the hash must be the zlib-compressed form.
    stored = self.testconn.hget(job.key, 'exc_info')
    decoded = as_text(zlib.decompress(stored))
    self.assertEqual(decoded, expected)

    # Reloading from the compressed field gives back the original text.
    job.refresh()
    self.assertEqual(job.exc_info, expected)

    # Overwrite the field with the uncompressed string; a refresh must
    # still produce the same exc_info.
    self.testconn.hset(job.key, 'exc_info', expected)
    job.refresh()
    self.assertEqual(job.exc_info, expected)

def test_compressed_job_data_handling(self):
    """Job data is readable whether Redis holds it compressed or uncompressed."""
    job = Job.create(func=fixtures.say_hello, args=('Lionel',))
    job.save()

    # The hash field must equal the zlib-compressed form of job.data.
    plain = job.data
    compressed = zlib.compress(plain)
    stored = self.testconn.hget(job.key, 'data')
    self.assertEqual(compressed, stored)

    # Replace the field with the uncompressed bytes; a refresh must
    # still load the same data.
    self.testconn.hset(job.key, 'data', plain)
    job.refresh()
    self.assertEqual(job.data, plain)


def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly."""
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
Expand Down Expand Up @@ -457,7 +502,7 @@ def test_create_job_with_ttl_should_expire(self):
"""test if a job created with ttl expires [issue502]"""
queue = Queue(connection=self.testconn)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
time.sleep(1)
time.sleep(1.1)
self.assertEqual(0, len(queue.get_jobs()))

def test_create_and_cancel_job(self):
Expand Down
20 changes: 12 additions & 8 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

import os
import shutil
from datetime import datetime, timedelta
from time import sleep
import signal
import time
from multiprocessing import Process
import subprocess
import sys
import time
import zlib

from datetime import datetime, timedelta
from multiprocessing import Process
from time import sleep

from unittest import skipIf

import pytest
Expand Down Expand Up @@ -180,10 +183,11 @@ def test_work_is_unreadable(self):
# importable from the worker process.
job = Job.create(func=div_by_zero, args=(3,))
job.save()
data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace(b'div_by_zero', b'nonexisting')
assert data != invalid_data
self.testconn.hset(job.key, 'data', invalid_data)

job_data = job.data
invalid_data = job_data.replace(b'div_by_zero', b'nonexisting')
assert job_data != invalid_data
self.testconn.hset(job.key, 'data', zlib.compress(invalid_data))

# We use the low-level internal function to enqueue any data (bypassing
# validity checks)
Expand Down

0 comments on commit f500186

Please sign in to comment.