Skip to content

Commit

Permalink
Merge pull request blockchain-etl#160 from blockchain-etl/feature/str…
Browse files Browse the repository at this point in the history
…eaming

Feature/streaming
  • Loading branch information
medvedev1088 authored Apr 15, 2019
2 parents 66971c8 + 5c941a4 commit e695c55
Show file tree
Hide file tree
Showing 153 changed files with 2,434 additions and 170 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.*
last_synced_block.txt
output
pid.txt
output
15 changes: 15 additions & 0 deletions Dockerfile_with_streaming
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM python:3.6
MAINTAINER Evgeny Medvedev <[email protected]>
ENV PROJECT_DIR=ethereum-etl

RUN mkdir /$PROJECT_DIR
WORKDIR /$PROJECT_DIR
COPY . .
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]

# Add Tini
ENV TINI_VERSION v0.18.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini

ENTRYPOINT ["/tini", "--", "python", "ethereumetl"]
49 changes: 48 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ Export traces ([Schema](#tracescsv), [Reference](#export_traces)):
--provider-uri file://$HOME/Library/Ethereum/parity.ipc --output traces.csv
```

Stream blocks, transactions, logs, token_transfers continually to console:

```bash
> pip3 install ethereum-etl[streaming]
> ethereumetl stream --start-block 500000 -e block,transaction,log,token_transfer --log-file log.txt
```

Stream blockchain data continually to Google Pub/Sub:

```bash
> export GOOGLE_APPLICATION_CREDENTIALS=/path_to_credentials_file.json
> ethereumetl stream --start-block 500000 --output projects/<your-project>/topics/crypto_ethereum
```

For the latest version, check out the repo and call
```bash
> pip3 install -e .
Expand Down Expand Up @@ -165,6 +179,7 @@ bytecode | hex_string |
function_sighashes | string |
is_erc20 | boolean |
is_erc721 | boolean |
block_number | bigint |

### tokens.csv

Expand Down Expand Up @@ -196,6 +211,7 @@ gas_used | bigint |
subtraces | bigint |
trace_address | string |
error | string |
status | bigint |

You can find column descriptions in [https://github.com/medvedev1088/ethereum-etl-airflow](https://github.com/medvedev1088/ethereum-etl-airflow/tree/master/dags/resources/stages/raw/schemas)

Expand Down Expand Up @@ -289,6 +305,15 @@ Read this article for details https://medium.com/@medvedev1088/how-to-export-the
> docker run -v $HOME/output:/ethereum-etl/output ethereum-etl:latest export_all -s 2018-01-01 -e 2018-01-01 -p https://mainnet.infura.io
```

1. Run streaming to console or Pub/Sub
```bash
> docker build -t ethereum-etl:latest-streaming -f Dockerfile_with_streaming .
> echo "Stream to console"
> docker run ethereum-etl:latest-streaming stream --start-block 500000 --log-file log.txt
> echo "Stream to Pub/Sub"
> docker run -v /path_to_credentials_file/:/ethereum-etl/ --env GOOGLE_APPLICATION_CREDENTIALS=/ethereum-etl/credentials_file.json ethereum-etl:latest-streaming stream --start-block 500000 --output projects/<your-project>/topics/crypto_ethereum
```

### Command Reference

- [export_blocks_and_transactions](#export_blocks_and_transactions)
Expand All @@ -302,6 +327,7 @@ Read this article for details https://medium.com/@medvedev1088/how-to-export-the
- [extract_geth_traces](#extract_geth_traces)
- [get_block_range_for_date](#get_block_range_for_date)
- [get_keccak_hash](#get_keccak_hash)
- [stream](#stream)

All the commands accept `-h` parameter for help, e.g.:

Expand Down Expand Up @@ -486,10 +512,31 @@ You can tune `--batch-size`, `--max-workers` for performance.
0xa9059cbb2ab09eb219583f4a59a5d0623ade346d962bcd4e46b11da047c9049b
```

#### stream

```bash
> pip3 install ethereum-etl[streaming]
> ethereumetl stream --provider-uri https://mainnet.infura.io --start-block 500000
```

- This command outputs blocks, transactions, logs, token_transfers to the console by default.
- Entity types can be specified with the `-e` option, e.g. `-e block,transaction,log,token_transfer,trace,contract,token`
- Use `--output` option to specify the Google Pub/Sub topic where to publish blockchain data,
e.g. `projects/<your-project>/topics/bitcoin_blockchain`. Data will be pushed to
`projects/<your-project>/topics/bitcoin_blockchain.blocks`, `projects/<your-project>/topics/bitcoin_blockchain.transactions`
etc. topics.
- The command saves its state to `last_synced_block.txt` file where the last synced block number is saved periodically.
- Specify either `--start-block` or `--last-synced-block-file` option. `--last-synced-block-file` should point to the
file where the block number, from which to start streaming the blockchain data, is saved.
- Use the `--lag` option to specify how many blocks to lag behind the head of the blockchain. It's the simplest way to
handle chain reorganizations - they are less likely the further a block from the head.
- Use the `--chain` option to specify the type of the chain, e.g. `bitcoin`, `litecoin`, `dash`, `zcash`, etc.
- You can tune `--period-seconds`, `--batch-size`, `--max-workers` for performance.

### Running Tests

```bash
> pip3 install -e .[dev]
> pip3 install -e .[dev,streaming]
> export ETHEREUM_ETL_RUN_SLOW_TESTS=True
> pytest -vv
```
Expand Down
Empty file added blockchainetl/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions blockchainetl/atomic_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import itertools


# https://stackoverflow.com/a/27062830/1580227
class AtomicCounter:
def __init__(self):
self._counter = itertools.count()
# init to 0
next(self._counter)

def increment(self, increment=1):
assert increment > 0
return [next(self._counter) for _ in range(0, increment)][-1]
42 changes: 42 additions & 0 deletions blockchainetl/csv_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


# https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072

import sys
import csv


def set_max_field_size_limit():
max_int = sys.maxsize
decrement = True
while decrement:
# decrease the maxInt value by factor 10
# as long as the OverflowError occurs.

decrement = False
try:
csv.field_size_limit(max_int)
except OverflowError:
max_int = int(max_int / 10)
decrement = True
Loading

0 comments on commit e695c55

Please sign in to comment.