-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added the ability to fetch workers by queue (rq#911)
* job.exc_info is now compressed. * job.data is now stored in compressed format. * Added worker_registration.unregister. * Added worker_registration.get_keys(). * Modified Worker.all(), Worker.all_keys() and Worker.count() to accept "connection" and "queue" arguments.
- Loading branch information
Showing
4 changed files
with
166 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from .compat import as_text | ||
|
||
|
||
WORKERS_BY_QUEUE_KEY = 'rq:workers:%s' | ||
REDIS_WORKER_KEYS = 'rq:workers' | ||
|
||
|
||
def register(worker, pipeline=None): | ||
"""Store worker key in Redis so we can easily discover active workers.""" | ||
connection = pipeline if pipeline is not None else worker.connection | ||
connection.sadd(worker.redis_workers_keys, worker.key) | ||
for name in worker.queue_names(): | ||
redis_key = WORKERS_BY_QUEUE_KEY % name | ||
connection.sadd(redis_key, worker.key) | ||
|
||
|
||
def unregister(worker, pipeline=None): | ||
"""Remove worker key from Redis.""" | ||
if pipeline is None: | ||
connection = worker.connection._pipeline() | ||
else: | ||
connection = pipeline | ||
|
||
connection.srem(worker.redis_workers_keys, worker.key) | ||
for name in worker.queue_names(): | ||
redis_key = WORKERS_BY_QUEUE_KEY % name | ||
connection.srem(redis_key, worker.key) | ||
|
||
if pipeline is None: | ||
connection.execute() | ||
|
||
|
||
def get_keys(queue=None, connection=None): | ||
"""Returnes a list of worker keys for a queue""" | ||
if queue is None and connection is None: | ||
raise ValueError('"queue" or "connection" argument is required') | ||
|
||
if queue: | ||
redis = queue.connection | ||
redis_key = WORKERS_BY_QUEUE_KEY % queue.name | ||
else: | ||
redis = connection | ||
redis_key = REDIS_WORKER_KEYS | ||
|
||
return {as_text(key) for key in redis.smembers(redis_key)} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from tests import RQTestCase | ||
|
||
from rq import Queue, Worker | ||
from rq.worker_registration import (get_keys, register, unregister, | ||
WORKERS_BY_QUEUE_KEY) | ||
|
||
|
||
class TestWorkerRegistry(RQTestCase): | ||
|
||
def test_worker_registration(self): | ||
"""Ensure worker.key is correctly set in Redis.""" | ||
foo_queue = Queue(name='foo') | ||
bar_queue = Queue(name='bar') | ||
worker = Worker([foo_queue, bar_queue]) | ||
|
||
register(worker) | ||
redis = worker.connection | ||
|
||
self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) | ||
self.assertTrue( | ||
redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) | ||
) | ||
self.assertTrue( | ||
redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) | ||
) | ||
|
||
unregister(worker) | ||
self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) | ||
self.assertFalse( | ||
redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) | ||
) | ||
self.assertFalse( | ||
redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) | ||
) | ||
|
||
def test_get_keys_by_queue(self): | ||
"""get_keys_by_queue only returns active workers for that queue""" | ||
foo_queue = Queue(name='foo') | ||
bar_queue = Queue(name='bar') | ||
baz_queue = Queue(name='baz') | ||
|
||
worker1 = Worker([foo_queue, bar_queue]) | ||
worker2 = Worker([foo_queue]) | ||
worker3 = Worker([baz_queue]) | ||
|
||
self.assertEqual(set(), get_keys(foo_queue)) | ||
|
||
register(worker1) | ||
register(worker2) | ||
register(worker3) | ||
|
||
# get_keys(queue) will return worker keys for that queue | ||
self.assertEqual( | ||
set([worker1.key, worker2.key]), | ||
get_keys(foo_queue) | ||
) | ||
self.assertEqual(set([worker1.key]), get_keys(bar_queue)) | ||
|
||
# get_keys(connection=connection) will return all worker keys | ||
self.assertEqual( | ||
set([worker1.key, worker2.key, worker3.key]), | ||
get_keys(connection=worker1.connection) | ||
) | ||
|
||
# Calling get_keys without arguments raises an exception | ||
self.assertRaises(ValueError, get_keys) | ||
|
||
unregister(worker1) | ||
unregister(worker2) | ||
unregister(worker3) |