Skip to content

Commit

Permalink
Merge pull request blockchain-etl#359 from CoinStatsHQ/pr/aws-kinesis…
Browse files Browse the repository at this point in the history
…-support

Added support for AWS Kinesis
  • Loading branch information
medvedev1088 authored Oct 14, 2022
2 parents 956695b + f4403a7 commit 47308f4
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
82 changes: 82 additions & 0 deletions blockchainetl/jobs/exporters/kinesis_item_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# MIT License
#
# Copyright (c) 2022 CoinStats LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import typing as t
import uuid
from itertools import zip_longest

import boto3

_KINESIS_BATCH_LIMIT = 500


def _uuid_partition_key(_: dict) -> str:
return uuid.uuid4().hex


class KinesisItemExporter:
def __init__(
self,
stream_name: str,
partition_key_callable: t.Callable[[dict], str] = _uuid_partition_key,
):
import boto3
self._stream_name = stream_name
self._partition_key_callable = partition_key_callable
self._kinesis_client = None # initialized in .open

def open(self) -> None:
self._kinesis_client = boto3.client('kinesis')

def export_items(self, items: t.Iterable[dict]) -> None:
sentinel = object()
chunks = zip_longest(
*(iter(items),) * _KINESIS_BATCH_LIMIT,
fillvalue=sentinel,
)
for chunk in chunks:
self._kinesis_client.put_records(
StreamName=self._stream_name,
Records=[
{
'Data': _serialize_item(item),
'PartitionKey': self._partition_key_callable(item),
}
for item in chunk
if item is not sentinel
],
)

def export_item(self, item: dict) -> None:
self._kinesis_client.put_record(
StreamName=self._stream_name,
Data=_serialize_item(item),
PartitionKey=self._partition_key_callable(item),
)

def close(self):
pass


def _serialize_item(item: dict) -> bytes:
return json.dumps(item).encode()
1 change: 1 addition & 0 deletions ethereumetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'or Postgres connection url e.g. postgresql+pg8000://postgres:[email protected]:5432/ethereum; '
'or GCS bucket e.g. gs://your-bucket-name; '
'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
'or Kinesis, e.g. kinesis://your-data-stream-name'
'If not specified will print to console')
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
Expand Down
8 changes: 8 additions & 0 deletions ethereumetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def create_item_exporter(output):
batch_max_latency=2,
batch_max_messages=1000,
enable_message_ordering=enable_message_ordering)
elif item_exporter_type == ItemExporterType.KINESIS:
from blockchainetl.jobs.exporters.kinesis_item_exporter import KinesisItemExporter
item_exporter = KinesisItemExporter(
stream_name=output[len('kinesis://'):],
)
elif item_exporter_type == ItemExporterType.POSTGRES:
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
Expand Down Expand Up @@ -109,6 +114,8 @@ def get_bucket_and_path_from_gcs_output(output):
def determine_item_exporter_type(output):
if output is not None and output.startswith('projects'):
return ItemExporterType.PUBSUB
if output is not None and output.startswith('kinesis://'):
return ItemExporterType.KINESIS
if output is not None and output.startswith('kafka'):
return ItemExporterType.KAFKA
elif output is not None and output.startswith('postgresql'):
Expand All @@ -123,6 +130,7 @@ def determine_item_exporter_type(output):

class ItemExporterType:
PUBSUB = 'pubsub'
KINESIS = 'kinesis'
POSTGRES = 'postgres'
GCS = 'gcs'
CONSOLE = 'console'
Expand Down
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def read(fname):
# Later versions break the build in Travis CI for Python 3.7.2
'grpcio==1.46.3'
],
'streaming-kinesis': [
'boto3==1.24.11',
'botocore==1.27.11',
],
'dev': [
'pytest~=4.3.0'
]
Expand Down

0 comments on commit 47308f4

Please sign in to comment.