forked from OpenBazaar/OpenBazaar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_util.py
executable file
·92 lines (61 loc) · 2.06 KB
/
test_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import time
from tornado.ioloop import IOLoop
from tornado import gen
from tornado.websocket import websocket_connect
from node.db_store import Obdb
def ip_address(i):
    """Return the loopback address string for node index ``i``.

    The last octet is ``i + 1``, so index 0 maps to ``127.0.0.1``.
    """
    last_octet = str(i + 1)
    return '127.0.0.' + last_octet
def nickname(i):
    """Return the nickname for node index ``i``.

    The index is ignored: every node gets the empty string.
    """
    empty_nickname = ''
    return empty_nickname
def get_db_path(i):
    """Return the relative database file path for node index ``i``.

    The result has the form ``db/ob-test-<i>.db``.
    """
    path_template = 'db/ob-test-%s.db'
    return path_template % i
def node_uri(node_index):
    """Return the ``tcp://`` URI for the node at ``node_index``.

    The host is ``127.0.0.<node_index + 1>`` and the port is always 12345.
    """
    last_octet = str(node_index + 1)
    return ''.join(('tcp://127.0.0.', last_octet, ':12345'))
def set_store_description(i):
    """Send an ``update_settings`` command to node ``i``.

    The settings carry the node's store description and nickname as
    produced by ``storeDescription(i)`` and ``nickname(i)``. The reply
    from ``ws_send`` is discarded; nothing is returned.
    """
    settings = {
        'storeDescription': storeDescription(i),
        'nickname': nickname(i),
    }
    payload = {'settings': settings}
    ws_send(i, 'update_settings', payload)
def storeDescription(i):
    """Return the store description text for node index ``i``.

    The result has the form ``store <i>``.
    """
    description_template = 'store %s'
    return description_template % i
def remove_peers_from_db(i):
    """Delete the ``peers`` entries from the database of node ``i``.

    Opens an ``Obdb`` on the path returned by ``get_db_path(i)`` and calls
    ``deleteEntries('peers')`` on it. Nothing is returned.
    """
    db_path = get_db_path(i)
    database = Obdb(db_path)
    database.deleteEntries('peers')
def node_to_ws_port(node_index):
    """Return the websocket port for the node at ``node_index``.

    Ports are assigned sequentially starting at 8888 for index 0.
    """
    base_port = 8888
    return node_index + base_port
def ws_connect(node_index):
    """Connect to a node's websocket and return the first message it sends.

    Connects to ``ws://localhost:<port>/ws``, where the port comes from
    ``node_to_ws_port(node_index)``, reads a single message, and runs the
    whole exchange synchronously on the current IOLoop.

    Args:
        node_index: Index of the node to connect to.

    Returns:
        The first received message, decoded with ``json.loads``.
    """
    port = str(node_to_ws_port(node_index))

    @gen.coroutine
    def client():
        conn = yield websocket_connect('ws://localhost:%s/ws' % port)
        try:
            message = yield conn.read_message()
        finally:
            # Close explicitly: every call opens a fresh connection, and
            # without this the socket stays open after run_sync returns.
            conn.close()
        # NOTE(review): json.loads raises if no message was received
        # (e.g. the server closed the connection first) - confirm that
        # is the desired failure mode for the tests.
        raise gen.Return(json.loads(message))

    return IOLoop.current().run_sync(client)
def ws_send_raw(port, string):
    """Send one JSON-encoded payload to a websocket and return the reply.

    Connects to ``ws://localhost:<port>/ws``, discards the first message
    the server sends (the "myself" message), writes ``string`` serialized
    with ``json.dumps``, and waits for the next message. The exchange is
    run synchronously on the current IOLoop.

    Args:
        port: Websocket port to connect to.
        string: JSON-serializable payload. Despite the name, ``ws_send``
            passes a dict here.

    Returns:
        The message received after the write, decoded with ``json.loads``.
    """
    @gen.coroutine
    def client():
        conn = yield websocket_connect('ws://localhost:%s/ws' % port)
        try:
            # skip myself message
            yield conn.read_message()
            conn.write_message(json.dumps(string))
            reply = yield conn.read_message()
        finally:
            # Close explicitly: every call opens a fresh connection, and
            # without this the socket stays open after run_sync returns.
            conn.close()
        raise gen.Return(json.loads(reply))

    return IOLoop.current().run_sync(client)
def ws_send(node_index, command, params=None):
    """Send a command to a node over its websocket and return the reply.

    Builds a request with keys ``command``, ``id`` (always 1) and
    ``params``, sends it through ``ws_send_raw`` to the port given by
    ``node_to_ws_port(node_index)``, then sleeps for 0.1 seconds before
    returning.

    Args:
        node_index: Index of the target node.
        command: Command name placed under the ``command`` key.
        params: Optional parameters; an empty dict is used when ``None``.

    Returns:
        Whatever ``ws_send_raw`` returned for the request.
    """
    request = {
        'command': command,
        'id': 1,
        'params': {} if params is None else params,
    }
    reply = ws_send_raw(node_to_ws_port(node_index), request)
    # Pause briefly after each command - presumably to let the node
    # finish processing before the next call; confirm before changing.
    time.sleep(0.1)
    return reply
def ws_receive_myself(node_index):
    """Connect to a node's websocket and return the first message it sends.

    Connects to ``ws://localhost:<port>/ws``, where the port comes from
    ``node_to_ws_port(node_index)``, reads a single message, and runs the
    whole exchange synchronously on the current IOLoop.

    Args:
        node_index: Index of the node to connect to.

    Returns:
        The first received message, decoded with ``json.loads``.
    """
    port = node_to_ws_port(node_index)

    @gen.coroutine
    def client():
        conn = yield websocket_connect('ws://localhost:%s/ws' % port)
        try:
            message = yield conn.read_message()
        finally:
            # Close explicitly: every call opens a fresh connection, and
            # without this the socket stays open after run_sync returns.
            conn.close()
        raise gen.Return(json.loads(message))

    return IOLoop.current().run_sync(client)