Skip to content

Commit

Permalink
Merge pull request blockchain-etl#302 from blockjoe/develop
Browse files Browse the repository at this point in the history
Fallback to `web3.eth.getLogs` when calling to nodes without `eth_newFilter`
  • Loading branch information
medvedev1088 authored Oct 14, 2022
2 parents b1acfa3 + db59018 commit f7f1925
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
20 changes: 16 additions & 4 deletions ethereumetl/jobs/export_origin_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
self.receipt_log_mapper = EthReceiptLogMapper()
self.marketplace_listing_mapper = OriginMarketplaceListingMapper()
self.shop_listing_mapper = OriginShopProductMapper()
self._supports_eth_newFilter = True


def _start(self):
Expand Down Expand Up @@ -100,8 +101,18 @@ def _export_batch(self, block_number_batch):
'fromBlock': batch['from_block'],
'toBlock': batch['to_block']
}
event_filter = self.web3.eth.filter(filter_params)
events = event_filter.get_all_entries()
if self._supports_eth_newFilter:
try:
event_filter = self.web3.eth.filter(filter_params)
events = event_filter.get_all_entries()
except ValueError as e:
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
self._supports_eth_newFilter = False
events = self.web3.eth.getLogs(filter_params)
else:
raise(e)
else:
events = self.web3.eth.getLogs(filter_params)
for event in events:
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
listing, shop_products = self.event_extractor.extract_event_from_log(log, batch['contract_version'])
Expand All @@ -112,9 +123,10 @@ def _export_batch(self, block_number_batch):
item = self.shop_listing_mapper.product_to_dict(product)
self.shop_product_exporter.export_item(item)

self.web3.eth.uninstallFilter(event_filter.filter_id)
if self._supports_eth_newFilter:
self.web3.eth.uninstallFilter(event_filter.filter_id)

def _end(self):
self.batch_work_executor.shutdown()
self.marketplace_listing_exporter.close()
self.shop_product_exporter.close()
self.shop_product_exporter.close()
15 changes: 12 additions & 3 deletions ethereumetl/jobs/export_token_transfers_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
self.receipt_log_mapper = EthReceiptLogMapper()
self.token_transfer_mapper = EthTokenTransferMapper()
self.token_transfer_extractor = EthTokenTransferExtractor()
self._supports_eth_newFilter = True

def _start(self):
self.item_exporter.open()
Expand All @@ -74,15 +75,23 @@ def _export_batch(self, block_number_batch):
if self.tokens is not None and len(self.tokens) > 0:
filter_params['address'] = self.tokens

event_filter = self.web3.eth.filter(filter_params)
events = event_filter.get_all_entries()
try:
event_filter = self.web3.eth.filter(filter_params)
events = event_filter.get_all_entries()
except ValueError as e:
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
self._supports_eth_newFilter = False
events = self.web3.eth.getLogs(filter_params)
else:
raise(e)
for event in events:
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
token_transfer = self.token_transfer_extractor.extract_transfer_from_log(log)
if token_transfer is not None:
self.item_exporter.export_item(self.token_transfer_mapper.token_transfer_to_dict(token_transfer))

self.web3.eth.uninstallFilter(event_filter.filter_id)
if self._supports_eth_newFilter:
self.web3.eth.uninstallFilter(event_filter.filter_id)

def _end(self):
self.batch_work_executor.shutdown()
Expand Down

0 comments on commit f7f1925

Please sign in to comment.