Skip to content

Commit

Permalink
Add pid file to streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
medvedev1088 committed Apr 5, 2019
1 parent 7214d77 commit c9fa2a1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 33 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.*
last_synced_block.txt
pid.txt
output
7 changes: 6 additions & 1 deletion Dockerfile_with_streaming
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ WORKDIR /$PROJECT_DIR
COPY . .
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]

ENTRYPOINT ["python", "ethereumetl"]
# Add Tini
ENV TINI_VERSION v0.18.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini

ENTRYPOINT ["/tini", "--", "python", "ethereumetl"]
9 changes: 9 additions & 0 deletions blockchainetl/logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import logging


def logging_basic_config(filename=None):
format = '%(asctime)s - %(name)s [%(levelname)s] - %(message)s'
if filename is not None:
logging.basicConfig(level=logging.INFO, format=format, filename=filename)
else:
logging.basicConfig(level=logging.INFO, format=format)
64 changes: 42 additions & 22 deletions blockchainetl/streaming/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def __init__(
end_block=None,
period_seconds=10,
block_batch_size=10,
retry_errors=True):
retry_errors=True,
pid_file=None):
self.blockchain_streamer_adapter = blockchain_streamer_adapter
self.last_synced_block_file = last_synced_block_file
self.lag = lag
Expand All @@ -48,50 +49,58 @@ def __init__(
self.period_seconds = period_seconds
self.block_batch_size = block_batch_size
self.retry_errors = retry_errors
self.pid_file = pid_file

def stream(self):
if self.start_block is not None or not os.path.isfile(self.last_synced_block_file):
init_last_synced_block_file((self.start_block or 0) - 1, self.last_synced_block_file)

last_synced_block = read_last_synced_block(self.last_synced_block_file)

self.blockchain_streamer_adapter.open()
self.last_synced_block = read_last_synced_block(self.last_synced_block_file)

while True and (self.end_block is None or last_synced_block < self.end_block):
new_last_synced_block = last_synced_block
def stream(self):
try:
if self.pid_file is not None:
logging.info('Creating pid file {}'.format(self.pid_file))
write_to_file(self.pid_file, str(os.getpid()))
self.blockchain_streamer_adapter.open()
self._do_stream()
finally:
self.blockchain_streamer_adapter.close()
if self.pid_file is not None:
logging.info('Deleting pid file {}'.format(self.pid_file))
delete_file(self.pid_file)

def _do_stream(self):
while True and (self.end_block is None or self.last_synced_block < self.end_block):
synced_blocks = 0

try:
new_last_synced_block = self._sync_cycle(last_synced_block)
synced_blocks = self._sync_cycle()
except Exception as e:
# https://stackoverflow.com/a/4992124/1580227
logging.exception('An exception occurred while fetching block data.')
if not self.retry_errors:
raise e

synced_blocks = new_last_synced_block - last_synced_block
last_synced_block = new_last_synced_block
if synced_blocks <= 0:
logging.info('Nothing to sync or exception. Sleeping for {} seconds...'.format(self.period_seconds))
logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))
time.sleep(self.period_seconds)

self.blockchain_streamer_adapter.close()

def _sync_cycle(self, last_synced_block):
def _sync_cycle(self):
current_block = self.blockchain_streamer_adapter.get_current_block_number()

target_block = self._calculate_target_block(current_block, last_synced_block)
blocks_to_sync = max(target_block - last_synced_block, 0)
target_block = self._calculate_target_block(current_block, self.last_synced_block)
blocks_to_sync = max(target_block - self.last_synced_block, 0)

logging.info('Current block {}, target block {}, last synced block {}, blocks to sync {}'.format(
current_block, target_block, last_synced_block, blocks_to_sync))
current_block, target_block, self.last_synced_block, blocks_to_sync))

if blocks_to_sync != 0:
self.blockchain_streamer_adapter.export_all(last_synced_block + 1, target_block)
self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)
logging.info('Writing last synced block {}'.format(target_block))
write_last_synced_block(self.last_synced_block_file, target_block)
last_synced_block = target_block
self.last_synced_block = target_block

return last_synced_block
return blocks_to_sync

def _calculate_target_block(self, current_block, last_synced_block):
target_block = current_block - self.lag
Expand All @@ -100,9 +109,15 @@ def _calculate_target_block(self, current_block, last_synced_block):
return target_block


def delete_file(file):
try:
os.remove(file)
except OSError:
pass


def write_last_synced_block(file, last_synced_block):
with smart_open(file, 'w') as last_synced_block_file:
return last_synced_block_file.write(str(last_synced_block) + '\n')
write_to_file(file, str(last_synced_block) + '\n')


def init_last_synced_block_file(start_block, last_synced_block_file):
Expand All @@ -117,3 +132,8 @@ def init_last_synced_block_file(start_block, last_synced_block_file):
def read_last_synced_block(file):
with smart_open(file, 'r') as last_synced_block_file:
return int(last_synced_block_file.read())


def write_to_file(file, content):
with smart_open(file, 'w') as file_handle:
file_handle.write(content)
19 changes: 19 additions & 0 deletions blockchainetl/streaming/streaming_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import logging
import signal
import sys

from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.logging_utils import logging_basic_config


def get_item_exporter(output):
Expand All @@ -17,3 +22,17 @@ def get_item_exporter(output):
item_exporter = ConsoleItemExporter()

return item_exporter


def configure_signals():
def sigterm_handler(_signo, _stack_frame):
# Raises SystemExit(0):
sys.exit(0)

signal.signal(signal.SIGTERM, sigterm_handler)


def configure_logging(filename):
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging_basic_config(filename=filename)
15 changes: 5 additions & 10 deletions ethereumetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging

import click
from blockchainetl.streaming.streaming_utils import configure_signals, configure_logging
from ethereumetl.enumeration.entity_type import EntityType

from ethereumetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy

Expand All @@ -47,10 +45,12 @@
@click.option('-B', '--block-batch-size', default=1, type=int, help='How many blocks to batch in single sync round')
@click.option('-w', '--max-workers', default=5, type=int, help='The number of workers')
@click.option('--log-file', default=None, type=str, help='Log file')
@click.option('--pid-file', default=None, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None):
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
entity_types = parse_entity_types(entity_types)

from blockchainetl.streaming.streaming_utils import get_item_exporter
Expand All @@ -71,16 +71,11 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
start_block=start_block,
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file
)
streamer.stream()


def configure_logging(filename):
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging_basic_config(filename=filename)


def parse_entity_types(entity_types):
entity_types = [c.strip() for c in entity_types.split(',')]

Expand Down

0 comments on commit c9fa2a1

Please sign in to comment.