Skip to content

Commit

Permalink
Merge branch 'reversefold/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Oct 20, 2015
2 parents 631bdc1 + 1974371 commit 88d9d2f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,6 @@ Alexander Lebedev, 2015/04/25
Frantisek Holop, 2015/05/21
Feanil Patel, 2015/05/21
Jocelyn Delalande, 2015/06/03
Justin Patrin, 2015/08/06
Juan Rossi, 2015/08/10
Piotr Maślanka, 2015/08/24
44 changes: 25 additions & 19 deletions celery/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def _params_from_url(self, url, defaults):
db = db.strip('/') if isinstance(db, string_t) else db
connparams['db'] = int(db)

for key in ['socket_timeout', 'socket_connect_timeout']:
if key in query:
query[key] = float(query[key])

# Query parameters override other parameters
connparams.update(query)
return connparams
Expand Down Expand Up @@ -160,13 +164,13 @@ def set(self, key, value, **retry_policy):
return self.ensure(self._set, (key, value), **retry_policy)

def _set(self, key, value):
pipe = self.client.pipeline()
if self.expires:
pipe.setex(key, value, self.expires)
else:
pipe.set(key, value)
pipe.publish(key, value)
pipe.execute()
with self.client.pipeline() as pipe:
if self.expires:
pipe.setex(key, value, self.expires)
else:
pipe.set(key, value)
pipe.publish(key, value)
pipe.execute()

def delete(self, key):
self.client.delete(key)
Expand Down Expand Up @@ -207,13 +211,14 @@ def _new_chord_return(self, task, state, result, propagate=None):
jkey = self.get_key_for_group(gid, '.j')
tkey = self.get_key_for_group(gid, '.t')
result = self.encode_result(result, state)
_, readycount, totaldiff, _, _ = client.pipeline() \
.rpush(jkey, self.encode([1, tid, state, result])) \
.llen(jkey) \
.get(tkey) \
.expire(jkey, 86400) \
.expire(tkey, 86400) \
.execute()
with client.pipeline() as pipe:
_, readycount, totaldiff, _, _ = pipe \
.rpush(jkey, self.encode([1, tid, state, result])) \
.llen(jkey) \
.get(tkey) \
.expire(jkey, 86400) \
.expire(tkey, 86400) \
.execute()

totaldiff = int(totaldiff or 0)

Expand All @@ -222,11 +227,12 @@ def _new_chord_return(self, task, state, result, propagate=None):
total = callback['chord_size'] + totaldiff
if readycount == total:
decode, unpack = self.decode, self._unpack_chord_result
resl, _, _ = client.pipeline() \
.lrange(jkey, 0, total) \
.delete(jkey) \
.delete(tkey) \
.execute()
with client.pipeline() as pipe:
resl, _, _ = pipe \
.lrange(jkey, 0, total) \
.delete(jkey) \
.delete(tkey) \
.execute()
try:
callback.delay([unpack(tup, decode) for tup in resl])
except Exception as exc:
Expand Down
6 changes: 6 additions & 0 deletions celery/tests/backends/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ def add_step(*args, **kwargs):
return self
return add_step

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
pass

def execute(self):
return [step(*a, **kw) for step, a, kw in self.steps]

Expand Down

0 comments on commit 88d9d2f

Please sign in to comment.