Skip to content

Commit

Permalink
SCALRCORE-8536 (FAM-328) Enable heartbeats on gevent
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergii Daniuk authored and Marat Komarov committed Feb 7, 2023
1 parent 7dfc1fd commit ae58149
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
16 changes: 14 additions & 2 deletions celery/worker/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class Blueprint(bootsteps.Blueprint):
"""Consumer blueprint."""

name = 'Consumer'
# [FAM-328] We remove Events, Gossip, Heartbeat and Mingle from default
# steps because they are not used by us but interfere with amqp
# heartbeats we need.
default_steps = [
'celery.worker.consumer.connection:Connection',
'celery.worker.consumer.mingle:Mingle',
Expand Down Expand Up @@ -206,8 +209,10 @@ def __init__(self, on_task_request,
self.task_buckets = defaultdict(lambda: None)
self.reset_rate_limits()

self.gevent_env = _detect_environment() == 'gevent'

self.hub = hub
if self.hub or getattr(self.pool, 'is_green', False):
if self.hub or getattr(self.pool, 'is_green', False) or self.gevent_env:
self.amqheartbeat = amqheartbeat
if self.amqheartbeat is None:
self.amqheartbeat = self.app.conf.broker_heartbeat
Expand All @@ -217,7 +222,7 @@ def __init__(self, on_task_request,
if not hasattr(self, 'loop'):
self.loop = loops.asynloop if hub else loops.synloop

if _detect_environment() == 'gevent':
if self.gevent_env:
# there's a gevent bug that causes timeouts to not be reset,
# so if the connection timeout is exceeded once, it can NEVER
# connect again.
Expand Down Expand Up @@ -428,6 +433,13 @@ def connect(self):
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
if self.hub:
conn.transport.register_with_event_loop(conn.connection, self.hub)

# Install timer to send heartbeats on gevent
if self.gevent_env and self.amqheartbeat:
hbtick = conn.heartbeat_check
logger.debug('Registring heartbeat timer for connection: {}'.format(id(conn)))
self.timer.call_repeatedly(int(self.amqheartbeat / self.amqheartbeat_rate), hbtick, (self.amqheartbeat_rate,))

return conn

def connection_for_read(self, heartbeat=None):
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/consumer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from celery import bootsteps
from celery.utils.log import get_logger

from .mingle import Mingle
from .connection import Connection

__all__ = ('Tasks',)

Expand All @@ -15,7 +15,7 @@
class Tasks(bootsteps.StartStopStep):
"""Bootstep starting the task message consumer."""

requires = (Mingle,)
requires = (Connection,)

def __init__(self, c, **kwargs):
c.task_consumer = c.qos = None
Expand Down

0 comments on commit ae58149

Please sign in to comment.