forked from hyperledger/indy-node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
158 lines (125 loc) · 4.74 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import datetime
import os
import random
from typing import Tuple, Union, TypeVar, List, Callable
import libnacl.secret
from base58 import b58decode
from common.serializers.serialization import serialize_msg_for_signing
from plenum.common.types import f
from plenum.common.util import isHex, cryptonymToHex
from common.error import error
from stp_core.crypto.nacl_wrappers import Verifier
def getMsgWithoutSig(msg, sigFieldName=f.SIG.nm):
    """
    Build a copy of a message that omits its signature field.

    :param msg: mapping to copy; only ``msg.items()`` is used
    :param sigFieldName: key to leave out of the copy
    :return: a new dict with every entry of ``msg`` except ``sigFieldName``;
        the values themselves are not copied
    """
    return {
        field: value
        for field, value in msg.items()
        if field != sigFieldName
    }
def verifySig(identifier, signature, msg) -> bool:
    """
    Check a base58-encoded signature over a message.

    :param identifier: signer's key; used as-is when ``isHex`` accepts it,
        otherwise converted with ``cryptonymToHex``
    :param signature: base58 text of the signature
    :param msg: message to verify; serialized with
        ``serialize_msg_for_signing`` before verification
    :return: the result of ``Verifier.verify`` for the decoded signature
    """
    verkey = identifier if isHex(identifier) else cryptonymToHex(identifier)
    serialized = serialize_msg_for_signing(msg)
    # The signature travels as base58 text; decode it back to raw bytes.
    rawSig = b58decode(signature.encode('utf-8'))
    return Verifier(verkey).verify(rawSig, serialized)
def getSymmetricallyEncryptedVal(val, secretKey: Union[str, bytes] = None) -> \
        Tuple[str, str]:
    """
    Encrypt the provided value with symmetric encryption

    :param val: the value to encrypt; a str is UTF-8 encoded first, anything
        else is handed to the box unchanged
    :param secretKey: Optional key, if provided should be either in hex or
        bytes
    :return: Tuple of the encrypted value and secret key encoded in hex
    """
    plaintext = val.encode("utf-8") if isinstance(val, str) else val

    if not secretKey:
        # No (or empty) key supplied: let SecretBox come up with its own key,
        # which is reported back to the caller through ``box.sk`` below.
        box = libnacl.secret.SecretBox()
    else:
        # NOTE(review): if isHex() also accepts bytes, a bytes key consisting
        # solely of hex digits would reach fromhex(), which requires a str -
        # confirm against isHex's implementation.
        if isHex(secretKey):
            secretKey = bytes.fromhex(secretKey)
        elif not isinstance(secretKey, bytes):
            error("Secret key must be either in hex or bytes")
        box = libnacl.secret.SecretBox(secretKey)

    ciphertext = box.encrypt(plaintext)
    return ciphertext.hex(), box.sk.hex()
def getSymmetricallyDecryptedVal(val, secretKey: Union[str, bytes]) -> str:
    """
    Decrypt a value produced by symmetric encryption

    :param val: the encrypted value; hex is converted to bytes, any other str
        is UTF-8 encoded, other types are used unchanged
    :param secretKey: the key, normalized with the same rules as ``val``
    :return: the decrypted payload decoded to str
    """
    def toBytes(data):
        # Hex text -> raw bytes; any other text -> its UTF-8 encoding;
        # everything else (e.g. bytes) passes through untouched.
        if isHex(data):
            return bytes.fromhex(data)
        if isinstance(data, str):
            return data.encode("utf-8")
        return data

    ciphertext = toBytes(val)
    key = toBytes(secretKey)
    plaintext = libnacl.secret.SecretBox(key).decrypt(ciphertext)
    return plaintext.decode()
def dateTimeEncoding(obj):
    """
    Encode a datetime as whole seconds since the Unix epoch.

    Intended for use as a serialization fallback: anything that is not a
    ``datetime.datetime`` is rejected with ``TypeError``.

    :param obj: the object to encode
    :return: int number of seconds since the epoch; naive datetimes are
        interpreted as local time, aware datetimes honour their tzinfo
    :raises TypeError: if ``obj`` is not a ``datetime.datetime``
    """
    if isinstance(obj, datetime.datetime):
        # strftime('%s') is a platform-specific extension (missing on
        # Windows) that ignores tzinfo. timestamp() is portable and
        # timezone-correct. Microseconds are dropped first so the result is
        # the whole-second value, also for pre-epoch datetimes.
        return int(obj.replace(microsecond=0).timestamp())
    raise TypeError('Not sure how to serialize %s' % (obj,))
def getNonce(length=32):
    """
    Generate a random nonce made of lowercase hexadecimal characters.

    :param length: number of hex characters to produce
    :return: str of ``length`` characters drawn from ``0-9a-f``
    """
    # A nonce must be unpredictable. The module-level random functions use a
    # seedable Mersenne Twister, so draw from the OS entropy source instead.
    rng = random.SystemRandom()
    hexChars = "0123456789abcdef"
    return "".join(rng.choice(hexChars) for _ in range(length))
def get_reply_if_confirmed(client, identifier, request_id: int):
    """
    Fetch the outcome of a request from the client.

    :param client: object exposing ``getReply`` and
        ``reqRepStore.getAllReplies``
    :param identifier: identifier the request was sent with
    :param request_id: id of the request
    :return: ``(reply, None)`` when the status is ``'CONFIRMED'``;
        ``(None, None)`` when not confirmed and no errors are recorded;
        otherwise ``(reply, error_reason)`` using one recorded error
    """
    reply, status = client.getReply(identifier, request_id)
    if status == 'CONFIRMED':
        return reply, None

    _replies, errors = client.reqRepStore.getAllReplies(
        identifier, request_id)
    if errors:
        # popitem() removes the entry it returns from the errors mapping.
        _sender, reason = errors.popitem()
        return reply, reason
    return None, None
# TODO: Should have a timeout, should not have kwargs
def ensureReqCompleted(
        loop,
        reqKey,
        client,
        clbk=None,
        pargs=None,
        kwargs=None,
        cond=None):
    """
    Poll for the outcome of a request and hand it to a callback.

    While there is neither a reply nor an error (and ``cond``, if given,
    returns a falsy value) the function re-schedules itself on ``loop`` after
    0.2 seconds. Otherwise the callback, if any, is invoked once.

    :param loop: object providing ``call_later(delay, fn, *args)``
    :param reqKey: iterable unpacked into the ``(identifier, request_id)``
        arguments of ``get_reply_if_confirmed``
    :param client: client passed through to ``get_reply_if_confirmed``
    :param clbk: optional callable invoked as
        ``clbk(reply, err, *pargs, **kwargs)``
    :param pargs: optional extra positional arguments for ``clbk``
    :param kwargs: optional extra keyword arguments for ``clbk``
    :param cond: optional zero-argument callable; a truthy result stops the
        polling even though no reply or error is available yet
    """
    reply, err = get_reply_if_confirmed(client, *reqKey)

    nothingYet = err is None and reply is None
    if nothingYet and (cond is None or not cond()):
        # Try again shortly, forwarding exactly the same arguments.
        loop.call_later(.2, ensureReqCompleted, loop,
                        reqKey, client, clbk, pargs, kwargs, cond)
        return

    if not clbk:
        return

    # TODO: Do something which makes reply and error optional in the
    # callback.
    extraArgs = () if pargs is None else pargs
    extraKwargs = {} if kwargs is None else kwargs
    clbk(reply, err, *extraArgs, **extraKwargs)
def getNonceForProof(nonce):
    """
    Convert a hexadecimal nonce string into its integer value.

    :param nonce: str holding a base-16 number
    :return: the int parsed from ``nonce``
    """
    return int(nonce, base=16)
T = TypeVar('T')


def getIndex(predicateFn: Callable[[T], bool], items: List[T]) -> int:
    """
    Locate the first item of a list that satisfies a predicate

    :param predicateFn: predicate function to run on items of list
    :param items: the list to search
    :return: first index for which predicate function returns True, or -1
        when no item matches
    """
    matchingIndexes = (
        idx for idx, item in enumerate(items) if predicateFn(item)
    )
    # The default makes next() yield -1 instead of raising when exhausted.
    return next(matchingIndexes, -1)
def compose_cmd(cmd):
    """
    Put a command into the form used on the current platform.

    :param cmd: iterable of str holding the command and its arguments
    :return: ``cmd`` unchanged when ``os.name`` is ``'nt'``; otherwise the
        parts joined into a single space-separated string
    """
    if os.name == 'nt':
        return cmd
    return ' '.join(cmd)
def invalidate_config_caches():
    """
    Reset the cached configuration held by the config utility modules.

    Sets the module-level ``CONFIG`` attribute to ``None`` in the stp_core,
    plenum and indy_common config utility modules.
    """
    import stp_core.common.config.util
    import plenum.common.config_util
    import indy_common.config_util

    # All three references must be cleared: because of how the getConfig
    # methods work, they all refer to the same object.
    cachingModules = (
        stp_core.common.config.util,
        plenum.common.config_util,
        indy_common.config_util,
    )
    for module in cachingModules:
        module.CONFIG = None