-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhello.py
110 lines (87 loc) · 3.09 KB
/
hello.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from types import TracebackType
from typing import Optional, Type
import websockets.asyncio.client
import asyncio
import logging
import sys
import random
# Configure the root logger to emit everything from DEBUG level upwards.
logging.basicConfig(level=logging.DEBUG)
# Logger named after this module. NOTE(review): the code in this file calls
# the module-level ``logging.debug`` helper (root logger) rather than this
# object - confirm which of the two is intended.
logger = logging.getLogger(__name__)
class Connection:
    """WebSocket link that exchanges messages through two asyncio queues.

    Messages received from the peer are copied into ``inbox``; anything put
    on ``outbox`` is sent to the peer. Both directions are driven by a single
    background task that ``connect()`` starts and ``__aexit__`` stops.
    """

    # Quoted so the third-party name is not evaluated when the class is created.
    websocket: "Optional[websockets.asyncio.client.ClientConnection]"
    inbox: asyncio.Queue
    outbox: asyncio.Queue
    loop_task: "Optional[asyncio.Task]"

    def __init__(self) -> None:
        """Create an unconnected instance with empty inbox and outbox queues."""
        self.websocket = None
        self.inbox = asyncio.Queue()
        self.outbox = asyncio.Queue()
        self.loop_task = None

    async def __aenter__(self) -> "Connection":
        """Return the instance itself; the socket is opened by ``connect()``."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Stop the background task and close the websocket.

        ``produce()`` loops forever, so simply awaiting the background task
        would never return. On a clean exit the outbox is flushed first, then
        the task is cancelled and reaped.

        Raises:
            Exception: whatever the background task failed with, if it
                stopped because of an error rather than cancellation.
        """
        try:
            if self.loop_task is not None:
                try:
                    if exc_type is None:
                        # Give already-queued messages a chance to go out.
                        await self._drain_outbox()
                finally:
                    self.loop_task.cancel()
                # return_exceptions=True turns the task's own CancelledError
                # into a plain result, while a cancellation of the *caller*
                # still propagates out of this await.
                (outcome,) = await asyncio.gather(
                    self.loop_task, return_exceptions=True
                )
                if isinstance(outcome, Exception):
                    raise outcome
        finally:
            if self.websocket is not None:
                await self.websocket.close()

    async def _drain_outbox(self) -> None:
        """Wait until the outbox is flushed or the background task has stopped."""
        flushed = asyncio.ensure_future(self.outbox.join())
        try:
            # Also watch loop_task: if it died, join() would never complete.
            await asyncio.wait(
                {flushed, self.loop_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            flushed.cancel()

    async def connect(self, hostname: str, port: str) -> None:
        """Open ``ws://{hostname}:{port}`` and start the background task.

        Args:
            hostname: Host part of the websocket URL.
            port: Port part of the websocket URL.
        """
        self.websocket = await websockets.asyncio.client.connect(
            f"ws://{hostname}:{port}"
        )
        self.loop_task = asyncio.create_task(self.loop())

    async def produce(self) -> None:
        """Send every message taken from ``outbox`` over the websocket, forever."""
        while True:
            message = await self.outbox.get()
            try:
                await self.websocket.send(message)
            finally:
                # Balance get() even on failure so outbox.join() stays accurate.
                self.outbox.task_done()
            logging.debug("sent from outbox")

    async def consume(self) -> None:
        """Copy each message iterated from the websocket into ``inbox``."""
        async for message in self.websocket:
            await self.inbox.put(message)
            logging.debug("received to inbox")

    async def loop(self) -> None:
        """Run ``consume()`` and ``produce()`` concurrently."""
        await asyncio.gather(self.consume(), self.produce())
class Subscriber:
    """Prints messages from a connection's inbox and queues outgoing ones.

    Entering the context starts a background task running ``consume()``;
    leaving it cancels that task.
    """

    # Quoted so the annotation is not evaluated when the class is created.
    connection: "Connection"
    loop_task: "Optional[asyncio.Task]"

    def __init__(self, connection: "Connection"):
        """Remember the connection whose ``inbox``/``outbox`` queues are used."""
        self.connection = connection
        self.loop_task = None

    async def __aenter__(self) -> "Subscriber":
        """Start the background ``consume()`` task and return the instance."""
        self.loop_task = asyncio.create_task(self.consume())
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Cancel the background task and wait for it to finish.

        ``consume()`` never returns on its own, so awaiting it without
        cancelling would block forever. Messages still sitting in the inbox
        are left there.

        Raises:
            Exception: whatever ``consume()`` failed with, if it stopped
                because of an error rather than cancellation.
        """
        if self.loop_task is None:
            return
        self.loop_task.cancel()
        # return_exceptions=True turns the task's own CancelledError into a
        # plain result; a cancellation of the caller still propagates.
        (outcome,) = await asyncio.gather(self.loop_task, return_exceptions=True)
        if isinstance(outcome, Exception):
            raise outcome

    async def consume(self) -> None:
        """Take messages from the connection's inbox and print them, forever."""
        while True:
            message = await self.connection.inbox.get()
            logging.debug("received from inbox")
            print(message)

    async def send_something(self, message: str) -> None:
        """Put ``message`` on the connection's outbox.

        Args:
            message: Payload to enqueue for sending.
        """
        await self.connection.outbox.put(message)
        logging.debug("sent to outbox")
async def main():
    """Connect to ws://127.0.0.1:4444 and queue the strings "0" to "4" for sending.

    A ``Subscriber`` is attached to the connection for the duration of the
    sends, so anything arriving in the inbox meanwhile is printed.
    """
    async with Connection() as connection:
        await connection.connect("127.0.0.1", "4444")
        async with Subscriber(connection) as subscriber:

            async def foo(wait: int):
                # ``wait`` is only used as the message payload here.
                await subscriber.send_something(str(wait))

            # The gather must be awaited: a bare ``asyncio.gather(...)`` only
            # schedules the coroutines and discards the resulting future, so
            # the context managers could start exiting before any send ran.
            await asyncio.gather(*(foo(i) for i in range(5)))
if __name__ == "__main__":
    # Script entry point: run main() in debug mode and exit with its result.
    try:
        exit_code = asyncio.run(main(), debug=True)
    except asyncio.CancelledError as cancelled:
        print(f"main() was cancelled: {cancelled}")
    except KeyboardInterrupt:
        print("Received KeyboardInterrupt - shutting down")
    else:
        # Only reached when main() finished without being interrupted.
        sys.exit(exit_code)