-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathsend.py
executable file
·696 lines (533 loc) · 22.2 KB
/
send.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
#!/usr/bin/env python3
"""
@summary: submit many contract.set(arg) transactions to the example contract
@version: v52 (22/January/2019)
@since: 17/April/2018
@author: https://github.com/drandreaskrueger
@see: https://github.com/drandreaskrueger/chainhammer for updates
"""
# extend sys.path for imports:
# (adds the parent directory so 'from hammer.x import y' works when this
#  file is run directly as a script rather than as part of the package)
if __name__ == '__main__' and __package__ is None:
    from os import sys, path
    sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
################
## Dependencies:
# standard library:
import sys, time, random, json
from threading import Thread
from queue import Queue
from pprint import pprint
# pypi:
import requests # pip3 install requests
import web3
from web3 import Web3, HTTPProvider # pip3 install web3
from web3.utils.abi import filter_by_name, abi_to_signature
from web3.utils.encoding import pad_hex
# chainhammer:
from hammer.config import RPCaddress, ROUTE, PRIVATE_FOR, EXAMPLE_ABI
from hammer.config import PARITY_UNLOCK_EACH_TRANSACTION
from hammer.config import GAS_FOR_SET_CALL
from hammer.config import FILE_LAST_EXPERIMENT, EMPTY_BLOCKS_AT_END
from hammer.deploy import loadFromDisk
from hammer.clienttools import web3connection, unlockAccount
##########################
## smart contract related:
def initialize_fromAddress():
    """
    Recreate the contract object from the address & ABI that
    deploy.py persisted to a disk file.
    """
    address, abi = loadFromDisk()
    return w3.eth.contract(address=address, abi=abi)
def contract_set_via_web3(contract, arg, hashes=None, privateFor=PRIVATE_FOR, gas=GAS_FOR_SET_CALL):
    """
    Call the contract's .set(arg) method through the web3 library.

    :param contract: web3 contract object (must expose .functions.set)
    :param arg: value passed to set()
    :param hashes: optional list; the tx hash is appended to it
                   (used by the multi-threaded senders to collect hashes)
    :param privateFor: Quorum 'privateFor' list, added to the tx if truthy
    :param gas: gas limit for the transaction
    :return: transaction hash as a hex string
    """
    txParameters = {'from': w3.eth.defaultAccount,
                    'gas': gas}
    if privateFor:
        txParameters['privateFor'] = privateFor  # untested
    # some parity setups re-lock the account after every transaction:
    if PARITY_UNLOCK_EACH_TRANSACTION:
        unlockAccount()
    tx = contract.functions.set(x=arg).transact(txParameters)
    print(".", end=" ")  # TODO: not print this here but at start
    tx = w3.toHex(tx)
    # fix: 'is not None' instead of 'not hashes==None' (PEP 8; also avoids
    # invoking __eq__ and keeps an empty list argument working correctly):
    if hashes is not None:
        hashes.append(tx)
    return tx
def try_contract_set_via_web3(contract, arg=42):
    """
    Smoke test for contract_set_via_web3: submit one transaction,
    wait for its receipt, then read back and print the stored value.
    """
    tx_hash = contract_set_via_web3(contract, arg=arg)
    print(tx_hash)
    w3.eth.waitForTransactionReceipt(tx_hash)
    stored = contract.functions.get().call()
    print(stored)
    return stored
## Manually build & submit transaction, i.e. not going though web3
## (the hope of @jpmsam was that this would speed it up)
##
## Note that the data compilation steps are already implemented as
## myContract.functions.myMethod(*args, **kwargs).buildTransaction(transaction)
## but the following bypasses web3.py completely!
def contract_method_ID(methodname, abi):
    """
    Derive the 4-byte method selector ('0x' + 8 hex chars) for the
    given method name from the contract ABI.
    """
    matches = filter_by_name(methodname, abi)
    assert len(matches) == 1
    signature = abi_to_signature(matches[0])
    digest_hex = w3.toHex(w3.sha3(text=signature))
    # '0x' prefix plus the first 4 bytes (= 8 hex digits) of the hash:
    return digest_hex[:10]
def argument_encoding(contract_method_ID, arg):
    """
    Build the raw tx 'data' field: method selector followed by the
    argument, left-padded to 256 bits.
    """
    padded = pad_hex(w3.toHex(arg), bit_size=256)
    # strip the '0x' of the padded argument before concatenating:
    return contract_method_ID + padded[2:]
def timeit_argument_encoding():
    """
    Micro-benchmark for contract_method_ID + argument_encoding.
    Historic result: 'Doing that 10000 times ... took 0.45 seconds'
    """
    # fix: time.clock() was deprecated in 3.3 and REMOVED in Python 3.8;
    # perf_counter() is the documented replacement for wall-clock timing
    start = time.perf_counter()
    reps = 10000
    data = None
    for _ in range(reps):
        # fix: 'ABI' was an undefined name here (NameError at runtime);
        # the example contract's ABI is imported as EXAMPLE_ABI from config
        method_ID = contract_method_ID("set", EXAMPLE_ABI)
        data = argument_encoding(method_ID, 7)
    elapsed = time.perf_counter() - start
    print(data)
    # no need to precalculate, it takes near to no time:
    print("Doing that %d times ... took %.2f seconds" % (reps, elapsed))
def contract_set_via_RPC(contract, arg, hashes=None, privateFor=PRIVATE_FOR, gas=GAS_FOR_SET_CALL):
    """
    Call the .set(arg) method by POSTing a raw 'eth_sendTransaction'
    JSON-RPC request, bypassing web3.py entirely.

    suggestion by @jpmsam
    https://github.com/jpmorganchase/quorum/issues/346#issuecomment-382216968

    :param contract: web3 contract object (its .address and .abi are used)
    :param arg: value passed to set()
    :param hashes: optional list; the tx hash is appended to it
                   (used by the multi-threaded senders to collect hashes)
    :param privateFor: Quorum 'privateFor' list, added to the tx if truthy
    :param gas: gas limit for the transaction
    :return: transaction hash (hex string) as reported by the node
    """
    method_ID = contract_method_ID("set", contract.abi)  # TODO: make this "set" flexible for any method name
    data = argument_encoding(method_ID, arg)
    txParameters = {'from': w3.eth.defaultAccount,
                    'to': contract.address,
                    'gas': w3.toHex(gas),
                    'data': data}
    if privateFor:
        txParameters['privateFor'] = privateFor  # untested
    method = 'eth_sendTransaction'
    payload = {"jsonrpc": "2.0",
               "method": method,
               "params": [txParameters],
               "id": 1}
    headers = {'Content-type': 'application/json'}
    response = requests.post(RPCaddress, json=payload, headers=headers)
    tx = response.json()['result']
    print(".", end=" ")  # TODO: not print this here but at start
    # fix: 'is not None' instead of 'not hashes==None' (PEP 8 idiom):
    if hashes is not None:
        hashes.append(tx)
    return tx
def try_contract_set_via_RPC(contract, steps=3):
    """
    Smoke test for contract_set_via_RPC: write `steps` consecutive
    values, printing the tx hash and the storedData after each one.
    """
    start = random.randint(1, 100)
    for value in range(start, start + steps):
        tx = contract_set_via_RPC(contract, value)
        print("after setat(%d) tx" % value, tx, " the storedData now is", end=" ")
        # TODO: wait for receipt!
        print(contract.functions.get().call())
# CHOOSE which route to choose (web3 / RPC) depending on constant ROUTE
# (config.ROUTE == "web3" picks the web3 library sender; any other value
#  falls back to the raw JSON-RPC sender)
contract_set = contract_set_via_web3 if ROUTE=="web3" else contract_set_via_RPC
################################################################
###
### benchmarking routines
###
### 0 blocking
### 1 async
### 2 async, queue, can give number of workers
### 3 async, batched (obsolete)
###
################################################################
def many_transactions_consecutive(contract, numTx):
    """
    Simplest benchmark: submit all transactions one by one, blocking.
    (historically around 15 TPS)
    """
    print("send %d transactions, non-async, one after the other:\n" % (numTx))
    tx_hashes = []
    for counter in range(numTx):
        tx_hash = contract_set(contract, counter)
        print("set() transaction submitted: ", tx_hash)
        tx_hashes.append(tx_hash)
    return tx_hashes
def many_transactions_threaded(contract, numTx):
    """
    Fire one thread per transaction.
    Caution: for a huge numTx the machine can run out of threads and crash.
    """
    print("send %d transactions, multi-threaded, one thread per tx:\n" % (numTx))
    tx_hashes = []  # shared container, appended to by the sender threads
    workers = []
    for index in range(numTx):
        workers.append(Thread(target=contract_set,
                              args=(contract, index, tx_hashes)))
        print(".", end="")
    print("%d transaction threads created." % len(workers))
    for worker in workers:
        worker.start()
        print(".", end="")
        sys.stdout.flush()
    print("all threads started.")
    for worker in workers:
        worker.join()
    print("all threads ended.")
    return tx_hashes
def many_transactions_threaded_Queue(contract, numTx, num_worker_threads=25):
    """
    Push transaction indices through a Queue that is consumed by a
    fixed-size pool of daemon worker threads.
    """
    line = "send %d transactions, via multi-threading queue with %d workers:\n"
    print(line % (numTx, num_worker_threads))
    q = Queue()
    tx_hashes = []  # filled by the workers

    def worker():
        # loops forever; daemon threads simply die with the main thread
        while True:
            task = q.get()
            contract_set(contract, task, tx_hashes)
            print("T", end="")
            sys.stdout.flush()
            q.task_done()

    for _ in range(num_worker_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()
        print("W", end="")
        sys.stdout.flush()
    print("\n%d worker threads created." % num_worker_threads)

    for index in range(numTx):
        q.put(index)
        print("I", end="")
        sys.stdout.flush()
    print("\n%d items queued." % numTx)

    q.join()  # block until every queued item has been marked done
    print("\nall items - done.")
    return tx_hashes
def many_transactions_threaded_in_batches(contract, numTx, batchSize=25):
    """
    One thread per transaction, but created & joined in limited-size
    batches. OBSOLETE <-- not faster than threaded2.
    """
    line = "send %d transactions, multi-threaded, one thread per tx, " \
           "in batches of %d parallel threads:\n"
    print(line % (numTx, batchSize))
    tx_hashes = []
    remaining = numTx
    while remaining > 0:
        line = "Next batch of %d transactions ... %d left to do"
        print(line % (batchSize, remaining))
        current = min(batchSize, remaining)
        batch = []
        for index in range(current):
            batch.append(Thread(target=contract_set,
                                args=(contract, index, tx_hashes)))
            print(".", end="")
        print("\n%d transaction threads created." % len(batch))
        for t in batch:
            t.start()
            print(".", end="")
            sys.stdout.flush()
        print("\nall threads started.")
        for t in batch:
            t.join()
        print("\nall threads ended.")
        remaining -= current
    return tx_hashes
################################################################
###
### control sample: have the transactions been SUCCESSFUL ?
###
################################################################
def hasTxSucceeded(tx_receipt):
    """
    Decide from its receipt whether a transaction succeeded.

    Primary signal: the receipt's 'status' field (1 = success, 0 = failure).
    Fallback for clients that do not provide 'status' (e.g. testrpc-py,
    quorum): compare gas given with gas used. All gas consumed almost
    always means failure; leftover gas is counted as success — although
    throw / revert() / require() also return unused gas, so the fallback
    is only a heuristic.
    """
    status = tx_receipt.get("status", None)
    if status == 1:
        return True   # explicit success
    if status == 0:
        return False  # explicit failure
    # no usable status field --> gas-consumption heuristic:
    gasGiven = w3.eth.getTransaction(tx_receipt.transactionHash)["gas"]
    if tx_receipt.gasUsed < gasGiven:
        # gas left over: the only indicator available --> count as success
        return True
    # all given gas used up, typical of e.g. a failed solidity assert():
    return False
def receiptGetter(tx_hash, timeout, resultsDict):
    """
    Wait for one transaction receipt and store it in resultsDict,
    keyed by tx hash. On timeout the key is simply left missing —
    callers detect timeouts by comparing dict size with request count.
    """
    try:
        resultsDict[tx_hash] = w3.eth.waitForTransactionReceipt(tx_hash, timeout)
    except web3.utils.threads.Timeout:
        pass
def getReceipts_multithreaded(tx_hashes, timeout):
    """
    Fetch receipts concurrently, one thread per tx hash.
    Returns {tx_hash: receipt}; hashes that timed out are missing.
    """
    tx_receipts = {}
    print("Waiting for %d transaction receipts, can possibly take a while ..." % len(tx_hashes))
    fetchers = []
    for tx_hash in tx_hashes:
        fetcher = Thread(target=receiptGetter,
                         args=(tx_hash, timeout, tx_receipts))
        fetcher.start()
        fetchers.append(fetcher)
    # wait for all of them to come back:
    for fetcher in fetchers:
        fetcher.join()
    return tx_receipts
def controlSample_transactionsSuccessful(txs, sampleSize=50, timeout=100):
    """
    Verify on a random sample that the transactions actually succeeded
    (did not fail from e.g. running out of gas) — we want to benchmark
    the speed of SUCCESSFUL state changes.

    Can fail in three different ways:
    * timeout while waiting for a receipt --> try raising `timeout`
    * receipt status == 0 for a sampled tx --> real tx failure
    * all given gas used up --> indirect indicator of a failed tx
    """
    print("Check control sample.")
    N = min(sampleSize, len(txs))
    txs_sample = random.sample(txs, N)
    tx_receipts = getReceipts_multithreaded(tx_hashes=txs_sample, timeout=timeout)

    # Test 1: did every sampled receipt arrive in time?
    M = len(tx_receipts)
    success = (M == N)
    if success:
        print("Good: No timeout, received the receipts for all %d sampled transactions." % N)
    else:
        print("Bad: Timeout, received receipts only for %d out of %d sampled transactions." % (M, N))

    # Test 2: was each and every received transaction successful?
    badCounter = 0
    for tx_hash, tx_receipt in tx_receipts.items():
        if not hasTxSucceeded(tx_receipt):
            success = False
            print("Transaction NOT successful:", tx_hash, tx_receipt)
            badCounter += 1
    if badCounter:
        print("Bad: %d out of %d not successful!" % (badCounter, M))

    print("Sample of %d transactions checked ... hints at:" % M, end=" ")
    print("TOTAL SUCCESS :-)" if success else "-AT LEAST PARTIAL- FAILURE :-(")
    return success
# Try out the above with
# pytest tests/test_send.py::test_controlSample_transactionsSuccessful
################################################################################
###
### estimate range of blocks, first and last 100 transaction hashes
###
################################################################################
def getReceipts_multithreaded_Queue(tx_hashes, timeout, num_worker_threads=8, ifPrint=False):
    """
    Fetch receipts through a bounded pool of daemon workers fed by a Queue.
    Advantage over 'getReceipts_multithreaded':
    also works for len(tx_hashes) > 1000.
    """
    started = time.monotonic()
    q = Queue()
    tx_receipts = {}

    def worker():
        while True:
            receiptGetter(q.get(), timeout, tx_receipts)
            q.task_done()

    for _ in range(num_worker_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()
    for tx_hash in tx_hashes:
        q.put(tx_hash)
    q.join()

    if ifPrint:
        elapsed = time.monotonic() - started
        print("%d lookups took %.1f seconds" % (len(tx_receipts), elapsed))
    return tx_receipts
def when_last_ones_mined__give_range_of_block_numbers(txs, txRangesSize=100, timeout=60):
    """
    Heuristic only: assume the receipts of the first and last
    `txRangesSize` tx hashes in `txs` reveal the min and max block
    numbers of the whole experiment. (Looking up ALL receipts would be
    too slow, e.g. ~2 minutes for 2000 lookups on TestRPC.)
    """
    sample = txs[:txRangesSize] + txs[-txRangesSize:]
    receipts = getReceipts_multithreaded_Queue(tx_hashes=sample,
                                               timeout=timeout)
    # dedupe and sort block numbers, then take the extremes:
    blocks = sorted({receipt.blockNumber for receipt in receipts.values()})
    return blocks[0], blocks[-1]
def store_experiment_data(success, num_txs,
                          block_from, block_to,
                          empty_blocks,
                          filename=FILE_LAST_EXPERIMENT):
    """
    Persist the most basic data about this last experiment into one
    (overwritten) JSON file, so that the diagramming step can compute
    proper averages and select block ranges.
    """
    send_part = {"block_first": block_from,
                 "block_last": block_to,
                 "empty_blocks": empty_blocks,
                 "num_txs": num_txs,
                 "sample_txs_successful": success}
    node_part = {"rpc_address": RPCaddress,
                 "web3.version.node": w3.version.node,
                 "name": NODENAME,
                 "type": NODETYPE,
                 "version": NODEVERSION,
                 "consensus": CONSENSUS,
                 "network_id": NETWORKID,
                 "chain_name": CHAINNAME,
                 "chain_id": CHAINID}
    with open(filename, "w") as f:
        json.dump({"send": send_part, "node": node_part}, f)
def wait_some_blocks(waitBlocks=EMPTY_BLOCKS_AT_END, pauseBetweenQueries=0.3):
    """
    Poll the node until `waitBlocks` further blocks have been mined.
    The waiting has to happen here because ./send.py is started later
    than ./tps.py — when ./send.py ends, the analysis can happen.
    """
    start_block = w3.eth.blockNumber
    print("blocknumber now:", start_block, end=" ")
    print("waiting for %d empty blocks:" % waitBlocks)
    target = waitBlocks + start_block
    last_seen = start_block
    current = start_block
    while current < target:
        time.sleep(pauseBetweenQueries)
        current = w3.eth.blockNumber
        if current != last_seen:
            last_seen = current
            print(current, end=" ")
            sys.stdout.flush()
    print("Done.")
def finish(txs, success):
    """
    Wrap up the experiment: determine the block range of the submitted
    transactions, possibly wait for empty blocks, then store the
    experiment metadata (which signals tps.py to stop).
    """
    block_from, block_to = when_last_ones_mined__give_range_of_block_numbers(txs)
    txt = "Transaction receipts from beginning and end all arrived. Blockrange %d to %d."
    txt = txt % (block_from, block_to)
    print (txt)
    # instantly-sealing chains need no trailing empty blocks:
    if NODETYPE=="TestRPC" or (NODENAME=="Parity" and CHAINNAME=="developmentchain" and NETWORKID==17):
        print ("Do not wait for empty blocks, as this is TestRPC, or parity instantseal.")
        waitBlocks=0
    else:
        waitBlocks=EMPTY_BLOCKS_AT_END
        # NOTE(review): source indentation was lost; this call presumably
        # belongs inside the else branch (skipped for instantseal) — confirm
        wait_some_blocks(waitBlocks)
    store_experiment_data (success, len(txs), block_from, block_to, empty_blocks=waitBlocks)
    # print ("Data stored. This will trigger tps.py to end in ~ %d blocks." % EMPTY_BLOCKS_AT_END)
    print ("Data stored. This will trigger tps.py to end.\n"
           "(Beware: Wait ~0.5s until tps.py stops and writes to same file.)")
    # see tps.py --> pauseBetweenQueries=0.3
################################################################################
###
### choose, depending on CLI parameter
###
################################################################################
def check_CLI_or_syntax_info_and_exit():
    """
    Validate the CLI argument count (2 to 4 including the script name);
    print syntax instructions and exit with a nonzero code otherwise.
    """
    if not (2 <= len(sys.argv) <= 4):
        print("Needs parameters:")
        print("%s numTransactions algorithm [workers]" % sys.argv[0])
        print("at least numTransactions, e.g.")
        print("%s 1000" % sys.argv[0])
        # fix: use sys.exit instead of the site-module exit() builtin, and
        # return a nonzero status so shell wrappers can detect the usage error
        sys.exit(1)
def sendmany(contract):
    """
    Send many transactions to the contract; the submission algorithm
    is chosen by the 2nd CLI argument:
      (none) / 'sequential' - blocking, one after the other
      'threaded1'           - one thread per transaction
      'threaded2'           - worker-pool queue (optional 3rd arg = #workers)
      'threaded3'           - threads in batches (obsolete)
    :return: list of submitted transaction hashes
    """
    # TODO: Perhaps extend this to a full blown (config.py) settings printer?
    # but then in tps.py because only that output is visible in run.sh
    print("\nCurrent blockNumber = ", w3.eth.blockNumber)
    numTransactions = int(sys.argv[1])
    if ROUTE == "RPC":
        route = "RPC directly"
    elif ROUTE == "web3":
        route = "web3 library"
    else:
        # fix: previously 'route' stayed undefined (NameError) for any
        # unexpected ROUTE value; fall back to printing the raw setting
        route = ROUTE
    print("You want me to send %d transactions, via route: %s." % (numTransactions, route))
    # choose algorithm depending on 2nd CLI argument:
    if len(sys.argv) == 2 or sys.argv[2] == "sequential":
        # blocking, non-async
        txs = many_transactions_consecutive(contract, numTransactions)
    elif sys.argv[2] == "threaded1":
        txs = many_transactions_threaded(contract, numTransactions)
    elif sys.argv[2] == "threaded2":
        num_workers = 100
        if len(sys.argv) == 4:
            try:
                num_workers = int(sys.argv[3])
            except ValueError:
                # fix: narrowed from a bare 'except:' which also swallowed
                # KeyboardInterrupt/SystemExit; only bad integers are ignored
                pass
        txs = many_transactions_threaded_Queue(contract,
                                               numTx=numTransactions,
                                               num_worker_threads=num_workers)
    elif sys.argv[2] == "threaded3":
        batchSize = 25
        txs = many_transactions_threaded_in_batches(contract,
                                                    numTx=numTransactions,
                                                    batchSize=batchSize)
    else:
        print("Nope. Choice '%s'" % sys.argv[2], "not recognized.")
        sys.exit(1)  # fix: sys.exit with nonzero status instead of exit()
    print("%d transaction hashes recorded, examples: %s" % (len(txs), txs[:2]))
    return txs
if __name__ == '__main__':
    check_CLI_or_syntax_info_and_exit()
    # NOTE: a 'global' statement at module level is a no-op; it only
    # documents which names the functions above read as module globals
    global w3, NODENAME, NODETYPE, NODEVERSION, CONSENSUS, NETWORKID, CHAINNAME, CHAINID
    # connect to the node and learn its identity / consensus details:
    w3, chainInfos = web3connection(RPCaddress=RPCaddress, account=None)
    NODENAME, NODETYPE, NODEVERSION, CONSENSUS, NETWORKID, CHAINNAME, CHAINID = chainInfos
    # manual one-off experiments (uncomment one, it exits afterwards):
    # wait_some_blocks(0); exit()
    # timeit_argument_encoding(); exit()
    # try_contract_set_via_web3(contract); exit()
    # try_contract_set_via_RPC(contract); exit()
    w3.eth.defaultAccount = w3.eth.accounts[0] # set first account as sender
    contract = initialize_fromAddress()
    txs = sendmany(contract)
    sys.stdout.flush() # so that the log files are updated.
    success = controlSample_transactionsSuccessful(txs)
    sys.stdout.flush()
    finish(txs, success)
    sys.stdout.flush()