Skip to content

Commit

Permalink
Add BatchHTTPProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
medvedev1088 committed Jul 15, 2018
1 parent 9e970ac commit 0233207
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 37 deletions.
2 changes: 1 addition & 1 deletion ethereumetl/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
# Mostly copied from web3.py/providers/ipc.py. Supports batch requests.
# Will be removed once batch feature is added to web3.py https://github.com/ethereum/web3.py/issues/832
# Also see this optimization https://github.com/ethereum/web3.py/pull/849
class IPCWrapper:
class BatchIPCProvider:
_socket = None

def __init__(self, ipc_path=None, testnet=False, timeout=10):
Expand Down
6 changes: 3 additions & 3 deletions ethereumetl/jobs/export_blocks_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ def __init__(
start_block,
end_block,
batch_size,
ipc_wrapper,
batch_web3_provider,
max_workers,
item_exporter,
export_blocks=True,
export_transactions=True):
super().__init__(start_block, end_block, batch_size, max_workers)
self.ipc_wrapper = ipc_wrapper
self.batch_web3_provider = batch_web3_provider

self.item_exporter = item_exporter

Expand All @@ -62,7 +62,7 @@ def _start(self):

def _export_batch(self, batch_start, batch_end):
blocks_rpc = list(generate_get_block_by_number_json_rpc(batch_start, batch_end, self.export_transactions))
response = self.ipc_wrapper.make_request(json.dumps(blocks_rpc))
response = self.batch_web3_provider.make_request(json.dumps(blocks_rpc))
results = rpc_response_batch_to_results(response)
blocks = [self.block_mapper.json_dict_to_block(result) for result in results]

Expand Down
6 changes: 3 additions & 3 deletions ethereumetl/jobs/export_contracts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(
self,
contract_addresses_iterable,
batch_size,
ipc_wrapper,
batch_web3_provider,
max_workers,
item_exporter):
self.ipc_wrapper = ipc_wrapper
self.batch_web3_provider = batch_web3_provider
self.contract_addresses_iterable = contract_addresses_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
Expand All @@ -54,7 +54,7 @@ def _export(self):

def _export_contracts(self, contract_addresses):
contracts_code_rpc = list(generate_get_code_json_rpc(contract_addresses))
response_batch= self.ipc_wrapper.make_request(json.dumps(contracts_code_rpc))
response_batch= self.batch_web3_provider.make_request(json.dumps(contracts_code_rpc))

contracts = []
for response in response_batch:
Expand Down
6 changes: 3 additions & 3 deletions ethereumetl/jobs/export_receipts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ def __init__(
self,
tx_hashes_iterable,
batch_size,
ipc_wrapper,
batch_web3_provider,
max_workers,
item_exporter,
export_receipts=True,
export_logs=True):
self.ipc_wrapper = ipc_wrapper
self.batch_web3_provider = batch_web3_provider
self.tx_hashes_iterable = tx_hashes_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
Expand All @@ -64,7 +64,7 @@ def _export(self):

def _export_receipts(self, tx_hashes):
receipts_rpc = list(generate_get_receipt_json_rpc(tx_hashes))
response = self.ipc_wrapper.make_request(json.dumps(receipts_rpc))
response = self.batch_web3_provider.make_request(json.dumps(receipts_rpc))
results = rpc_response_batch_to_results(response)
receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results]
for receipt in receipts:
Expand Down
20 changes: 20 additions & 0 deletions ethereumetl/providers/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from web3 import HTTPProvider
from web3.utils.request import make_post_request


class BatchHTTPProvider(HTTPProvider):
    """HTTP provider whose ``make_request`` accepts an already-serialized body.

    The request text is sent as-is (after UTF-8 encoding), so the caller is
    responsible for serializing the payload, e.g. a JSON-encoded list of
    RPC calls.
    """

    def make_request(self, text):
        """POST ``text`` to the provider endpoint and return the decoded reply.

        :param text: request body as a string; encoded to UTF-8 before sending.
        :return: the result of ``decode_rpc_response`` applied to the raw
            HTTP response.
        """
        endpoint = self.endpoint_uri
        self.logger.debug(
            "Making request HTTP. URI: %s, Request: %s", endpoint, text
        )

        payload = text.encode('utf-8')
        raw = make_post_request(endpoint, payload, **self.get_request_kwargs())
        decoded = self.decode_rpc_response(raw)

        self.logger.debug(
            "Getting response HTTP. URI: %s, Request: %s, Response: %s",
            endpoint, text, decoded,
        )
        return decoded
2 changes: 1 addition & 1 deletion ethereumetl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def rpc_response_batch_to_results(response):
for response_item in response:
result = response_item.get('result', None)
if result is None:
raise ValueError('result is null in response {}'.format(response_item))
raise ValueError('result is None in response {}'.format(response_item))
yield result


Expand Down
13 changes: 13 additions & 0 deletions ethereumetl/web3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

from web3 import IPCProvider, HTTPProvider

from ethereumetl.ipc import BatchIPCProvider
from ethereumetl.providers.rpc import BatchHTTPProvider

DEFAULT_IPC_TIMEOUT = 60


Expand All @@ -36,3 +39,13 @@ def get_provider_from_uri(uri_string):
return HTTPProvider(uri_string)
else:
raise ValueError('Unknown uri scheme {}'.format(uri_string))


def get_batch_provider_from_uri(uri_string):
    """Build a batch-capable provider for the given URI.

    ``file`` URIs yield a ``BatchIPCProvider`` on the URI's path (using
    ``DEFAULT_IPC_TIMEOUT``); ``http``/``https`` URIs yield a
    ``BatchHTTPProvider`` on the full URI string.

    :param uri_string: provider URI, e.g. ``file:///path/to/geth.ipc`` or
        ``https://host/``.
    :raises ValueError: if the URI scheme is not ``file``, ``http`` or
        ``https``.
    """
    parsed = urlparse(uri_string)
    scheme = parsed.scheme

    if scheme == 'file':
        return BatchIPCProvider(parsed.path, timeout=DEFAULT_IPC_TIMEOUT)
    if scheme in ('http', 'https'):
        return BatchHTTPProvider(uri_string)

    raise ValueError('Unknown uri scheme {}'.format(uri_string))
9 changes: 5 additions & 4 deletions export_blocks_and_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@

import argparse

from ethereumetl.ipc import IPCWrapper
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.export_blocks_job_item_exporter import export_blocks_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.web3_utils import get_batch_provider_from_uri

parser = argparse.ArgumentParser(description='Export blocks and transactions.')
parser.add_argument('-s', '--start-block', default=0, type=int, help='Start block')
parser.add_argument('-e', '--end-block', required=True, type=int, help='End block')
parser.add_argument('-b', '--batch-size', default=100, type=int, help='The number of blocks to export at a time.')
parser.add_argument('--ipc-path', required=True, type=str, help='The full path to the ipc file.')
parser.add_argument('--ipc-timeout', default=60, type=int, help='The timeout in seconds for ipc calls.')
parser.add_argument('-p', '--provider-uri', required=True, type=str,
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io/')
parser.add_argument('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
parser.add_argument('--blocks-output', default=None, type=str,
help='The output file for blocks. If not provided blocks will not be exported. '
Expand All @@ -48,7 +49,7 @@
start_block=args.start_block,
end_block=args.end_block,
batch_size=args.batch_size,
ipc_wrapper=ThreadLocalProxy(lambda: IPCWrapper(args.ipc_path, timeout=args.ipc_timeout)),
batch_web3_provider=ThreadLocalProxy(lambda: get_batch_provider_from_uri(args.provider_uri)),
max_workers=args.max_workers,
item_exporter=export_blocks_job_item_exporter(args.blocks_output, args.transactions_output),
export_blocks=args.blocks_output is not None,
Expand Down
9 changes: 5 additions & 4 deletions export_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import argparse

from ethereumetl.file_utils import smart_open
from ethereumetl.ipc import IPCWrapper
from ethereumetl.jobs.export_contracts_job import ExportContractsJob
from ethereumetl.jobs.export_contracts_job_item_exporter import export_contracts_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.web3_utils import get_batch_provider_from_uri

parser = argparse.ArgumentParser(
description='Exports contracts bytecode using eth_getCode JSON RPC APIs.')
Expand All @@ -36,8 +36,9 @@
help='The file containing contract addresses, one per line.')
parser.add_argument('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
parser.add_argument('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
parser.add_argument('--ipc-path', required=True, type=str, help='The full path to the ipc socket file.')
parser.add_argument('--ipc-timeout', default=60, type=int, help='The timeout in seconds for ipc calls.')
parser.add_argument('-p', '--provider-uri', required=True, type=str,
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io/')

args = parser.parse_args()

Expand All @@ -47,7 +48,7 @@
job = ExportContractsJob(
contract_addresses_iterable=contract_addresses,
batch_size=args.batch_size,
ipc_wrapper=ThreadLocalProxy(lambda: IPCWrapper(args.ipc_path, timeout=args.ipc_timeout)),
batch_web3_provider=ThreadLocalProxy(lambda: get_batch_provider_from_uri(args.provider_uri)),
item_exporter=export_contracts_job_item_exporter(args.output),
max_workers=args.max_workers)

Expand Down
10 changes: 6 additions & 4 deletions export_erc20_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,28 @@

import argparse

from web3 import IPCProvider, Web3
from web3 import Web3

from ethereumetl.file_utils import smart_open
from ethereumetl.jobs.export_erc20_tokens_job import ExportErc20TokensJob
from ethereumetl.jobs.export_erc20_tokens_job_item_exporter import export_erc20_tokens_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.web3_utils import get_provider_from_uri

parser = argparse.ArgumentParser(description='Exports ERC20 tokens.')
parser.add_argument('-t', '--token-addresses', type=str, help='The file containing token addresses, one per line.')
parser.add_argument('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
parser.add_argument('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
parser.add_argument('--ipc-path', required=True, type=str, help='The full path to the ipc socket file.')
parser.add_argument('--ipc-timeout', default=60, type=int, help='The timeout in seconds for ipc calls.')
parser.add_argument('-p', '--provider-uri', default=None, type=str,
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io/')

args = parser.parse_args()

with smart_open(args.token_addresses, 'r') as token_addresses_file:
job = ExportErc20TokensJob(
token_addresses_iterable=(token_address.strip() for token_address in token_addresses_file),
web3=ThreadLocalProxy(lambda: Web3(IPCProvider(args.ipc_path, timeout=args.ipc_timeout))),
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(args.provider_uri))),
item_exporter=export_erc20_tokens_job_item_exporter(args.output),
max_workers=args.max_workers)

Expand Down
10 changes: 6 additions & 4 deletions export_erc20_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@

import argparse

from web3 import IPCProvider, Web3
from web3 import Web3

from ethereumetl.jobs.export_erc20_transfers_job import ExportErc20TransfersJob
from ethereumetl.jobs.export_erc20_transfers_job_item_exporter import export_erc20_transfers_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.web3_utils import get_provider_from_uri

parser = argparse.ArgumentParser(
description='Exports ERC20 transfers using eth_newFilter and eth_getFilterLogs JSON RPC APIs.')
Expand All @@ -36,8 +37,9 @@
parser.add_argument('-b', '--batch-size', default=100, type=int, help='The number of blocks to filter at a time.')
parser.add_argument('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
parser.add_argument('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
parser.add_argument('--ipc-path', required=True, type=str, help='The full path to the ipc socket file.')
parser.add_argument('--ipc-timeout', default=60, type=int, help='The timeout in seconds for ipc calls.')
parser.add_argument('-p', '--provider-uri', default=None, type=str,
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc')
parser.add_argument('-t', '--tokens', default=None, type=str, nargs='+',
help='The list of token addresses to filter by.')

Expand All @@ -47,7 +49,7 @@
start_block=args.start_block,
end_block=args.end_block,
batch_size=args.batch_size,
web3=ThreadLocalProxy(lambda: Web3(IPCProvider(args.ipc_path, timeout=args.ipc_timeout))),
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(args.provider_uri))),
item_exporter=export_erc20_transfers_job_item_exporter(args.output),
max_workers=args.max_workers,
tokens=args.tokens)
Expand Down
9 changes: 5 additions & 4 deletions export_receipts_and_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
import argparse

from ethereumetl.file_utils import smart_open
from ethereumetl.ipc import IPCWrapper
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
from ethereumetl.jobs.export_receipts_job_item_exporter import export_receipts_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.web3_utils import get_batch_provider_from_uri

parser = argparse.ArgumentParser(description='Export receipts and logs.')
parser.add_argument('-b', '--batch-size', default=100, type=int, help='The number of receipts to export at a time.')
parser.add_argument('-t', '--tx-hashes', type=str, help='The file containing transaction hashes, one per line.')
parser.add_argument('--ipc-path', required=True, type=str, help='The full path to the ipc file.')
parser.add_argument('--ipc-timeout', default=60, type=int, help='The timeout in seconds for ipc calls.')
parser.add_argument('-p', '--provider-uri', required=True, type=str,
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io/')
parser.add_argument('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
parser.add_argument('--receipts-output', default=None, type=str,
help='The output file for receipts. If not provided receipts will not be exported. '
Expand All @@ -48,7 +49,7 @@
job = ExportReceiptsJob(
tx_hashes_iterable=(tx_hash.strip() for tx_hash in tx_hashes_file),
batch_size=args.batch_size,
ipc_wrapper=ThreadLocalProxy(lambda: IPCWrapper(args.ipc_path, timeout=args.ipc_timeout)),
batch_web3_provider=ThreadLocalProxy(lambda: get_batch_provider_from_uri(args.provider_uri)),
max_workers=args.max_workers,
item_exporter=export_receipts_job_item_exporter(args.receipts_output, args.logs_output),
export_receipts=args.receipts_output is not None,
Expand Down
3 changes: 2 additions & 1 deletion get_block_range_for_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

parser = argparse.ArgumentParser(description='Outputs the start block and end block for a given date.')
parser.add_argument('-p', '--provider-uri', default=None, type=str,
help='The URI of the web3 provider e.g. file://$HOME/Library/Ethereum/geth.ipc. or https://mainnet.infura.io/')
help='The URI of the web3 provider e.g. '
'file:///$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io/')
parser.add_argument('-d', '--date', required=True, type=lambda d: datetime.strptime(d, '%Y-%m-%d'),
help='The date e.g. 2018-01-01.')
parser.add_argument('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ethereumetl.utils import hex_to_dec


class MockIPCWrapper(object):
class MockBatchWeb3Provider(object):
def __init__(self, read_resource):
self.read_resource = read_resource

Expand Down
4 changes: 2 additions & 2 deletions tests/ethereumetl/job/test_export_blocks_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.export_blocks_job_item_exporter import export_blocks_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from tests.ethereumetl.job.mock_ipc_wrapper import MockIPCWrapper
from tests.ethereumetl.job.mock_batch_web3_provider import MockBatchWeb3Provider
from tests.helpers import compare_lines_ignore_order, read_file

RESOURCE_GROUP = 'test_export_blocks_job'
Expand All @@ -49,7 +49,7 @@ def test_export_blocks_job(tmpdir, start_block, end_block, batch_size, resource_

job = ExportBlocksJob(
start_block=start_block, end_block=end_block, batch_size=batch_size,
ipc_wrapper=ThreadLocalProxy(lambda: MockIPCWrapper(lambda file: read_resource(resource_group, file))),
batch_web3_provider=ThreadLocalProxy(lambda: MockBatchWeb3Provider(lambda file: read_resource(resource_group, file))),
max_workers=5,
item_exporter=export_blocks_job_item_exporter(blocks_output_file, transactions_output_file),
export_blocks=blocks_output_file is not None,
Expand Down
4 changes: 2 additions & 2 deletions tests/ethereumetl/job/test_export_receipts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
from ethereumetl.jobs.export_receipts_job_item_exporter import export_receipts_job_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from tests.ethereumetl.job.mock_ipc_wrapper import MockIPCWrapper
from tests.ethereumetl.job.mock_batch_web3_provider import MockBatchWeb3Provider
from tests.helpers import compare_lines_ignore_order, read_file

RESOURCE_GROUP = 'test_export_receipts_job'
Expand All @@ -54,7 +54,7 @@ def test_export_receipts_job(tmpdir, batch_size, tx_hashes, resource_group):
job = ExportReceiptsJob(
tx_hashes_iterable=tx_hashes,
batch_size=batch_size,
ipc_wrapper=ThreadLocalProxy(lambda: MockIPCWrapper(lambda file: read_resource(resource_group, file))),
batch_web3_provider=ThreadLocalProxy(lambda: MockBatchWeb3Provider(lambda file: read_resource(resource_group, file))),
max_workers=5,
item_exporter=export_receipts_job_item_exporter(receipts_output_file, logs_output_file),
export_receipts=receipts_output_file is not None,
Expand Down

0 comments on commit 0233207

Please sign in to comment.