Skip to content

Commit

Permalink
Switch to multiprocessing instead of threading.
Browse files Browse the repository at this point in the history
  • Loading branch information
arikfr committed Apr 27, 2014
1 parent e8aba6b commit 08d6a90
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 27 deletions.
45 changes: 26 additions & 19 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
CLI to manage redash.
"""
import atfork
import signal

atfork.monkeypatch_os_fork_functions()
import atfork.stdlib_fixer
atfork.stdlib_fixer.fix_logging_module()
Expand All @@ -28,26 +30,31 @@ def version():
def runworkers():
    """Starts the re:dash query executors/workers.

    Installs SIGTERM/SIGINT handlers that shut the worker processes down
    cleanly, clears stale worker registrations left in redis by a previous
    run, starts the configured number of workers, and then loops forever
    refreshing queries and reporting status once a minute.
    """

    def stop_handler(signum, frame):
        # Invoked on SIGTERM/SIGINT: stop all worker processes before
        # exiting the supervisor itself.
        logging.warning("Exiting; waiting for workers")
        data_manager.stop_workers()
        # NOTE(review): exit() is the site-module builtin; sys.exit() would
        # be the more conventional choice here — behavior kept as-is.
        exit()

    signal.signal(signal.SIGTERM, stop_handler)
    signal.signal(signal.SIGINT, stop_handler)

    # Workers register themselves in the 'workers' redis set; entries left
    # over from a previous run are stale, so drop them before starting.
    old_workers = data_manager.redis_connection.smembers('workers')
    data_manager.redis_connection.delete('workers')

    logging.info("Cleaning old workers: %s", old_workers)

    data_manager.start_workers(settings.WORKERS_COUNT)
    logging.info("Workers started.")

    # Supervisor loop: best effort — log any failure and keep going.
    while True:
        try:
            data_manager.refresh_queries()
            data_manager.report_status()
        except Exception as e:
            logging.error("Something went wrong with refreshing queries...")
            logging.exception(e)
        time.sleep(60)


@manager.shell
def make_shell_context():
Expand Down
5 changes: 4 additions & 1 deletion redash/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,17 @@ def start_workers(self, workers_count):
redis_connection_params = self.redis_connection.connection_pool.connection_kwargs
self.workers = [worker.Worker(worker_id, self, redis_connection_params)
for worker_id in xrange(workers_count)]

for w in self.workers:
w.start()

return self.workers

def stop_workers(self):
    """Signal every worker to stop, then wait for each one to exit."""
    # First pass: flag and terminate all workers so they wind down in
    # parallel rather than one at a time.
    for worker_process in self.workers:
        worker_process.continue_working = False
        worker_process.terminate()

    # Second pass: reap each worker once it has exited.
    for worker_process in self.workers:
        worker_process.join()

def _save_status(self):
Expand Down
38 changes: 31 additions & 7 deletions redash/data/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
"""
import json
import logging
import multiprocessing
import os
import threading
import uuid
import datetime
import time
Expand Down Expand Up @@ -209,31 +209,30 @@ def __str__(self):
return "<Job:%s,priority:%d,status:%d>" % (self.id, self.priority, self.status)


class Worker(multiprocessing.Process):
    """A worker process that pops query-execution jobs off the manager's
    queue and runs each one in a forked child.

    Runs as a separate OS process (multiprocessing.Process) so query
    execution cannot block or crash the supervisor.
    """

    def __init__(self, worker_id, manager, redis_connection_params, sleep_time=0.1):
        """
        :param worker_id: logical id assigned by the manager; replaced by
            the real process pid once the process starts (see run()).
        :param manager: data manager owning the job queue and redis
            connection.
        :param redis_connection_params: redis connection kwargs; only
            host/db/password/port are kept so a fresh connection can be
            opened inside the forked child.
        :param sleep_time: seconds to sleep between queue polls.
        """
        self.manager = manager

        self.statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT,
                                         prefix=settings.STATSD_PREFIX)

        # Keep only what is needed to reconnect in the child process; the
        # parent's connection object itself must not be shared across fork.
        self.redis_connection_params = {k: v for k, v in redis_connection_params.iteritems()
                                        if k in ('host', 'db', 'password', 'port')}

        self.continue_working = True
        self.sleep_time = sleep_time
        self.child_pid = None
        self.worker_id = worker_id
        self.current_job_id = None
        self.status = {
            'id': self.worker_id,
            'jobs_count': 0,
            'cancelled_jobs_count': 0,
            'done_jobs_count': 0,
            'updated_at': time.time(),
            'started_at': time.time()
        }

        # Redis registration and status persistence happen in run(), inside
        # the child process, where the real pid-based worker id is known.
        super(Worker, self).__init__(name="Worker")

def set_title(self, title=None):
base_title = "redash worker:%s" % self.worker_id
Expand All @@ -245,7 +244,28 @@ def set_title(self, title=None):
setproctitle.setproctitle(full_title)

def run(self):
    """Process entry point: register this worker, then start the job loop."""
    # Inside the child process, the pid becomes the real worker id.
    pid = os.getpid()
    self.worker_id = pid
    self.status['id'] = pid
    self.name = "Worker:%d" % pid

    # Advertise this worker in redis and persist its initial status.
    self.manager.redis_connection.sadd('workers', self._key)
    self._save_status()
    self.set_title()

    logging.info("[%s] started.", self.name)

    # Shut down gracefully (cancelling any in-flight job) on INT/TERM.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, self._stop)

    self._wait_for_jobs()

def _stop(self, signum, frame):
self.continue_working = False
if self.current_job_id:
job = Job.load(self.manager.redis_connection, self.current_job_id)
if job:
job.cancel()

def _wait_for_jobs(self):
while self.continue_working:
job_id = self.manager.queue.pop()
if job_id:
Expand All @@ -270,6 +290,7 @@ def _save_status(self):
self.manager.redis_connection.hmset(self._key, self.status)

def _fork_and_process(self, job_id):
self.current_job_id = job_id
self.child_pid = os.fork()
if self.child_pid == 0:
self.set_title("processing %s" % job_id)
Expand All @@ -291,6 +312,9 @@ def _fork_and_process(self, job_id):
logging.info("[%s] Finished Processing %s (pid: %d status: %d)",
self.name, job_id, self.child_pid, status)

self.child_pid = None
self.current_job_id = None

def _process(self, job_id):
redis_connection = redis.StrictRedis(**self.redis_connection_params)
job = Job.load(redis_connection, job_id)
Expand Down

0 comments on commit 08d6a90

Please sign in to comment.