Commit

medvedev1088 committed Jun 5, 2019
1 parent ca8cd55 · commit 2ab3b7e
Showing 3 changed files with 69 additions and 9 deletions.
59 changes: 51 additions & 8 deletions ethereumetl/executors/batch_work_executor.py
@@ -19,6 +19,8 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
+import logging
+import time
 
 from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects
 from web3.utils.threads import Timeout as Web3Timeout
@@ -32,37 +34,78 @@
 RETRY_EXCEPTIONS = (ConnectionError, HTTPError, RequestsTimeout, TooManyRedirects, Web3Timeout, OSError,
                     RetriableValueError)
 
+TWO_MINUTES = 2 * 60
+
+
 # Executes the given work in batches, reducing the batch size exponentially in case of errors.
 class BatchWorkExecutor:
-    def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS):
+    def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5):
         self.batch_size = starting_batch_size
+        self.max_batch_size = starting_batch_size
+        self.latest_batch_size_change_time = None
         self.max_workers = max_workers
         # Using bounded executor prevents unlimited queue growth
         # and allows monitoring in-progress futures and failing fast in case of errors.
         self.executor = FailSafeExecutor(BoundedExecutor(1, self.max_workers))
         self.retry_exceptions = retry_exceptions
+        self.max_retries = max_retries
         self.progress_logger = ProgressLogger()
+        self.logger = logging.getLogger('BatchWorkExecutor')
 
     def execute(self, work_iterable, work_handler, total_items=None):
         self.progress_logger.start(total_items=total_items)
         for batch in dynamic_batch_iterator(work_iterable, lambda: self.batch_size):
             self.executor.submit(self._fail_safe_execute, work_handler, batch)
 
-    # Check race conditions
     def _fail_safe_execute(self, work_handler, batch):
         try:
             work_handler(batch)
+            self._try_increase_batch_size(len(batch))
         except self.retry_exceptions:
-            batch_size = self.batch_size
-            # Reduce the batch size. Subsequent batches will be 2 times smaller
-            if batch_size == len(batch) and batch_size > 1:
-                self.batch_size = int(batch_size / 2)
-            # For the failed batch try handling items one by one
+            self.logger.exception('An exception occurred while executing work_handler.')
+            self._try_decrease_batch_size(len(batch))
+            self.logger.info('The batch of size {} will be retried one item at a time.'.format(len(batch)))
             for item in batch:
-                work_handler([item])
+                execute_with_retries(work_handler, [item],
+                                     max_retries=self.max_retries, retry_exceptions=self.retry_exceptions)
 
         self.progress_logger.track(len(batch))
 
+    # Some acceptable race conditions are possible
+    def _try_decrease_batch_size(self, current_batch_size):
+        batch_size = self.batch_size
+        if batch_size == current_batch_size and batch_size > 1:
+            new_batch_size = int(current_batch_size / 2)
+            self.logger.info('Reducing batch size to {}.'.format(new_batch_size))
+            self.batch_size = new_batch_size
+            self.latest_batch_size_change_time = time.time()
+
+    def _try_increase_batch_size(self, current_batch_size):
+        if current_batch_size * 2 <= self.max_batch_size:
+            current_time = time.time()
+            latest_batch_size_change_time = self.latest_batch_size_change_time
+            seconds_since_last_change = current_time - latest_batch_size_change_time \
+                if latest_batch_size_change_time is not None else 0
+            if seconds_since_last_change > TWO_MINUTES:
+                new_batch_size = current_batch_size * 2
+                self.logger.info('Increasing batch size to {}.'.format(new_batch_size))
+                self.batch_size = new_batch_size
+                self.latest_batch_size_change_time = current_time
+
     def shutdown(self):
         self.executor.shutdown()
         self.progress_logger.finish()
+
+
+def execute_with_retries(func, *args, max_retries=5, retry_exceptions=RETRY_EXCEPTIONS, sleep_seconds=1):
+    for i in range(max_retries):
+        try:
+            return func(*args)
+        except retry_exceptions:
+            logging.exception('An exception occurred while executing execute_with_retries. Retry #{}'.format(i))
+            if i < max_retries - 1:
+                logging.info('The request will be retried after {} seconds. Retry #{}'.format(sleep_seconds, i))
+                time.sleep(sleep_seconds)
+                continue
+            else:
+                raise
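
For context, a minimal usage sketch of the executor after this change (not part of the diff; handle_batch, the export step, and the block-number range are hypothetical):

from ethereumetl.executors.batch_work_executor import BatchWorkExecutor

def handle_batch(block_number_batch):
    # Hypothetical handler: raising one of RETRY_EXCEPTIONS here makes the
    # executor halve the batch size and retry the failed batch one item at
    # a time, each item up to max_retries times.
    for block_number in block_number_batch:
        pass  # e.g. fetch and export the block

executor = BatchWorkExecutor(starting_batch_size=100, max_workers=5, max_retries=5)
executor.execute(range(0, 10000), handle_batch, total_items=10000)
executor.shutdown()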
3 changes: 3 additions & 0 deletions ethereumetl/service/graph_operations.py
@@ -142,3 +142,6 @@ def __init__(self, x, y):
 
     def __str__(self):
         return '({},{})'.format(self.x, self.y)
+
+    def __repr__(self):
+        return 'Point({},{})'.format(self.x, self.y)
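
A quick illustration (not part of the commit) of what the added __repr__ changes, assuming Point is constructed as Point(x, y):

p = Point(1, 2)
print(str(p))   # (1,2)        <- existing __str__
print(repr(p))  # Point(1,2)   <- new __repr__
print([p, p])   # [Point(1,2), Point(1,2)] - containers format elements via __repr__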
16 changes: 15 additions & 1 deletion ethereumetl/utils.py
@@ -80,12 +80,26 @@ def rpc_response_to_result(response):
             # When nodes are behind a load balancer it makes sense to retry the request in hopes it will go to other,
             # synced node
             raise RetriableValueError(error_message)
-        elif response.get('error') is not None and response.get('error').get('code') == -32603:
+        elif response.get('error') is not None and is_retriable_error(response.get('error').get('code')):
             raise RetriableValueError(error_message)
         raise ValueError(error_message)
     return result
 
 
+def is_retriable_error(error_code):
+    if error_code is None:
+        return False
+
+    if not isinstance(error_code, int):
+        return False
+
+    # https://www.jsonrpc.org/specification#error_object
+    if error_code == -32603 or (-32000 >= error_code >= -32099):
+        return True
+
+    return False
+
+
 def split_to_batches(start_incl, end_incl, batch_size):
     """start_incl and end_incl are inclusive, the returned batch ranges are also inclusive"""
     for batch_start in range(start_incl, end_incl + 1, batch_size):
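
As a sketch of the widened retry condition (these checks are illustrative, not part of the commit; codes follow the JSON-RPC spec linked above):

assert is_retriable_error(-32603)        # internal error: retried (old behavior)
assert is_retriable_error(-32000)        # server-error range -32099..-32000: now retried
assert is_retriable_error(-32099)
assert not is_retriable_error(-32601)    # e.g. method not found: fails fast
assert not is_retriable_error(None)      # missing codes are not retried
assert not is_retriable_error('-32603')  # non-integer codes are not retried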
