Skip to content

Commit

Permalink
allow to prefix kafka topics directly in the url
Browse files Browse the repository at this point in the history
  • Loading branch information
FeSens committed Jul 12, 2022
1 parent e94f8cd commit 091457c
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions blockchainetl/jobs/exporters/kafka_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()):
self.item_type_to_topic_mapping = item_type_to_topic_mapping
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)
print(self.connection_url)
self.topic_prefix = self.get_topic_prefix(output)
print(self.connection_url, self.topic_prefix)
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)

def get_connection_url(self, output):
try:
return output.split('/')[1]
except KeyError:
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/<topic-prefix>"')

def get_topic_prefix(self, output):
try:
return output.split('/')[2] + "."
except KeyError:
return ''

def open(self):
pass
Expand All @@ -34,7 +41,7 @@ def export_item(self, item):
if item_type is not None and item_type in self.item_type_to_topic_mapping:
data = json.dumps(item).encode('utf-8')
print(data)
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data)
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

Expand Down

0 comments on commit 091457c

Please sign in to comment.