Skip to content

Commit

Permalink
Reconnect disconnected channels automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
kpayson64 committed May 22, 2017
1 parent 5ef2087 commit 8f7bc54
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ def drive():
class _ChannelConnectivityState(object):

def __init__(self, channel):
self.lock = threading.Lock()
self.lock = threading.RLock()
self.channel = channel
self.polling = False
self.connectivity = None
Expand Down Expand Up @@ -926,6 +926,11 @@ def __init__(self, target, options, credentials):
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)

# TODO(https://github.com/grpc/grpc/issues/9884)
# Temporary work around UNAVAILABLE issues
# Remove this once c-core has retry support
_subscribe(self._connectivity_state, lambda *args: None, None)

def subscribe(self, callback, try_to_connect=None):
_subscribe(self._connectivity_state, callback, try_to_connect)

Expand Down
1 change: 1 addition & 0 deletions src/python/grpcio_tests/tests/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"unit._invocation_defects_test.InvocationDefectsTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_test.MetadataTest",
"unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
"unit._sanity._sanity_test.Sanity",
Expand Down
70 changes: 70 additions & 0 deletions src/python/grpcio_tests/tests/unit/_reconnect_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2017, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests that a channel will reconnect if a connection is dropped"""

import unittest

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit.framework.common import test_constants

_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x01'

_UNARY_UNARY = '/test/UnaryUnary'


def _handle_unary_unary(unused_request, unused_servicer_context):
return _RESPONSE


class ReconnectTest(unittest.TestCase):

def test_reconnect(self):
server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
handler = grpc.method_handlers_generic_handler('test', {
'UnaryUnary':
grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
})
server = grpc.server(server_pool, (handler,))
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:%d' % port)
multi_callable = channel.unary_unary(_UNARY_UNARY)
self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
server.stop(None)
server = grpc.server(server_pool, (handler,))
server.add_insecure_port('[::]:{}'.format(port))
server.start()
self.assertEqual(_RESPONSE, multi_callable(_REQUEST))


if __name__ == '__main__':
unittest.main(verbosity=2)

0 comments on commit 8f7bc54

Please sign in to comment.