Skip to content

Commit

Permalink
[lib] Add thread limiting to @handle_klio
Browse files Browse the repository at this point in the history
# Conflicts:
#	docs/src/reference/lib/changelog.rst
  • Loading branch information
econchick committed Jan 11, 2021
1 parent 38c6050 commit f117962
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 48 deletions.
2 changes: 1 addition & 1 deletion docs/src/reference/lib/api/transforms/decorators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Decorators

.. currentmodule:: klio.transforms.decorators

.. autodecorator:: handle_klio()
.. autodecorator:: handle_klio(max_thread_count=None, thread_limiter=None)
.. autodecorator:: timeout(seconds, exception=None, exception_message=None)
.. autodecorator:: retry(tries=-1, delay=0, exception=None, raise_exception=None, exception_message=None)
.. autodecorator:: set_klio_context()
Expand Down
1 change: 1 addition & 0 deletions docs/src/reference/lib/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Added
*****

* Add thread limiting context manager utility (See `KEP 2: Thread Management <https://docs.klio.io/en/latest/keps/kep-002.html>`_).
* Add default thread management to ``@handle_klio`` decorator (See `KEP 2: Thread Management <https://docs.klio.io/en/latest/keps/kep-002.html>`_).


0.2.3 (2021-01-04)
Expand Down
3 changes: 3 additions & 0 deletions integration/audio-spectrograms/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
"apache_beam.internal.gcp.auth",
"oauth2client.transport",
"oauth2client.client",
# The concurrency logs may be different for every machine, so let's
# just turn them off
"klio.concurrency",
)
for logger in loggers_to_mute:
logging.getLogger(logger).setLevel(logging.ERROR)
Expand Down
3 changes: 3 additions & 0 deletions integration/batch-modular-default/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"apache_beam.internal.gcp.auth",
"oauth2client.transport",
"oauth2client.client",
# The concurrency logs may be different for every machine, so let's
# just turn them off
"klio.concurrency",
)
for logger in loggers_to_mute:
logging.getLogger(logger).setLevel(logging.ERROR)
Expand Down
3 changes: 3 additions & 0 deletions integration/multi-event-input-batch/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
"apache_beam.internal.gcp.auth",
"oauth2client.transport",
"oauth2client.client",
# The concurrency logs may be different for every machine, so let's
# just turn them off
"klio.concurrency",
)
for logger in loggers_to_mute:
logging.getLogger(logger).setLevel(logging.ERROR)
Expand Down
3 changes: 3 additions & 0 deletions integration/read-file/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
"apache_beam.internal.gcp.auth",
"oauth2client.transport",
"oauth2client.client",
# The concurrency logs may be different for every machine, so let's
# just turn them off
"klio.concurrency",
)
for logger in loggers_to_mute:
logging.getLogger(logger).setLevel(logging.ERROR)
Expand Down
227 changes: 183 additions & 44 deletions lib/src/klio/transforms/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import apache_beam as beam
from apache_beam import pvalue

from klio import utils as kutils
from klio.message import serializer
from klio.transforms import _retry as kretry
from klio.transforms import _timeout as ktimeout
from klio.transforms import _utils
from klio.transforms import _utils as txf_utils
from klio.transforms import core


Expand Down Expand Up @@ -109,6 +110,56 @@ def __get_user_error_message(err, func_path, kmsg):
return msg, exc_info


def __get_thread_limiter(max_thread_count, thread_limiter, func_name=None):
if max_thread_count is not None and thread_limiter is not None:
# raise a runtime error so it actually crashes klio/beam rather than
# just continue processing elements
raise RuntimeError(
"`max_thread_count` and `thread_limiter` are mutually exclusive "
"arguments."
)

if thread_limiter is not None:
if not isinstance(thread_limiter, kutils.ThreadLimiter):
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"'thread_limiter' must be an instance of `klio.utils."
"ThreadLimiter`."
)

if max_thread_count is not None:
is_int_enum = isinstance(max_thread_count, (int, kutils.ThreadLimit))
is_func = callable(max_thread_count)
if not any([is_int_enum, is_func]):
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"Invalid type for handle_klio's argument 'max_thread_count'. "
"Expected an `int`, a callable returning an `int`, or "
"`klio.utils.ThreadLimit`, got `%s`."
% type(max_thread_count).__name__
)

if isinstance(max_thread_count, int) and max_thread_count <= 0:
# raise a runtime error so it actually crashes klio/beam rather
# than just continue processing elements
raise RuntimeError(
"'max_thread_count' must be greater than 0. Set "
"'max_thread_count' to `None` or `klio.utils.ThreadLimiter."
"NONE` to turn off thread limitations."
)
if max_thread_count is None and thread_limiter is None:
max_thread_count = kutils.ThreadLimit.DEFAULT

if thread_limiter is None:
thread_limiter = kutils.ThreadLimiter(
max_thread_count=max_thread_count, name=func_name
)

return thread_limiter


# A separate function from __serialize_klio_message_generator so we can
# specifically `yield from` it (and exhaust transforms that have multiple
# yields)
Expand Down Expand Up @@ -329,42 +380,60 @@ def func_wrapper(*args, **kwargs):
return func_wrapper


def _handle_klio(func_or_meth):
@functools.wraps(func_or_meth)
def method_wrapper(self, *args, **kwargs):
with _klio_context() as ctx:
setattr(self, "_klio", ctx)

# SO. HACKY. We check to see if this method is named "expand"
# to designate if the class is a Composite-type transform
# (rather than a DoFn with a "process" method).
# A Composite transform handles a pcoll / pipeline,
# not the individual elements, and therefore doesn't need
# to be given a KlioMessage. It should only need the KlioContext
# attached.
if func_or_meth.__name__ == "expand":
return func_or_meth(self, *args, **kwargs)
def _handle_klio(*args, max_thread_count=None, thread_limiter=None, **kwargs):
def inner(func_or_meth):
func_name = getattr(
func_or_meth, "__qualname__", func_or_meth.__name__
)
thd_limiter = __get_thread_limiter(
max_thread_count, thread_limiter, func_name
)

wrapper = __serialize_klio_message
# Only the process method of a DoFn is a generator - otherwise
# beam can't pickle a generator
if __is_dofn_process_method(self, func_or_meth):
wrapper = __serialize_klio_message_generator
@functools.wraps(func_or_meth)
def method_wrapper(self, *args, **kwargs):
with thd_limiter:
with _klio_context() as ctx:
setattr(self, "_klio", ctx)

# SO. HACKY. We check to see if this method is named "expand"
# to designate if the class is a Composite-type transform
# (rather than a DoFn with a "process" method).
# A Composite transform handles a pcoll / pipeline,
# not the individual elements, and therefore doesn't need
# to be given a KlioMessage. It should only need the KlioContext
# attached.
if func_or_meth.__name__ == "expand":
return func_or_meth(self, *args, **kwargs)

wrapper = __serialize_klio_message
# Only the process method of a DoFn is a generator - otherwise
# beam can't pickle a generator
if __is_dofn_process_method(self, func_or_meth):
wrapper = __serialize_klio_message_generator

incoming_item = args[0]
args = args[1:]
return wrapper(
self, func_or_meth, incoming_item, *args, **kwargs
)

incoming_item = args[0]
args = args[1:]
return wrapper(self, func_or_meth, incoming_item, *args, **kwargs)
@functools.wraps(func_or_meth)
def func_wrapper(incoming_item, *args, **kwargs):
with thd_limiter:
with _klio_context() as ctx:
return __serialize_klio_message(
ctx, func_or_meth, incoming_item, *args, **kwargs
)

@functools.wraps(func_or_meth)
def func_wrapper(incoming_item, *args, **kwargs):
with _klio_context() as ctx:
return __serialize_klio_message(
ctx, func_or_meth, incoming_item, *args, **kwargs
)
if __is_method(func_or_meth):
return method_wrapper
return func_wrapper

if __is_method(func_or_meth):
return method_wrapper
return func_wrapper
# allows @handle_klio to be used without parens (i.e. no need to do
# `@handle_klio()`) when there are no args/kwargs provided
if args and callable(args[0]):
return inner(args[0])
return inner


def _timeout(seconds=None, exception=None, exception_message=None):
Expand Down Expand Up @@ -489,7 +558,7 @@ def func_wrapper(ctx, kmsg, *args, **kwargs):

# Allow internals to call semiprivate funcs without triggering
# user-facing warnings
@_utils.experimental()
@txf_utils.experimental()
def serialize_klio_message(*args, **kwargs):
"""Serialize/deserialize incoming PCollections as a KlioMessage.
Expand All @@ -502,7 +571,7 @@ def serialize_klio_message(*args, **kwargs):
return _serialize_klio_message(*args, **kwargs)


@_utils.experimental()
@txf_utils.experimental()
def set_klio_context(*args, **kwargs):
"""Set :class:`KlioContext <klio.transforms.core.KlioContext>` to the
class instance.
Expand All @@ -523,7 +592,7 @@ def expand(self, element):
return _set_klio_context(*args, **kwargs)


@_utils.experimental()
@txf_utils.experimental()
def inject_klio_context(*args, **kwargs):
"""Provide :class:`KlioContext <klio.transforms.core.KlioContext>` as the
first argument to a decorated method/func.
Expand All @@ -549,13 +618,26 @@ def process(self, ctx, element):
return _inject_klio_context(*args, **kwargs)


@_utils.experimental()
def handle_klio(*args, **kwargs):
# TODO: Update docstrings w/ new kwargs & examples
@txf_utils.experimental()
def handle_klio(*args, max_thread_count=None, thread_limiter=None, **kwargs):
"""Serialize & deserialize incoming PCollections as a KlioMessage.
Behind the scenes, this generates :class:`KlioContext
<klio.transforms.core.KlioContext>` as well as handles de/serialize the
incoming PCollection as a Klio Message.
<klio.transforms.core.KlioContext>`, handles de/serialize the
incoming PCollection as a Klio Message, as well as manage thread
concurrency.
.. admonition:: Default Thread Concurrency Management
:class: caution
The ``@handle_klio`` decorator will default to limiting the amount
of active threads a decorated transform can use. The default
maximum number of active threads is the number of CPUs on the
worker machine.
See examples below on how to adjust this behavior.
If decorating a class method, the ``KlioContext`` will be attached
to the ``self`` argument of the class instance.
Expand All @@ -581,11 +663,68 @@ class MyComposite(beam.PTransform):
def expand(self, pcoll):
kms_config = self._klio.config.job_config.kms_config
return pcoll | MyKMSTransform(**kms_config)
To adjust the maximum threads a decorated transform uses:
.. code-block:: python
from klio import utils as klio_utils
# Set the limit to 4 threads
@handle_klio(max_thread_count=4):
def my_map_func(ctx, item):
...
# Set the limit to 2x CPU count
import multiprocessing
@handle_klio(max_thread_count=lambda: 2 * multiprocessing.cpu_count()):
def my_map_func(ctx, item):
...
# Turn off any thread limits
@handle_klio(max_thread_count=klio_utils.ThreadLimit.NONE):
def my_map_func(ctx, item):
...
# Explicitly set the limit to Klio's default
@handle_klio(max_thread_count=klio_utils.ThreadLimit.DEFAULT):
def my_map_func(ctx, item):
...
# Share thread limits between multiple transforms
global_thread_limiter = klio_utils.ThreadLimiter(max_thread_count=4)
@handle_klio(thread_limiter=global_thread_limiter)
def first_map_func(ctx, item):
...
@handle_klio(thread_limiter=global_thread_limiter)
def second_map_func(ctx, item):
...
Args:
max_thread_count (int, callable, klio.utils.ThreadLimit): number of
threads to make available to the decorated function, or a
:func:`callable` that returns an ``int``. Set to
:attr:`klio.utils.ThreadLimit.NONE` for no thread limits.
Defaults to :attr:`klio.utils.ThreadLimit.DEFAULT` (worker CPU
count) if ``thread_limiter`` is not provided.
**Mutually exclusive** with ``thread_limiter`` argument.
thread_limiter (klio.utils.ThreadLimiter): the ``ThreadLimiter``
instance that the decorator should use instead of creating its own.
Defaults to ``None``. **Mutually exclusive** with
``max_thread_count``.
"""
return _handle_klio(*args, **kwargs)
return _handle_klio(
*args,
max_thread_count=max_thread_count,
thread_limiter=thread_limiter,
**kwargs,
)


@_utils.experimental()
@txf_utils.experimental()
def timeout(seconds, *args, exception=None, exception_message=None, **kwargs):
"""Run the decorated method/function with a timeout in a separate process.
Expand Down Expand Up @@ -641,7 +780,7 @@ def my_nonklio_map_func(item):
)


@_utils.experimental()
@txf_utils.experimental()
def retry(
*args,
tries=None,
Expand Down Expand Up @@ -730,7 +869,7 @@ def my_nonklio_map_func(item):
ACTIVE_PROFILER = None


@_utils.experimental()
@txf_utils.experimental()
def profile(func_or_meth):
"""Decorator to mark a function/method for profiling. This is used in
conjunction with the ``klio job profile`` commands to selectively profile
Expand Down
4 changes: 2 additions & 2 deletions lib/src/klio/utils/_thread_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ def __init__(self, max_thread_count=ThreadLimit.DEFAULT, name=None):
if max_thread_count is ThreadLimit.NONE:
self._dummy = True
self._semaphore = _DummySemaphore()
self.logger.info(f"{self} Using unlimited semaphore")
self.logger.debug(f"{self} Using unlimited semaphore")

else:
self._dummy = False
if callable(max_thread_count):
max_thread_count = max_thread_count()
self._semaphore = threading.BoundedSemaphore(max_thread_count)
self.logger.info(
self.logger.debug(
f"{self} Initial semaphore value: {self._semaphore._value}"
)

Expand Down
Loading

0 comments on commit f117962

Please sign in to comment.