Skip to content

Commit

Permalink
Add throughput timer configuration (microsoft#5363)
Browse files Browse the repository at this point in the history
The new "timers" section describes configuration for different timers.

Specifically, in the "throughput" section, it is possible to disable the
throughput timer (enabled by default). This allows to avoid the
performance degradation whenever the throughput measurement is not
needed, for example in production environment.

No device synchronize() is invoked when "synchronized" is set to False
(default is True). This allows to produce approximate throughput
measurements with minimal performance penalty.

---------

Co-authored-by: Logan Adams <[email protected]>
Co-authored-by: Olatunji Ruwase <[email protected]>
  • Loading branch information
3 people authored May 22, 2024
1 parent f4efef2 commit 995ba11
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 17 deletions.
4 changes: 4 additions & 0 deletions deepspeed/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
from .data_pipeline.config import get_data_efficiency_enabled, get_data_efficiency_config, get_curriculum_enabled_legacy, get_curriculum_params_legacy
from .data_pipeline.constants import *

from ..utils.config import get_timers_config

TENSOR_CORE_ALIGN_SIZE = 8

ADAGRAD_OPTIMIZER = 'adagrad'
Expand Down Expand Up @@ -911,6 +913,8 @@ def _initialize_params(self, param_dict):

self.compile_config = get_compile_config(param_dict)

self.timers_config = get_timers_config(param_dict)

def _batch_assertion(self):

train_batch = self.train_batch_size
Expand Down
9 changes: 4 additions & 5 deletions deepspeed/runtime/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,10 @@ def __init__(self,
# Configure wall clock timers
self.timers = SynchronizedWallClockTimer()
# Throughput timer
self.tput_timer = ThroughputTimer(
batch_size=self.train_batch_size(),
steps_per_output=self.steps_per_print(),
monitor_memory=False,
)
self.tput_timer = ThroughputTimer(self._config.timers_config,
batch_size=self.train_batch_size(),
steps_per_output=self.steps_per_print(),
monitor_memory=False)

log_dist(f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}", ranks=[0])

Expand Down
3 changes: 2 additions & 1 deletion deepspeed/runtime/pipe/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs):

self._force_grad_boundary = False

self.batch_timer = ThroughputTimer(batch_size=self.train_batch_size(),
self.batch_timer = ThroughputTimer(self._config.timers_config,
batch_size=self.train_batch_size(),
logging_fn=self.tput_log,
monitor_memory=False,
steps_per_output=self.steps_per_print())
Expand Down
46 changes: 46 additions & 0 deletions deepspeed/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

from deepspeed.runtime.config_utils import DeepSpeedConfigModel

#########################################
# Timers
#########################################
# Timers. By default, timers are enabled.
# Users can configure in ds_config.json as below example:
TIMERS_FORMAT = '''
Timers should be enabled as:
"timers": {
"throughput": {
"enabled": true,
"synchronized": true
}
}
'''

TIMERS = "timers"
TIMERS_THROUGHPUT = "throughput"


def get_timers_config(param_dict):
if param_dict and TIMERS in param_dict and TIMERS_THROUGHPUT in param_dict[TIMERS]:
timers_config_dict = param_dict[TIMERS][TIMERS_THROUGHPUT]
else:
timers_config_dict = {}
return DeepSpeedThroughputTimerConfig(**timers_config_dict)


class DeepSpeedThroughputTimerConfig(DeepSpeedConfigModel):
""" Configure throughput timers """

enabled: bool = True
""" Turn on/off throughput timers """

synchronized: bool = True
""" Whether to synchronize a device when measuring the time.
Synchronizing a device is required to produce the most accurate timer measurements.
However, this comes at the expense of performance degradation. The CPU timer provides
sufficient accuracy in many cases.
"""
20 changes: 9 additions & 11 deletions deepspeed/utils/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,9 @@ def get_mean(self, names, normalizer=1.0, reset=True):

class ThroughputTimer:

def __init__(
self,
batch_size,
start_step=2,
steps_per_output=50,
monitor_memory=False,
logging_fn=None,
):
def __init__(self, config, batch_size, start_step=2, steps_per_output=50, monitor_memory=False, logging_fn=None):
from deepspeed.utils import logger
self.config = config
self.start_time = 0
self.end_time = 0
self.started = False
Expand Down Expand Up @@ -234,22 +228,26 @@ def _init_timer(self):
self.initialized = True

def start(self):
if not self.config.enabled:
return
self._init_timer()
self.started = True
if self.global_step_count >= self.start_step:
get_accelerator().synchronize()
if self.config.synchronized:
get_accelerator().synchronize()
self.start_time = time.time()

def stop(self, global_step=False, report_speed=True):
if not self.started:
if not self.config.enabled or not self.started:
return
self.started = False
self.micro_step_count += 1
if global_step:
self.global_step_count += 1

if self.start_time > 0:
get_accelerator().synchronize()
if self.config.synchronized:
get_accelerator().synchronize()
self.end_time = time.time()
duration = self.end_time - self.start_time
self.total_elapsed_time += duration
Expand Down

0 comments on commit 995ba11

Please sign in to comment.