forked from MystenLabs/sui
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor_synced.py
executable file
·189 lines (160 loc) · 6.63 KB
/
monitor_synced.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python3
# Copyright (c) Mysten Labs, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import os
import sys
import subprocess
import getopt
from enum import Enum
import time
from datetime import datetime
# Maximum number of attempts each metric/RPC fetch makes before giving up.
NUM_RETRIES = 10
# Seconds main() sleeps between successive checkpoint/epoch progress checks.
CHECKPOINT_SLEEP_SEC = 30
# Number of one-second polls await_started() makes before reporting failure.
STARTUP_TIMEOUT_SEC = 60
# Base delay in seconds for retry backoff; multiplied by 2**attempt.
RETRY_BASE_TIME_SEC = 3
# Network names accepted by the --env option.
AVAILABLE_NETWORKS = ['testnet', 'devnet']
class Metric(Enum):
    """Metrics this script reads from the local sui-node metrics endpoint.

    Each member's value is the metric name that is searched for in the
    endpoint's output.
    """

    # Highest checkpoint the local node has executed.
    CHECKPOINT = 'last_executed_checkpoint'

    # Epoch the local node is currently in.
    EPOCH = 'current_epoch'
def get_current_network_epoch(env='testnet'):
    """Return the current epoch of a Sui network as an int.

    Sends a ``suix_getCurrentEpoch`` JSON-RPC request to
    ``https://explorer-rpc.<env>.sui.io/`` through ``curl`` and retries up to
    ``NUM_RETRIES`` times. Transport failures and unparseable responses back
    off exponentially; a well-formed JSON-RPC error response is retried after
    the base delay.

    Args:
        env: Network name interpolated into the RPC host name.

    Returns:
        The epoch number reported by the RPC endpoint.

    Raises:
        SystemExit: With status 1 once every attempt has failed.
    """
    cmd = [
        'curl', '--location', '--request', 'POST',
        f'https://explorer-rpc.{env}.sui.io/',
        '--header', 'Content-Type: application/json',
        '--data-raw',
        '{"jsonrpc":"2.0", "method":"suix_getCurrentEpoch", "params":[], "id":1}',
    ]
    for attempt in range(NUM_RETRIES):
        delay = RETRY_BASE_TIME_SEC * 2**attempt  # exponential backoff
        try:
            raw = subprocess.check_output(cmd, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            # curl writes its diagnostics to stderr, so report that stream
            # rather than the captured stdout.
            details = (e.stderr or b'').decode(errors='replace').strip()
            print(f'curl command failed with error {e.returncode}: {details}')
        else:
            try:
                response = json.loads(raw)
                if 'error' not in response:
                    return int(response['result']['epoch'])
                print(
                    f'suix_getCurrentEpoch rpc request failed: {response["error"]}')
                # The endpoint answered, so retry after the base delay only.
                delay = RETRY_BASE_TIME_SEC
            except (KeyError, IndexError, TypeError, ValueError):
                # ValueError also covers json.JSONDecodeError; TypeError
                # covers a response or result that is not the expected shape.
                print(f'suix_getCurrentEpoch rpc request failed: {raw}')
        # No point sleeping after the final attempt: we are about to give up.
        if attempt < NUM_RETRIES - 1:
            time.sleep(delay)
    print(f"Failed to get current network epoch after {NUM_RETRIES} tries")
    sys.exit(1)
def get_local_metric(metric: Metric):
    """Return the integer value of a metric exposed by the local sui-node.

    Fetches ``http://localhost:9184/metrics`` with ``curl`` and takes the
    first line that contains ``metric.value`` and does not start with ``#``
    or ``;``. The value is the second whitespace-separated field of that
    line. Retries with exponential backoff up to ``NUM_RETRIES`` times.

    Args:
        metric: The metric to look up.

    Returns:
        The metric value parsed as an int.

    Raises:
        SystemExit: With status 1 once every attempt has failed.
    """
    for attempt in range(NUM_RETRIES):
        try:
            # A single child process whose output is fully collected; the
            # filtering formerly done by a grep pipeline happens below, so no
            # child processes or pipes are left dangling.
            scrape = subprocess.run(
                ['curl', '-s', 'http://localhost:9184/metrics'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        except subprocess.CalledProcessError as e:
            details = (e.stderr or b'').decode(errors='replace').strip()
            print(f'curl command failed with error {e.returncode}: {details}')
        else:
            samples = [
                line
                for line in scrape.stdout.decode(errors='replace').splitlines()
                if metric.value in line and not line.startswith(('#', ';'))
            ]
            try:
                return int(samples[0].split()[1])
            except (IndexError, ValueError):
                # IndexError: metric absent or line has no value field.
                # ValueError: value field is not an integer.
                print(
                    f'Failed to get local metric {metric.value}: {samples}')
        # No point sleeping after the final attempt: we are about to give up.
        if attempt < NUM_RETRIES - 1:
            time.sleep(RETRY_BASE_TIME_SEC * 2**attempt)  # exponential backoff
    print(
        f"Failed to get local metric {metric.value} after {NUM_RETRIES} tries")
    sys.exit(1)
def await_started(start_checkpoint):
    """Wait for the local node's executed checkpoint to move off a baseline.

    Polls the checkpoint metric, pausing one second between polls, for at
    most ``STARTUP_TIMEOUT_SEC`` polls. Returns as soon as the metric differs
    from ``start_checkpoint``. If it never changes, a failure message is
    printed and the function still returns normally.

    Args:
        start_checkpoint: Checkpoint value observed before waiting.
    """
    polls_done = 0
    while polls_done < STARTUP_TIMEOUT_SEC:
        latest = get_local_metric(Metric.CHECKPOINT)
        if latest != start_checkpoint:
            print(f"sui-node started successfully after {polls_done} seconds")
            return
        print("Awaiting sui-node startup...")
        time.sleep(1)
        polls_done += 1
    print(f"sui-node failed to start after {STARTUP_TIMEOUT_SEC} seconds")
def usage():
    """Print the command-line help text to stdout."""
    # str.join is a method of the separator string; lists have no .join().
    networks = ", ".join(AVAILABLE_NETWORKS)
    print(
        'Usage: monitor_synced.py [--env=<env>] [--end-epoch=<epoch>] [--epoch-timeout=<timeout>] [--verbose]')
    print(
        f' --env=<env> Environment to sync against (one of {networks})')
    print(' --end-epoch=<epoch> Epoch to sync to (default: current network epoch)')
    print(' --epoch-timeout=<timeout> Timeout IN MINUTES for syncing to the next epoch (default: None)')
    print(' --verbose Print verbose output')
    print(' --help Print this help message')
def _parse_args(argv):
    """Parse command-line options into (env, end_epoch, epoch_timeout, verbose).

    Exits with status 0 after printing help for ``--help``, and with status 1
    on any malformed or invalid option.
    """
    if len(argv) > 4:
        usage()
        sys.exit(1)
    try:
        opts, _ = getopt.getopt(
            argv, '', ["help", "verbose", "env=", "end-epoch=", "epoch-timeout="])
    except getopt.GetoptError as err:
        print(err)
        usage()
        # Must stop here: `opts` is unbound when parsing fails.
        sys.exit(1)

    env = 'testnet'
    end_epoch = None
    epoch_timeout = None
    verbose = False
    for opt, arg in opts:
        if opt == '--help':
            usage()
            sys.exit(0)
        elif opt == '--env':
            if arg not in AVAILABLE_NETWORKS:
                print(f'Invalid environment {arg}')
                sys.exit(1)
            env = arg
        elif opt in ('--end-epoch', '--epoch-timeout'):
            try:
                number = int(arg)
            except ValueError:
                print(f'Invalid integer value for {opt}: {arg}')
                sys.exit(1)
            if opt == '--end-epoch':
                end_epoch = number
            else:
                epoch_timeout = number
        elif opt == '--verbose':
            verbose = True
        else:
            usage()
            sys.exit(1)
    return env, end_epoch, epoch_timeout, verbose


def main(argv):
    """Monitor the local sui-node until it reaches the target epoch.

    Parses ``argv``, determines the target epoch (the ``--end-epoch`` value,
    or the network's current epoch when omitted), waits for the node to
    start, then polls the local checkpoint and epoch metrics every
    ``CHECKPOINT_SLEEP_SEC`` seconds. Prints a warning while the checkpoint
    does not advance.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: Status 0 once the local epoch reaches the target;
            status 1 on invalid arguments or when the epoch has not advanced
            for more than ``--epoch-timeout`` minutes.
    """
    env, end_epoch, epoch_timeout, verbose = _parse_args(argv)

    if end_epoch is None:
        end_epoch = get_current_network_epoch(env)
    print(f'Will attempt to sync to epoch {end_epoch}')

    current_epoch = get_local_metric(Metric.EPOCH)
    print(f'Current local epoch: {current_epoch}')
    start_epoch = current_epoch

    current_checkpoint = get_local_metric(Metric.CHECKPOINT)
    print(f'Locally highest executed checkpoint: {current_checkpoint}')
    start_checkpoint = current_checkpoint

    await_started(start_checkpoint)

    # Durations use the monotonic clock so that wall-clock adjustments cannot
    # distort the elapsed time or trip the epoch timeout.
    start_time = time.monotonic()
    epoch_start_time = start_time
    progress_check_iteration = 1
    while current_epoch < end_epoch:
        # check that we are making progress
        time.sleep(CHECKPOINT_SLEEP_SEC)
        new_checkpoint = get_local_metric(Metric.CHECKPOINT)
        if new_checkpoint == current_checkpoint:
            print(
                f'WARNING: Checkpoint is stuck at {current_checkpoint} for over {CHECKPOINT_SLEEP_SEC * progress_check_iteration} seconds')
            progress_check_iteration += 1
        else:
            if verbose:
                print(f'New highest executed checkpoint: {new_checkpoint}')
            current_checkpoint = new_checkpoint
            progress_check_iteration = 1

        new_epoch = get_local_metric(Metric.EPOCH)
        if new_epoch > current_epoch:
            current_epoch = new_epoch
            print(f'New local epoch: {current_epoch}')
            epoch_start_time = time.monotonic()
        elif epoch_timeout is not None:
            # check if we've been stuck on the same epoch for too long; true
            # division so the timeout is not rounded up by almost a minute
            stuck_minutes = (time.monotonic() - epoch_start_time) / 60
            if stuck_minutes > epoch_timeout:
                print(
                    f'Epoch is stuck at {current_epoch} for over {epoch_timeout} minutes')
                sys.exit(1)

    elapsed_minutes = (time.monotonic() - start_time) / 60
    print('-------------------------------')
    print(
        f"Successfully synced to epoch {end_epoch} from epoch {start_epoch} ({current_checkpoint - start_checkpoint} checkpoints) in {elapsed_minutes:.2f} minutes")
    sys.exit(0)
# Script entry point: pass everything after the program name to main().
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)