forked from nats-io/nats.py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclustered.py
96 lines (76 loc) · 2.85 KB
/
clustered.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def run(loop):
nc = NATS()
# Setup pool of servers from a NATS cluster.
options = {
"servers": [
"nats://user1:[email protected]:4222",
"nats://user2:[email protected]:4223",
"nats://user3:[email protected]:4224",
],
"loop": loop,
}
# Will try to connect to servers in order of configuration,
# by defaults it connect to one in the pool randomly.
options["dont_randomize"] = True
# Optionally set reconnect wait and max reconnect attempts.
# This example means 10 seconds total per backend.
options["max_reconnect_attempts"] = 5
options["reconnect_time_wait"] = 2
async def disconnected_cb():
print("Got disconnected!")
async def reconnected_cb():
# See who we are connected to on reconnect.
print(f"Got reconnected to {nc.connected_url.netloc}")
# Setup callbacks to be notified on disconnects and reconnects
options["disconnected_cb"] = disconnected_cb
options["reconnected_cb"] = reconnected_cb
async def error_cb(e):
print(f"There was an error: {e}")
async def closed_cb():
print("Connection is closed")
async def subscribe_handler(msg):
print("Got message: ", msg.subject, msg.reply, msg.data)
# Setup callbacks to be notified when there is an error
# or connection is closed.
options["error_cb"] = error_cb
options["closed_cb"] = closed_cb
try:
await nc.connect(**options)
except ErrNoServers as e:
# Could not connect to any server in the cluster.
print(e)
return
if nc.is_connected:
await nc.subscribe("help.*", cb=subscribe_handler)
max_messages = 1000
start_time = datetime.now()
print(f"Sending {max_messages} messages to NATS...")
for i in range(0, max_messages):
try:
await nc.publish(f"help.{i}", b'A')
await nc.flush(0.500)
except ErrConnectionClosed as e:
print("Connection closed prematurely.")
break
except ErrTimeout as e:
print("Timeout occured when publishing msg i={}: {}".format(
i, e))
end_time = datetime.now()
await nc.close()
duration = end_time - start_time
print(f"Duration: {duration}")
try:
await nc.publish("help", b"hello world")
except ErrConnectionClosed:
print("Can't publish since no longer connected.")
err = nc.last_error
if err is not None:
print(f"Last Error: {err}")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()