Skip to content

Commit

Permalink
Merge pull request grpc#25076 from lidizheng/aio-examples-3
Browse files Browse the repository at this point in the history
Adding three more AsyncIO examples
  • Loading branch information
lidizheng authored Jan 7, 2021
2 parents c95ed7a + e6dffc6 commit a873c5a
Show file tree
Hide file tree
Showing 16 changed files with 679 additions and 33 deletions.
59 changes: 48 additions & 11 deletions examples/python/debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,35 @@

load("@grpc_python_dependencies//:requirements.bzl", "requirement")

package(default_testonly = 1)

py_binary(
name = "debug_server",
testonly = 1,
srcs = ["debug_server.py"],
data = ["helloworld.proto"],
imports = ["."],
python_version = "PY3",
deps = [
"//examples/protos:helloworld_py_pb2",
"//examples/protos:helloworld_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
"//tools/distrib/python/grpcio_tools:grpc_tools",
],
)

py_binary(
name = "send_message",
testonly = 1,
srcs = ["send_message.py"],
data = ["helloworld.proto"],
imports = ["."],
python_version = "PY3",
deps = [
"//examples/protos:helloworld_py_pb2",
"//examples/protos:helloworld_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//tools/distrib/python/grpcio_tools:grpc_tools",
],
)

py_binary(
name = "get_stats",
testonly = 1,
srcs = ["get_stats.py"],
python_version = "PY3",
deps = [
Expand All @@ -49,17 +51,52 @@ py_binary(
],
)

py_binary(
name = "asyncio_debug_server",
srcs = ["asyncio_debug_server.py"],
data = ["helloworld.proto"],
imports = ["."],
python_version = "PY3",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
"//tools/distrib/python/grpcio_tools:grpc_tools",
],
)

py_binary(
name = "asyncio_send_message",
srcs = ["asyncio_send_message.py"],
data = ["helloworld.proto"],
imports = ["."],
python_version = "PY3",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//tools/distrib/python/grpcio_tools:grpc_tools",
],
)

py_binary(
name = "asyncio_get_stats",
srcs = ["asyncio_get_stats.py"],
python_version = "PY3",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
],
)

py_test(
name = "_debug_example_test",
srcs = ["test/_debug_example_test.py"],
imports = ["."],
python_version = "PY3",
deps = [
":asyncio_debug_server",
":asyncio_get_stats",
":asyncio_send_message",
":debug_server",
":get_stats",
":send_message",
"//examples/protos:helloworld_py_pb2",
"//examples/protos:helloworld_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
],
)
83 changes: 83 additions & 0 deletions examples/python/debug/asyncio_debug_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python AsyncIO example of utilizing Channelz feature."""

import asyncio
import argparse
import logging
import random

import grpc

helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
"helloworld.proto")

# TODO: Suppress until the macOS segfault fix rolled out
from grpc_channelz.v1 import channelz # pylint: disable=wrong-import-position

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)

_RANDOM_FAILURE_RATE = 0.3


class FaultInjectGreeter(helloworld_pb2_grpc.GreeterServicer):

def __init__(self, failure_rate):
self._failure_rate = failure_rate

async def SayHello(self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext
) -> helloworld_pb2.HelloReply:
if random.random() < self._failure_rate:
context.abort(grpc.StatusCode.UNAVAILABLE,
'Randomly injected failure.')
return helloworld_pb2.HelloReply(message=f'Hello, {request.name}!')


def create_server(addr: str, failure_rate: float) -> grpc.aio.Server:
server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(
FaultInjectGreeter(failure_rate), server)

# Add Channelz Servicer to the gRPC server
channelz.add_channelz_servicer(server)

server.add_insecure_port(addr)
return server


async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--addr',
nargs=1,
type=str,
default='[::]:50051',
help='the address to listen on')
parser.add_argument(
'--failure_rate',
nargs=1,
type=float,
default=0.3,
help='a float indicates the percentage of failed message injections')
args = parser.parse_args()

server = create_server(addr=args.addr, failure_rate=args.failure_rate)
await server.start()
await server.wait_for_termination()


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.get_event_loop().run_until_complete(main())
46 changes: 46 additions & 0 deletions examples/python/debug/asyncio_get_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Poll statistics from the server."""

import asyncio
import logging
import argparse
import grpc

from grpc_channelz.v1 import channelz_pb2
from grpc_channelz.v1 import channelz_pb2_grpc


async def run(addr: str) -> None:
async with grpc.aio.insecure_channel(addr) as channel:
channelz_stub = channelz_pb2_grpc.ChannelzStub(channel)
response = await channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
print('Info for all servers: %s' % response)


async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--addr',
nargs=1,
type=str,
default='[::]:50051',
help='the address to request')
args = parser.parse_args()
run(addr=args.addr)


if __name__ == '__main__':
logging.basicConfig()
asyncio.get_event_loop().run_until_complete(main())
61 changes: 61 additions & 0 deletions examples/python/debug/asyncio_send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Send multiple greeting messages to the backend."""

import asyncio
import logging
import argparse
import grpc

helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
"helloworld.proto")


async def process(stub: helloworld_pb2_grpc.GreeterStub,
request: helloworld_pb2.HelloRequest) -> None:
try:
response = await stub.SayHello(request)
except grpc.aio.AioRpcError as rpc_error:
print(f'Received error: {rpc_error}')
else:
print(f'Received message: {response}')


async def run(addr: str, n: int) -> None:
async with grpc.aio.insecure_channel(addr) as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
request = helloworld_pb2.HelloRequest(name='you')
for _ in range(n):
await process(stub, request)


async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('--addr',
nargs=1,
type=str,
default='[::]:50051',
help='the address to request')
parser.add_argument('-n',
nargs=1,
type=int,
default=10,
help='an integer for number of messages to sent')
args = parser.parse_args()
await run(addr=args.addr, n=args.n)


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.get_event_loop().run_until_complete(main())
8 changes: 5 additions & 3 deletions examples/python/debug/debug_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import random

import grpc
from grpc_channelz.v1 import channelz

from examples.protos import helloworld_pb2
from examples.protos import helloworld_pb2_grpc
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
"helloworld.proto")

# TODO: Suppress until the macOS segfault fix rolled out
from grpc_channelz.v1 import channelz # pylint: disable=wrong-import-position

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
Expand Down
8 changes: 5 additions & 3 deletions examples/python/debug/get_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
def run(addr):
with grpc.insecure_channel(addr) as channel:
channelz_stub = channelz_pb2_grpc.ChannelzStub(channel)
response = channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
print('Info for all servers: %s' % response)
# This RPC pulls server-level metrics, like sent/received messages,
# succeeded/failed RPCs. For more info see:
# https://github.com/grpc/grpc/blob/master/src/proto/grpc/channelz/channelz.proto
response = channelz_stub.GetServers(channelz_pb2.GetServersRequest())
print(f'Info for all servers: {response}')


def main():
Expand Down
1 change: 1 addition & 0 deletions examples/python/debug/helloworld.proto
5 changes: 3 additions & 2 deletions examples/python/debug/send_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import logging
import argparse
import grpc
from examples.protos import helloworld_pb2
from examples.protos import helloworld_pb2_grpc

helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
"helloworld.proto")


def process(stub, request):
Expand Down
26 changes: 21 additions & 5 deletions examples/python/debug/test/_debug_example_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
# limitations under the License.
"""Test for gRPC Python debug example."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import asyncio
import logging
import unittest

from examples.python.debug import debug_server
from examples.python.debug import asyncio_debug_server
from examples.python.debug import send_message
from examples.python.debug import asyncio_send_message
from examples.python.debug import get_stats
from examples.python.debug import asyncio_get_stats

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
Expand All @@ -47,7 +47,23 @@ def test_channelz_example(self):
server.stop(None)
# No unhandled exception raised, test passed!

def test_asyncio_channelz_example(self):

async def body():
server = asyncio_debug_server.create_server(
addr='[::]:0', failure_rate=_FAILURE_RATE)
port = server.add_insecure_port('[::]:0')
await server.start()
address = _ADDR_TEMPLATE % port

await asyncio_send_message.run(addr=address, n=_NUMBER_OF_MESSAGES)
await asyncio_get_stats.run(addr=address)
await server.stop(None)
# No unhandled exception raised, test passed!

asyncio.get_event_loop().run_until_complete(body())


if __name__ == '__main__':
logging.basicConfig()
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)
Loading

0 comments on commit a873c5a

Please sign in to comment.