Skip to content

Commit

Permalink
Merge pull request grpc#25409 from sergiitk/xds-k8s-use-grpc-health-c…
Browse files Browse the repository at this point in the history
…heck

xds-k8s driver: switch Backend Health Check from TCP to GRPC
  • Loading branch information
srini100 authored Feb 12, 2021
2 parents ee915d8 + e98cd15 commit b1bf3f8
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 43 deletions.
31 changes: 25 additions & 6 deletions tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import server_app

logger = logging.getLogger(__name__)
# Flags
Expand All @@ -61,6 +62,9 @@
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)

_DEFAULT_SECURE_MODE_MAINTENANCE_PORT = \
server_app.KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT


def main(argv):
if len(argv) > 1:
Expand All @@ -76,6 +80,7 @@ def main(argv):
# Test server
server_name = xds_flags.SERVER_NAME.value
server_port = xds_flags.SERVER_PORT.value
server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
server_xds_host = xds_flags.SERVER_XDS_HOST.value
server_xds_port = xds_flags.SERVER_XDS_PORT.value

Expand All @@ -92,17 +97,23 @@ def main(argv):
project=project,
resource_prefix=namespace,
network=network)
if server_maintenance_port is None:
server_maintenance_port = _DEFAULT_SECURE_MODE_MAINTENANCE_PORT

try:
if command in ('create', 'cycle'):
logger.info('Create mode')
if security_mode is None:
logger.info('No security')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)

elif security_mode == 'mtls':
logger.info('Setting up mtls')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
Expand All @@ -115,7 +126,9 @@ def main(argv):

elif security_mode == 'tls':
logger.info('Setting up tls')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
Expand All @@ -128,7 +141,9 @@ def main(argv):

elif security_mode == 'plaintext':
logger.info('Setting up plaintext')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
Expand All @@ -143,7 +158,9 @@ def main(argv):
# Error case: server expects client mTLS cert,
# but client configured only for TLS
logger.info('Setting up mtls_error')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
Expand All @@ -158,7 +175,9 @@ def main(argv):
# Error case: client does not authorize server
# because of mismatched SAN name.
logger.info('Setting up mtls_error')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
# Regular TLS setup, but with client policy configured using
# intentionality incorrect server_namespace.
td.setup_server_security(server_namespace=namespace,
Expand Down
6 changes: 4 additions & 2 deletions tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ def main(argv):

if _CMD.value == 'run':
logger.info('Run server, secure_mode=%s', _SECURE.value)
server_runner.run(test_port=xds_flags.SERVER_PORT.value,
secure_mode=_SECURE.value)
server_runner.run(
test_port=xds_flags.SERVER_PORT.value,
maintenance_port=xds_flags.SERVER_MAINTENANCE_PORT.value,
secure_mode=_SECURE.value)

elif _CMD.value == 'cleanup':
logger.info('Cleanup server')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,37 @@ def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):

class HealthCheckProtocol(enum.Enum):
TCP = enum.auto()
GRPC = enum.auto()

class BackendServiceProtocol(enum.Enum):
HTTP2 = enum.auto()
GRPC = enum.auto()

def create_health_check_tcp(self,
name,
use_serving_port=False) -> GcpResource:
def create_health_check(self,
name: str,
protocol: HealthCheckProtocol,
*,
port: Optional[int] = None) -> GcpResource:
if protocol is self.HealthCheckProtocol.TCP:
health_check_field = 'tcpHealthCheck'
elif protocol is self.HealthCheckProtocol.GRPC:
health_check_field = 'grpcHealthCheck'
else:
raise TypeError(f'Unexpected Health Check protocol: {protocol}')

health_check_settings = {}
if use_serving_port:
if port is None:
health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
else:
health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
health_check_settings['port'] = port

return self._insert_resource(self.api.healthChecks(), {
'name': name,
'type': 'TCP',
'tcpHealthCheck': health_check_settings,
})
return self._insert_resource(
self.api.healthChecks(), {
'name': name,
'type': protocol.name,
health_check_field: health_check_settings,
})

def delete_health_check(self, name):
self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ZonalGcpResource = _ComputeV1.ZonalGcpResource
BackendServiceProtocol = _ComputeV1.BackendServiceProtocol
_BackendGRPC = BackendServiceProtocol.GRPC
_HealthCheckGRPC = HealthCheckProtocol.GRPC

# Network Security
_NetworkSecurityV1Alpha1 = gcp.network_security.NetworkSecurityV1Alpha1
Expand Down Expand Up @@ -83,15 +84,18 @@ def setup_for_grpc(
service_host,
service_port,
*,
backend_protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
self.setup_backend_for_grpc(protocol=backend_protocol)
backend_protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.setup_backend_for_grpc(protocol=backend_protocol,
health_check_port=health_check_port)
self.setup_routing_rule_map_for_grpc(service_host, service_port)

def setup_backend_for_grpc(self,
*,
protocol: Optional[
BackendServiceProtocol] = _BackendGRPC):
self.create_health_check()
def setup_backend_for_grpc(
self,
*,
protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.create_health_check(port=health_check_port)
self.create_backend_service(protocol)

def setup_routing_rule_map_for_grpc(self, service_host, service_port):
Expand All @@ -113,17 +117,20 @@ def cleanup(self, *, force=False):
def _ns_name(self, name):
return f'{self.resource_prefix}-{name}'

def create_health_check(self, protocol=HealthCheckProtocol.TCP):
def create_health_check(
self,
*,
protocol: Optional[HealthCheckProtocol] = _HealthCheckGRPC,
port: Optional[int] = None):
if self.health_check:
raise ValueError(f'Health check {self.health_check.name} '
'already created, delete it first')
if protocol is None:
protocol = _HealthCheckGRPC

name = self._ns_name(self.HEALTH_CHECK_NAME)
logger.info('Creating %s Health Check "%s"', protocol.name, name)
if protocol is HealthCheckProtocol.TCP:
resource = self.compute.create_health_check_tcp(
name, use_serving_port=True)
else:
raise ValueError('Unexpected protocol')
resource = self.compute.create_health_check(name, protocol, port=port)
self.health_check = resource

def delete_health_check(self, force=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def get_server_socket_matching_client(self,


class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_TEST_PORT = 8080
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081

def __init__(self,
k8s_namespace,
Expand Down Expand Up @@ -176,7 +179,7 @@ def __init__(self,

def run(self,
*,
test_port=8080,
test_port=DEFAULT_TEST_PORT,
maintenance_port=None,
secure_mode=False,
server_id=None,
Expand All @@ -190,7 +193,11 @@ def run(self,
# maintenance services can be reached independently from the security
# configuration under test.
if maintenance_port is None:
maintenance_port = test_port if not secure_mode else test_port + 1
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT

if secure_mode and maintenance_port == test_port:
raise ValueError('port and maintenance_port must be different '
'when running test server in secure mode')
Expand Down
9 changes: 9 additions & 0 deletions tools/run_tests/xds_k8s_test_driver/framework/xds_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@
help="Server deployment and service name")
SERVER_PORT = flags.DEFINE_integer("server_port",
default=8080,
lower_bound=0,
upper_bound=65535,
help="Server test port")
SERVER_MAINTENANCE_PORT = flags.DEFINE_integer(
"server_maintenance_port",
lower_bound=0,
upper_bound=65535,
default=None,
help="Server port running maintenance services: health check, channelz, etc"
)
SERVER_XDS_HOST = flags.DEFINE_string("server_xds_host",
default='xds-test-server',
help="Test server xDS hostname")
Expand Down
37 changes: 28 additions & 9 deletions tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
_ChannelState = grpc_channelz.ChannelState
_timedelta = datetime.timedelta
_DEFAULT_SECURE_MODE_MAINTENANCE_PORT = \
server_app.KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT


class XdsKubernetesTestCase(absltest.TestCase):
Expand All @@ -68,6 +70,7 @@ def setUpClass(cls):
cls.server_image = xds_k8s_flags.SERVER_IMAGE.value
cls.server_name = xds_flags.SERVER_NAME.value
cls.server_port = xds_flags.SERVER_PORT.value
cls.server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
cls.server_xds_host = xds_flags.SERVER_NAME.value
cls.server_xds_port = xds_flags.SERVER_XDS_PORT.value

Expand Down Expand Up @@ -110,7 +113,9 @@ def tearDown(self):
force_namespace=self.force_cleanup)

def setupTrafficDirectorGrpc(self):
self.td.setup_for_grpc(self.server_xds_host, self.server_xds_port)
self.td.setup_for_grpc(self.server_xds_host,
self.server_xds_port,
health_check_port=self.server_maintenance_port)

def setupServerBackends(self, *, wait_for_healthy_status=True):
# Load Backends
Expand Down Expand Up @@ -199,9 +204,11 @@ def setUp(self):
reuse_namespace=self.server_namespace == self.client_namespace)

def startTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
test_server = self.server_runner.run(replica_count=replica_count,
test_port=self.server_port,
**kwargs)
test_server = self.server_runner.run(
replica_count=replica_count,
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
**kwargs)
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
return test_server

Expand All @@ -220,6 +227,17 @@ class SecurityMode(enum.Enum):
TLS = enum.auto()
PLAINTEXT = enum.auto()

@classmethod
def setUpClass(cls):
super().setUpClass()
if cls.server_maintenance_port is None:
# In secure mode, the maintenance port is different from
# the test port to keep it insecure, and make
# Health Checks and Channelz tests available.
# When not provided, use explicit numeric port value, so
# Backend Health Checks are created on a fixed port.
cls.server_maintenance_port = _DEFAULT_SECURE_MODE_MAINTENANCE_PORT

def setUp(self):
super().setUp()

Expand Down Expand Up @@ -259,11 +277,12 @@ def setUp(self):
debug_use_port_forwarding=self.debug_use_port_forwarding)

def startSecureTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
test_server = self.server_runner.run(replica_count=replica_count,
test_port=self.server_port,
maintenance_port=8081,
secure_mode=True,
**kwargs)
test_server = self.server_runner.run(
replica_count=replica_count,
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
secure_mode=True,
**kwargs)
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
return test_server

Expand Down
6 changes: 4 additions & 2 deletions tools/run_tests/xds_k8s_test_driver/tests/security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def test_mtls_error(self):
been received as confirmed by the TD team.
"""
# Create backend service
self.td.setup_backend_for_grpc()
self.td.setup_backend_for_grpc(
health_check_port=self.server_maintenance_port)

# Start server and attach its NEGs to the backend service, but
# until they become healthy.
Expand Down Expand Up @@ -145,7 +146,8 @@ def test_server_authz_error(self):
The order of operations is the same as in `test_mtls_error`.
"""
# Create backend service
self.td.setup_backend_for_grpc()
self.td.setup_backend_for_grpc(
health_check_port=self.server_maintenance_port)

# Start server and attach its NEGs to the backend service, but
# until they become healthy.
Expand Down

0 comments on commit b1bf3f8

Please sign in to comment.