forked from blockchain-etl/ethereum-etl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
134 lines (103 loc) · 4.4 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import itertools
import warnings
from ethereumetl.misc.retriable_value_error import RetriableValueError
def hex_to_dec(hex_string):
if hex_string is None:
return None
try:
return int(hex_string, 16)
except ValueError:
print("Not a hex string %s" % hex_string)
return hex_string
def to_int_or_none(val):
if isinstance(val, int):
return val
if val is None or val == '':
return None
try:
return int(val)
except ValueError:
return None
def chunk_string(string, length):
return (string[0 + i:length + i] for i in range(0, len(string), length))
def to_normalized_address(address):
if address is None or not isinstance(address, str):
return address
return address.lower()
def validate_range(range_start_incl, range_end_incl):
if range_start_incl < 0 or range_end_incl < 0:
raise ValueError('range_start and range_end must be greater or equal to 0')
if range_end_incl < range_start_incl:
raise ValueError('range_end must be greater or equal to range_start')
def rpc_response_batch_to_results(response):
for response_item in response:
yield rpc_response_to_result(response_item)
def rpc_response_to_result(response):
result = response.get('result')
if result is None:
error_message = 'result is None in response {}.'.format(response)
if response.get('error') is None:
error_message = error_message + ' Make sure Ethereum node is synced.'
# When nodes are behind a load balancer it makes sense to retry the request in hopes it will go to other,
# synced node
raise RetriableValueError(error_message)
elif response.get('error') is not None and is_retriable_error(response.get('error').get('code')):
raise RetriableValueError(error_message)
raise ValueError(error_message)
return result
def is_retriable_error(error_code):
if error_code is None:
return False
if not isinstance(error_code, int):
return False
# https://www.jsonrpc.org/specification#error_object
if error_code == -32603 or (-32000 >= error_code >= -32099):
return True
return False
def split_to_batches(start_incl, end_incl, batch_size):
"""start_incl and end_incl are inclusive, the returned batch ranges are also inclusive"""
for batch_start in range(start_incl, end_incl + 1, batch_size):
batch_end = min(batch_start + batch_size - 1, end_incl)
yield batch_start, batch_end
def dynamic_batch_iterator(iterable, batch_size_getter):
batch = []
batch_size = batch_size_getter()
for item in iterable:
batch.append(item)
if len(batch) >= batch_size:
yield batch
batch = []
batch_size = batch_size_getter()
if len(batch) > 0:
yield batch
def pairwise(iterable):
"""s -> (s0,s1), (s1,s2), (s2, s3), ..."""
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def check_classic_provider_uri(chain, provider_uri):
if chain == 'classic' and provider_uri == 'https://mainnet.infura.io':
warnings.warn("ETC Chain not supported on Infura.io. Using https://ethereumclassic.network instead")
return 'https://ethereumclassic.network'
return provider_uri