Skip to content

Commit

Permalink
Merge pull request spotify#94 from spotify/lynn/add-audit-steps
Browse files Browse the repository at this point in the history
[exec] Add multithread tf & numpy version audit steps
  • Loading branch information
econchick authored Oct 14, 2020
2 parents 3372f77 + 7e54c51 commit 7e5637b
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 7 deletions.
2 changes: 2 additions & 0 deletions exec/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ def find_meta(meta):
AUDIT_PLUGIN_PATH = "klio_exec.commands.audit_steps."
AUDIT_PLUGINS = [
"tempfile=" + AUDIT_PLUGIN_PATH + "tempfile_usage:_init",
"multithreaded_tf=" + AUDIT_PLUGIN_PATH + "multithreaded_tf:_init",
"numpy_broken_blas=" + AUDIT_PLUGIN_PATH + "numpy_broken_blas:_init",
]


Expand Down
46 changes: 46 additions & 0 deletions exec/src/klio_exec/commands/audit_steps/multithreaded_tf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from klio_exec.commands.audit_steps import base


class MultithreadedTFUsage(base.BaseKlioAuditStep):
"""Use caution when running tensorflow in a multithreaded environment."""

AUDIT_STEP_NAME = "multithreaded_tf"

@property
def _is_tensorflow_loaded(self):
return any(["tensorflow" in module for module in sys.modules])

@property
def _is_job_single_threaded_per_container(self):
exps = self.klio_config.pipeline_options.experiments
return "worker_threads=1" in exps

def after_tests(self):
if not self._is_job_single_threaded_per_container:
if self._is_tensorflow_loaded:
self.emit_warning(
"TensorFlow usage detected within job, but "
"`worker_threads` is not explicitly set to 1 under "
"`pipeline_options.experiments` in the job's configuration "
"file! This can cause threading issues. Be careful."
)


_init = MultithreadedTFUsage
81 changes: 81 additions & 0 deletions exec/src/klio_exec/commands/audit_steps/numpy_broken_blas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import platform

from packaging import version

from klio_exec.commands.audit_steps import base


class NumPyBrokenBLASUsage(base.BaseKlioAuditStep):
"""Detect numpy version for potential threading issues."""

AUDIT_STEP_NAME = "numpy_broken_blas"
MINIMUM_NUMPY_VERSION = version.parse("1.16.3")

@staticmethod
def get_description():
# tmp turn off black formatting to skip long URL
# fmt: off
return (
"The 1.16.3 version of numpy links against a newer version of "
"OpenBLAS that fixes some important threading issues - notably, "
"the `dot` function that calls into OpenBLAS' _dgemv function, "
"which on older versions, is non-reentrant and can cause both "
"incorrect results and deadlocks.\n\n"
"See:\n"
"\t- https://github.com/numpy/numpy/blob/2f70544179e24b0ebc0263111f36e6450bbccf94/doc/source/release/1.16.3-notes.rst#numpy-1163-release-notes\n" # noqa: E501
"\t- https://github.com/xianyi/OpenBLAS/issues/1844\n"
"\t- https://github.com/numpy/numpy/issues/12394\n"
"\t- https://github.com/xianyi/OpenBLAS/pull/1865\n"
)
# fmt: on

@property
def _is_job_single_threaded_per_container(self):
exps = self.klio_config.pipeline_options.experiments
return "worker_threads=1" in exps

@staticmethod
def _get_current_numpy_version():
try:
import numpy

return version.parse(numpy.version.short_version)
except ImportError:
return None

def after_tests(self):
if self._is_job_single_threaded_per_container:
return

if platform.system().lower() != "linux":
return

numpy_version = self._get_current_numpy_version()
if numpy_version is None:
return

if numpy_version < NumPyBrokenBLASUsage.MINIMUM_NUMPY_VERSION:
msg = (
"Multiple threads are used, but a NumPy version older than %s "
"was detected. Older versions of NumPy are known to be "
"thread-unsafe due to a broken OpenBLAS dependency on Linux."
) % NumPyBrokenBLASUsage.MINIMUM_NUMPY_VERSION
self.emit_error(msg)


_init = NumPyBrokenBLASUsage
40 changes: 40 additions & 0 deletions exec/tests/unit/commands/audit_steps/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

from klio_exec.commands.audit_steps import base


@pytest.fixture
def klio_config(mocker):
conf = mocker.Mock()
conf.pipeline_options = mocker.Mock()
conf.pipeline_options.experiments = []
return conf


@pytest.fixture
def mock_emit_warning(mocker, monkeypatch):
mock = mocker.Mock()
monkeypatch.setattr(base.BaseKlioAuditStep, "emit_warning", mock)
return mock


@pytest.fixture
def mock_emit_error(mocker, monkeypatch):
mock = mocker.Mock()
monkeypatch.setattr(base.BaseKlioAuditStep, "emit_error", mock)
return mock
44 changes: 44 additions & 0 deletions exec/tests/unit/commands/audit_steps/test_multithreaded_tf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

from klio_exec.commands.audit_steps import multithreaded_tf


@pytest.mark.parametrize("tf_loaded", (True, False))
@pytest.mark.parametrize("worker_threads", (0, 1, 2))
def test_multithreaded_tf_usage(
tf_loaded, worker_threads, klio_config, mock_emit_warning, mocker
):
if worker_threads:
klio_config.pipeline_options.experiments = [
"worker_threads={}".format(worker_threads)
]

if tf_loaded:
mocker.patch.dict("sys.modules", {"tensorflow": ""})

mt_tf_usage = multithreaded_tf.MultithreadedTFUsage(
"job/dir", klio_config, "term_writer"
)

mt_tf_usage.after_tests()

if worker_threads != 1 and tf_loaded:
# don't care about the actual message
assert 1 == mock_emit_warning.call_count
else:
mock_emit_warning.assert_not_called()
62 changes: 62 additions & 0 deletions exec/tests/unit/commands/audit_steps/test_numpy_broken_blas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

from klio_exec.commands.audit_steps import numpy_broken_blas


@pytest.mark.parametrize(
"worker_threads,platform,has_numpy,version,exp_emit_calls",
(
(1, None, False, None, 0),
(2, "Darwin", False, None, 0),
(0, "Linux", False, None, 0),
(2, "Linux", False, None, 0),
(2, "Linux", True, "1.17.0", 0),
(2, "Linux", True, "1.16.3", 0),
(2, "Linux", True, "1.16.2", 1),
),
)
def test_numpy_broken_blas_usage(
worker_threads,
platform,
has_numpy,
version,
exp_emit_calls,
klio_config,
mock_emit_error,
mocker,
monkeypatch,
):
if worker_threads:
klio_config.pipeline_options.experiments = [
"worker_threads={}".format(worker_threads)
]

monkeypatch.setattr(numpy_broken_blas.platform, "system", lambda: platform)
if has_numpy is False:
mocker.patch.dict("sys.modules", {"numpy": None})
else:
monkeypatch.setattr("numpy.version.short_version", version)

numpy_broken_blas_usage = numpy_broken_blas.NumPyBrokenBLASUsage(
"job/dir", klio_config, "term_writer"
)

numpy_broken_blas_usage.after_tests()

# don't care about the actual message
assert exp_emit_calls == mock_emit_error.call_count
9 changes: 2 additions & 7 deletions exec/tests/unit/commands/audit_steps/test_tempfile_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@
from klio_exec.commands.audit_steps import tempfile_usage


def test_tempfile_usage(mocker, monkeypatch):
mock_emit_error = mocker.Mock()
monkeypatch.setattr(
tempfile_usage.TempFileUsage, "emit_error", mock_emit_error
)

inst = tempfile_usage.TempFileUsage("job/dir", "config", "tw")
def test_tempfile_usage(klio_config, mock_emit_error, mocker, monkeypatch):
inst = tempfile_usage.TempFileUsage("job/dir", klio_config, "tw")

assert "TemporaryFile" == tempfile.TemporaryFile.__name__ # sanity check

Expand Down

0 comments on commit 7e5637b

Please sign in to comment.