Skip to content

Commit

Permalink
Merge branch 'pro_jemalloc' into 'master'
Browse files Browse the repository at this point in the history
add jeprof + profiler hook bug fix

See merge request data/monolith!2130

GitOrigin-RevId: 3421bd1f3643fe5ff7b4fe604667dbefd5a26c8b
  • Loading branch information
黄瑞腾 authored and monolith committed Sep 21, 2023
1 parent 88095e5 commit 3d7b5f6
Show file tree
Hide file tree
Showing 4 changed files with 5,693 additions and 176 deletions.
11 changes: 7 additions & 4 deletions monolith/native_training/cpu_training.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from monolith.native_training.hooks import feature_engineering_hooks
from monolith.native_training.hooks.server import server_lib as server_hook_lib
from monolith.native_training.metric import cli
from monolith.native_training.metric.metric_hook import Tf2ProfilerHook, Tf2ProfilerCaptureMultipleHook, NVProfilerCaptureMultipleHook
from monolith.native_training.metric.metric_hook import Tf2ProfilerHook, NVProfilerHook
from monolith.native_training.metric.metric_hook import ByteCCLTelemetryHook
from monolith.native_training.metric.metric_hook import ThroughputMetricHook
from monolith.native_training.model_export import export_hooks
Expand Down Expand Up @@ -489,6 +489,7 @@ class CpuTrainingConfig:
enable_pipelined_fwda2a: bool = False
enable_pipelined_bwda2a: bool = False
profile_some_steps_from: int = None
profile_save_steps_interval: int = 5000
profile_with_nvprof_from_to: str = None
# Sync training optimization
reorder_fids_in_data_pipeline: bool = False
Expand Down Expand Up @@ -1281,14 +1282,16 @@ def get_hooks_for_metrics(model_dir: str, save_steps: int):
if self._params.metrics.enable_tf2_profiler_hook and is_chief(self.config):
start_step = self.config.profile_some_steps_from
end_step = None if start_step is None else start_step + 10
save_steps = self.config.profile_save_steps_interval
hooks.append(
Tf2ProfilerCaptureMultipleHook(
logdir=model_dir, capture_step_range=[start_step, end_step]))
Tf2ProfilerHook(
logdir=model_dir, init_step_range=[start_step, end_step], save_steps=save_steps))

if self.config.profile_with_nvprof_from_to and is_chief(self.config):
s, e = self.config.profile_with_nvprof_from_to.split(',')
save_steps = self.config.profile_save_steps_interval
hooks.append(
NVProfilerCaptureMultipleHook(capture_step_range=[int(s), int(e)]))
NVProfilerHook(init_step_range=[int(s), int(e)], save_steps=save_steps))

if self._params.metrics.enable_throughput_hook and is_chief(self.config):
hooks.append(
Expand Down
126 changes: 34 additions & 92 deletions monolith/native_training/metric/metric_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,78 +145,27 @@ class Tf2ProfilerHook(tf.estimator.SessionRunHook):

def __init__(self,
logdir: str,
init_step_range: Tuple[int, int],
save_steps: int = None,
save_secs: int = None,
options: tf.profiler.experimental.ProfilerOptions = None):
"""Only one of save_steps and save_secs should be provided."""
self._logdir = logdir
self._options = options
self._start_step, self._end_step = init_step_range
if self._start_step is not None and (self._end_step is None or self._end_step <= self._start_step):
raise ValueError("End step invalid, start_step: {}, end_step: {}".format(self._start_step, self._end_step))
self._default_delta = 10
self._delta = self._end_step - self._start_step if self._end_step is not None else self._default_delta
if save_steps is not None and save_steps <= self._delta:
raise ValueError("Save steps must be greater than delta steps(default: {})".format(self._default_delta))
self._timer = tf.estimator.SecondOrStepTimer(every_steps=save_steps,
every_secs=save_secs)
self._global_step_tensor = tf.compat.v1.train.get_global_step()
self._current_step = 0
self._trace_me = None

self._profiling = False

def begin(self):
self._start_profiling()

def before_run(self, run_context):
del run_context
# fix step-time graph, related issue: https://github.com/tensorflow/profiler/issues/282
# TODO(huangruiteng): remove this after updating tensorflow
if self._profiling:
self._trace_me = _pywrap_traceme.TraceMe("TraceContext", graph_type="train", step_num=self._current_step)
return tf.estimator.SessionRunArgs(self._global_step_tensor)

def after_run(self, run_context, run_values: tf.estimator.SessionRunValues):
del run_context
self._current_step = run_values.results
if self._profiling:
self._trace_me.Stop()
if self._timer.should_trigger_for_step(self._current_step):
self._stop_profiling()
self._timer.update_last_triggered_step(self._current_step)
self._start_profiling()

def end(self, sess):
del sess
self._stop_profiling()

def _start_profiling(self):
try:
tf.profiler.experimental.start(self._logdir, self._options)
self._profiling = True
except tf.errors.AlreadyExistsError:
# User profiles by themselves. OK to ignore here.
pass

def _stop_profiling(self):
try:
if self._profiling:
self._profiling = False
tf.profiler.experimental.stop()
except tf.errors.UnavailableError:
# Maybe user terminates profiling, ignore here.
pass


class Tf2ProfilerCaptureMultipleHook(tf.estimator.SessionRunHook):
"""Using TF2 profiler in esitmator to capture only once."""

def __init__(self,
logdir: str,
capture_step_range: Tuple[int, int],
options: tf.profiler.experimental.ProfilerOptions = None):
"""Capture the profiler between (start, end) step of capture_step_range."""
self._logdir = logdir
self._start_step, self._end_step = capture_step_range
self._options = options
self._current_step = 0

self._profiling = False
self._range_reset_cnt = 0

def begin(self):
try:
# if enable_sync_training, there is no tf.distribute.Server
Expand All @@ -225,45 +174,30 @@ def begin(self):
tf.profiler.experimental.server.start(6666)
except:
logging.warning("cannot start profiler server at 6666")
self._global_step_tensor = training_util._get_or_create_global_step_read()
if self._global_step_tensor is None:
raise RuntimeError(
"Global step should be created to use Tf2ProfilerCaptureMultipleHook.")

def before_run(self, run_context):
# fix step-time graph, related issue: https://github.com/tensorflow/profiler/issues/282
# TODO(huangruiteng): remove this after updating tensorflow
if self._profiling:
self._trace_me = _pywrap_traceme.TraceMe("TraceContext", graph_type="train", step_num=self._current_step)
return tf.estimator.SessionRunArgs(self._global_step_tensor)
return tf.estimator.SessionRunArgs(fetches=None)

def after_run(self, run_context, run_values: tf.estimator.SessionRunValues):
self._current_step = run_values.results
default_delta = 10
if self._start_step is None:
self._start_step = self._current_step + 500
self._end_step = self._start_step + default_delta
if self._end_step is None or self._end_step <= self._start_step:
self._end_step = self._start_step + default_delta

delta = self._end_step - self._start_step
if self._range_reset_cnt == 0: # for restore train, eg. multi stage
self._range_reset_cnt += 1
if self._current_step > self._start_step: # start_step as offset
self._start_step = self._current_step + self._start_step
self._end_step = self._start_step + delta
elif self._current_step >= self._end_step:
self._range_reset_cnt += 1
self._start_step = self._current_step + delta ** self._range_reset_cnt
self._end_step = self._start_step + delta

self._current_step += 1
if self._profiling:
self._trace_me.Stop()

if not self._profiling and self._current_step >= self._start_step - 1 and self._current_step < self._end_step - 1:
self._start_profiling()
if self._profiling and self._current_step >= self._end_step - 1:
if self._start_step is None:
self._start_step = self._current_step + 500
self._end_step = self._start_step + self._default_delta
if self._current_step < self._start_step:
return
if self._current_step >= self._end_step:
self._stop_profiling()
if self._timer.should_trigger_for_step(self._current_step):
self._start_profiling()
self._timer.update_last_triggered_step(self._current_step)
self._start_step = self._current_step
self._end_step = self._start_step + self._delta

def end(self, sess):
if self._profiling:
Expand All @@ -274,6 +208,10 @@ def _start_profiling(self):
tf.profiler.experimental.start(self._logdir, self._options)
self._profiling = True
except tf.errors.AlreadyExistsError:
# Two cases:
# 1. User profiles by themselves.
# 2. When profiling by save_secs, it's still profiling after save_secs.
# OK to ignore here.
self._profiling = True

def _stop_profiling(self):
Expand All @@ -282,7 +220,7 @@ def _stop_profiling(self):
self._profiling = False
tf.profiler.experimental.stop()
except tf.errors.UnavailableError:
# Maybe user terminates profiling, ignore here.
# Maybe user terminates profiling
self._profiling = False


Expand Down Expand Up @@ -331,10 +269,14 @@ def _log_telemetry(self):
logging.info(f'Communication telemetry: {samples} ...')


class NVProfilerCaptureMultipleHook(Tf2ProfilerCaptureMultipleHook):
class NVProfilerHook(Tf2ProfilerHook):

def __init__(self, capture_step_range: Tuple[int, int]):
super().__init__(None, capture_step_range)
def __init__(self,
init_step_range: Tuple[int, int],
save_steps: int = None,
save_secs: int = None,
options: tf.profiler.experimental.ProfilerOptions = None):
super().__init__(None, init_step_range, save_steps, save_secs)
import ctypes
self._libcudart = ctypes.cdll.LoadLibrary("libcudart.so") # linux

Expand Down
107 changes: 27 additions & 80 deletions monolith/native_training/metric/metric_hook_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,108 +38,55 @@ def _count_files(self):

def test_steps(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerHook(self.logdir, save_steps=10)
hook = metric_hook.Tf2ProfilerHook(self.logdir, init_step_range=[0, 10], save_steps=50)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
pass
sess.run(self.train_op)
self.assertEqual(self._count_files(), 1)

def test_multiple_steps(self):
def test_multiple_steps_1(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerHook(self.logdir, save_steps=10)
hook = metric_hook.Tf2ProfilerHook(self.logdir, init_step_range=[0, 10], save_steps=30)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for _ in range(19):
for _ in range(30):
sess.run(self.train_op)
# Since profiler directory is named by seconds, we need to make sure
# two dumps are in the different folder.
time.sleep(0.15)
# Triggered at 0, 10, 19
self.assertEqual(self._count_files(), 3)

def test_already_profiled(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerHook(self.logdir, save_steps=10)
tf.profiler.experimental.start(self.logdir)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for i in range(15):
sess.run(self.train_op)
tf.profiler.experimental.stop()

def test_secs(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerHook(self.logdir, save_secs=1)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for _ in range(10):
sess.run(self.train_op)
# In total, we will sleep for 1.5s
time.sleep(0.15)
# At least we will 2 dumps (maybe more depending on how fast we run the program)
self.assertGreaterEqual(self._count_files(), 2)


class Tf2ProfilerCaptureMultipleHookTest(tf.test.TestCase):

def setUp(self):
super().setUp()
self.logdir = os.path.join(os.environ["TEST_TMPDIR"], self._testMethodName)
self.filepattern = os.path.join(self.logdir, "plugins/profile/*")
self.graph = tf.Graph()
with self.graph.as_default():
self.global_step = tf.compat.v1.train.get_or_create_global_step()
self.train_op = tf.compat.v1.assign_add(self.global_step, 1)

def _count_files(self):
return len(tf.io.gfile.glob(self.filepattern))
'''
def test_basic(self):
# stop and save by .after_run in hook
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerCaptureMultipleHook(self.logdir,
capture_step_range=[10, 18])
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for _ in range(19):
sess.run(self.train_op)
time.sleep(0.15)
# Triggered at 0~9
self.assertEqual(self._count_files(), 1)

def test_exceeded_range(self):
# stop and save by .end in hook
def test_multiple_steps_2(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerCaptureMultipleHook(self.logdir,
capture_step_range=[10, 21])
hook = metric_hook.Tf2ProfilerHook(self.logdir, init_step_range=[0, 10], save_steps=30)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for _ in range(19):
for _ in range(31):
sess.run(self.train_op)
# Since profiler directory is named by seconds, we need to make sure
# two dumps are in the different folder.
time.sleep(0.15)
self.assertEqual(self._count_files(), 1)
# Triggered at 0~9, 30
self.assertEqual(self._count_files(), 2)

def test_already_profiled(self):
def test_secs_1(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerCaptureMultipleHook(self.logdir,
capture_step_range=[10, 11])
tf.profiler.experimental.start(self.logdir)
hook = metric_hook.Tf2ProfilerHook(self.logdir, init_step_range=[0, 10], save_secs=1)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for i in range(15):
for _ in range(10):
sess.run(self.train_op)
try:
tf.profiler.experimental.stop()
except tf.errors.UnavailableError:
pass
'''

def test_multi_profiled(self):
# In total, we will sleep for 1.5s, but it still remains profiling first step range
time.sleep(0.15)
self.assertGreaterEqual(self._count_files(), 1)

def test_secs_2(self):
with self.graph.as_default():
hook = metric_hook.Tf2ProfilerCaptureMultipleHook(self.logdir,
capture_step_range=[5, 10])
tf.profiler.experimental.start(self.logdir)
hook = metric_hook.Tf2ProfilerHook(self.logdir, init_step_range=[0, 10], save_secs=3)
with tf.compat.v1.train.SingularMonitoredSession(hooks=[hook]) as sess:
for i in range(100):
for _ in range(21):
sess.run(self.train_op)
# In total, we will sleep for 3.15s
time.sleep(0.15)
self.assertEqual(self._count_files(), 2)
try:
tf.profiler.experimental.stop()
except tf.errors.UnavailableError:
pass
# At least we will 2 dumps (maybe more depending on how fast we run the program)
self.assertGreaterEqual(self._count_files(), 2)


class FileMetricHookTest(tf.test.TestCase):
Expand Down
Loading

0 comments on commit 3d7b5f6

Please sign in to comment.