Skip to content

Commit

Permalink
fix: fix early pea shutdown logic (jina-ai#2947)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM authored Jul 14, 2021
1 parent 46cf10b commit e089375
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
20 changes: 17 additions & 3 deletions jina/peapods/peas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def close(self) -> None:
This method makes sure that the `Process/thread` is properly finished and its resources properly released
"""
# if that 1s is not enough, it means the process/thread is still in forever loop, cancel it
self.logger.debug('waiting for ready or shutdown signal from runtime')
if self.is_ready.is_set() and not self.is_shutdown.is_set():
try:
self._deactivate_runtime()
Expand Down Expand Up @@ -331,11 +332,24 @@ def close(self) -> None:
# sometimes, we arrive to the close logic before the `is_ready` is even set.
# Observed with `gateway` when Pods fail to start
self.logger.warning(
'Pea is being closed before being ready. Most likely some other Pea in the Flow or Pod'
'Pea is being closed before being ready. Most likely some other Pea in the Flow or Pod '
'failed to start'
)
if self.is_ready.wait(timeout=0.1):
self._cancel_runtime()
_timeout = self.args.timeout_ready
if _timeout <= 0:
_timeout = None
else:
_timeout /= 1e3
self.logger.debug('waiting for ready or shutdown signal from runtime')
if self.ready_or_shutdown.event.wait(_timeout):
if self.is_ready.is_set():
self._cancel_runtime()
if not self.is_shutdown.wait(timeout=self._timeout_ctrl):
self.terminate()
time.sleep(0.1)
raise Exception(
f'Shutdown signal was not received for {self._timeout_ctrl}'
)
else:
self.logger.warning(
'Terminating process after waiting for readiness signal for graceful shutdown'
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/flow/test_flow_except.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def f3(*args):
hit.clear()


@pytest.mark.repeat(10)
@pytest.mark.timeout(10)
@pytest.mark.parametrize('protocol', ['websocket', 'grpc', 'http'])
def test_flow_startup_exception_not_hanging(protocol):
Expand All @@ -246,6 +247,7 @@ def __init__(self, *args, **kwargs):
pass


@pytest.mark.repeat(10)
@pytest.mark.timeout(10)
@pytest.mark.parametrize('protocol', ['websocket', 'grpc', 'http'])
def test_flow_startup_exception_not_hanging2(protocol):
Expand Down

0 comments on commit e089375

Please sign in to comment.