Skip to content

Commit

Permalink
Added exporter for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ayush3298 committed Dec 17, 2021
1 parent e0ca8f9 commit de4380f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
46 changes: 46 additions & 0 deletions blockchainetl/jobs/exporters/kafka_expoerter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import collections
import json
import logging

from kafka import KafkaProducer

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter


class KafkaItemExporter:

def __init__(self, connection_url, item_type_to_topic_mapping, converters=()):
self.connection_url = connection_url
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.producer = KafkaProducer(bootstrap_servers=connection_url)

def open(self):
pass

def export_items(self, items):
for item in items:
self.export_item(item)

def export_item(self, item):
item_type = item.get('type')
if item_type is not None and item_type in self.item_type_to_topic_mapping:
data = json.dumps(item).encode('utf-8')
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

def convert_items(self, items):
for item in items:
yield self.converter.convert_item(item)

def close(self):
pass


def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
result[item.get('type')].append(item)

return result
6 changes: 4 additions & 2 deletions ethereumetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers')
@click.option('--log-file', default=None, show_default=True, type=str, help='Log file')
@click.option('--pid-file', default=None, show_default=True, type=str, help='pid file')
@click.option('--connection-url', default=None, show_default=True, type=str, help='Connection url for file, required for kafka')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None,
connection_url=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
Expand All @@ -67,7 +69,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit

streamer_adapter = EthStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=create_item_exporter(output),
item_exporter=create_item_exporter(output, connection_url),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
Expand Down
17 changes: 16 additions & 1 deletion ethereumetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter


def create_item_exporter(output):
def create_item_exporter(output, connection_url):
item_exporter_type = determine_item_exporter_type(output)
if item_exporter_type == ItemExporterType.PUBSUB:
from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
Expand Down Expand Up @@ -56,6 +56,18 @@ def create_item_exporter(output):
ListFieldItemConverter('topics', 'topic', fill=4)])
elif item_exporter_type == ItemExporterType.CONSOLE:
item_exporter = ConsoleItemExporter()
elif item_exporter_type == ItemExporterType.KAFKA:
from blockchainetl.jobs.exporters.kafka_expoerter import KafkaItemExporter
item_exporter = KafkaItemExporter(connection_url, item_type_to_topic_mapping={
'block': 'blocks',
'transaction': 'transactions',
'log': 'logs',
'token_transfer': 'token_transfers',
'trace': 'traces',
'contract': 'contracts',
'token': 'tokens',
})

else:
raise ValueError('Unable to determine item exporter type for output ' + output)

Expand All @@ -65,6 +77,8 @@ def create_item_exporter(output):
def determine_item_exporter_type(output):
if output is not None and output.startswith('projects'):
return ItemExporterType.PUBSUB
if output is not None and output.startswith('kafka'):
return ItemExporterType.KAFKA
elif output is not None and output.startswith('postgresql'):
return ItemExporterType.POSTGRES
elif output is None or output == 'console':
Expand All @@ -77,4 +91,5 @@ class ItemExporterType:
PUBSUB = 'pubsub'
POSTGRES = 'postgres'
CONSOLE = 'console'
KAFKA = 'kafka'
UNKNOWN = 'unknown'
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def read(fname):
'ethereum-dasm==0.1.4',
'base58',
'requests',
"kafka-python==2.0.2"
],
extras_require={
'streaming': [
Expand Down

0 comments on commit de4380f

Please sign in to comment.