forked from nats-io/nats.py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadvanced.py
51 lines (41 loc) · 1.62 KB
/
advanced.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
try:
# Setting explicit list of servers in a cluster.
await nc.connect(servers=["nats://127.0.0.1:4222", "nats://127.0.0.1:4223", "nats://127.0.0.1:4224"], loop=loop)
except ErrNoServers as e:
print(e)
return
async def message_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
for i in range(0, 20):
await nc.publish(reply, f"i={i}".encode())
await nc.subscribe("help.>", cb=message_handler)
async def request_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data))
# Signal the server to stop sending messages after we got 10 already.
await nc.request(
"help.please", b'help', expected=10, cb=request_handler)
try:
# Flush connection to server, returns when all messages have been processed.
# It raises a timeout if roundtrip takes longer than 1 second.
await nc.flush(1)
except ErrTimeout:
print("Flush timeout")
await asyncio.sleep(1, loop=loop)
# Drain gracefully closes the connection, allowing all subscribers to
# handle any pending messages inflight that the server may have sent.
await nc.drain()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()