Skip to content

Commit

Permalink
Add LSF and jsrun support to horovodrun (horovod#1805)
Browse files Browse the repository at this point in the history
Example to run on a LSF cluster (e.g. Summit):
horovodrun python train.py

Perform cpu/mem process binding to get the best performance.

Contributors:
@bethune-bryant
@nvcastet

Signed-off-by: Nicolas V Castet <[email protected]>
  • Loading branch information
nvcastet authored Mar 24, 2020
1 parent f5ca35b commit 58e7de0
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 239 deletions.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ See `Run Horovod <docs/running.rst>`_ for more details, including RoCE/InfiniBan

7. To run in Singularity, see `Singularity <https://github.com/sylabs/examples/tree/master/machinelearning/horovod>`_.

8. To run in a LSF HPC cluster (e.g. Summit), see `LSF <docs/lsf.rst>`_.

Gloo
----
`Gloo <https://github.com/facebookincubator/gloo>`_ is an open source collective communications library developed by Facebook.
Expand Down
2 changes: 2 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ Guides

spark_include

lsf_include

tensor-fusion_include

timeline_include
Expand Down
32 changes: 32 additions & 0 deletions docs/lsf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.. inclusion-marker-start-do-not-remove
Horovod in LSF
==============

This page includes examples for running Horovod in a LSF cluster.
``horovodrun`` will automatically detect the host names and GPUs of your LSF job.
If the LSF cluster supports ``jsrun``, ``horovodrun`` will use it as launcher
otherwise it will default to ``mpirun``.

Inside a LSF batch file or in an interactive session, you just need to use:

.. code-block:: bash
horovodrun python train.py
Here, Horovod will start a process per GPU on all the hosts of the LSF job.

You can also limit the run to a subset of the job resources. For example, using only 6 GPUs:

.. code-block:: bash
horovodrun -np 6 python train.py
You can still pass extra arguments to ``horovodrun``. For example, to trigger CUDA-Aware MPI:

.. code-block:: bash
horovodrun --mpi-args="-gpu" python train.py
.. inclusion-marker-end-do-not-remove
3 changes: 3 additions & 0 deletions docs/lsf_include.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. include:: ./lsf.rst
:start-after: inclusion-marker-start-do-not-remove
:end-before: inclusion-marker-end-do-not-remove
2 changes: 2 additions & 0 deletions docs/summary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ See `Run Horovod <running.rst>`_ for more details, including RoCE/InfiniBand twe

7. To run in Singularity, see `Singularity <https://github.com/sylabs/examples/tree/master/machinelearning/horovod>`_.

8. To run in a LSF HPC cluster (e.g. Summit), see `LSF <lsf.rst>`_.

Gloo
----
`Gloo <https://github.com/facebookincubator/gloo>`_ is an open source collective communications library developed by Facebook.
Expand Down
220 changes: 219 additions & 1 deletion horovod/run/driver/driver_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,17 @@
# limitations under the License.
# ==============================================================================

from horovod.run.common.service import driver_service
import os
import six
import sys

from socket import AF_INET
from psutil import net_if_addrs

from horovod.run.util import cache, lsf, threads
from horovod.run.common.service import driver_service
from horovod.run.common.util import codec, safe_shell_exec
from horovod.run.task import task_service

class HorovodRunDriverService(driver_service.BasicDriverService):
NAME = 'horovodrun driver service'
Expand All @@ -33,3 +42,212 @@ def __init__(self, driver_addresses, key, verbose, match_intf=False):
key,
verbose,
match_intf=match_intf)


def _launch_task_servers(all_host_names, local_host_names, driver_addresses,
settings):
"""
Executes the task server and service client task for registration on the
hosts.
:param all_host_names: list of addresses. for example,
['worker-0','worker-1']
['10.11.11.11', '10.11.11.12']
:type all_host_names: list(string)
:param local_host_names: names that are resolved to one of the addresses
of local hosts interfaces. For example,
set(['localhost', '127.0.0.1'])
:type local_host_names: set
:param driver_addresses: map of interfaces and their address and port for
the service. For example:
{
'lo': [('127.0.0.1', 34588)],
'docker0': [('172.122.10.1', 34588)],
'eth0': [('11.111.33.73', 34588)]
}
:type driver_addresses: map
:param settings: the object that contains the setting for running horovod
:type settings: Horovod.run.common.util.settings.Settings
:return:
:rtype:
"""

def _exec_command(command):
host_output = six.StringIO()
try:
exit_code = safe_shell_exec.execute(command,
stdout=host_output,
stderr=host_output)
if exit_code != 0:
print(
'Launching horovodrun task function was not '
'successful:\n{host_output}'
.format(host_output=host_output.getvalue()))
os._exit(exit_code)
finally:
host_output.close()
return exit_code

if settings.ssh_port:
ssh_port_arg = '-p {ssh_port}'.format(ssh_port=settings.ssh_port)
else:
ssh_port_arg = ''
args_list = []
for index in range(len(all_host_names)):
host_name = all_host_names[index]
if host_name in local_host_names:
command = \
'{python} -m horovod.run.task_fn {index} ' \
'{driver_addresses} {settings}'\
.format(python=sys.executable,
index=codec.dumps_base64(index),
driver_addresses=codec.dumps_base64(driver_addresses),
settings=codec.dumps_base64(settings))
else:
command = \
'ssh -o StrictHostKeyChecking=no {host} {ssh_port_arg} ' \
'\'{python} -m horovod.run.task_fn {index} {driver_addresses}' \
' {settings}\''\
.format(host=host_name,
ssh_port_arg=ssh_port_arg,
python=sys.executable,
index=codec.dumps_base64(index),
driver_addresses=codec.dumps_base64(driver_addresses),
settings=codec.dumps_base64(settings))
args_list.append([command])
# Each thread will use ssh command to launch the server on one task. If an
# error occurs in one thread, entire process will be terminated. Otherwise,
# threads will keep running and ssh session -- and the the task server --
# will be bound to the thread. In case, the horovodrun process dies, all
# the ssh sessions and all the task servers will die as well.
threads.execute_function_multithreaded(_exec_command,
args_list,
block_until_all_done=False)


@cache.use_cache()
def _driver_fn(all_host_names, local_host_names, settings):
"""
launches the service service, launches the task service on each worker and
have them register with the service service. Each worker probes all the
interfaces of the worker index + 1 (in a ring manner) and only keeps the
routed interfaces. Function returns the intersection of the set of all the
routed interfaces on all the workers.
:param all_host_names: list of addresses. for example,
['worker-0','worker-1']
['10.11.11.11', '10.11.11.12']
:type all_host_names: list(string)
:param local_host_names: host names that resolve into a local addresses.
:type local_host_names: set
:param settings: the object that contains the setting for running horovod
:type settings: Horovod.run.common.util.settings.Settings
:return: example: ['eth0', 'eth1']
:rtype: list[string]
"""
# Launch a TCP server called service service on the host running
# horovodrun.
driver = HorovodRunDriverService(
settings.num_hosts, settings.key, settings.nic)
if settings.verbose >= 2:
print('Launched horovodrun server.')
# Have all the workers register themselves with the service service.
_launch_task_servers(all_host_names, local_host_names,
driver.addresses(), settings)
if settings.verbose >= 2:
print('Attempted to launch horovod task servers.')
try:
# wait for all the hosts to register with the service service.
if settings.verbose >= 2:
print('Waiting for the hosts to acknowledge.')
driver.wait_for_initial_registration(settings.timeout)
tasks = [
task_service.HorovodRunTaskClient(
index,
driver.task_addresses_for_driver(index),
settings.key,
settings.verbose) for index in range(
settings.num_hosts)]
# Notify all the drivers that the initial registration is complete.
for task in tasks:
task.notify_initial_registration_complete()
if settings.verbose >= 2:
print('Notified all the hosts that the registration is complete.')
# Each worker should probe the interfaces of the next worker in a ring
# manner and filter only the routed ones -- it should filter out
# interfaces that are not really connected to any external networks
# such as lo0 with address 127.0.0.1.
if settings.verbose >= 2:
print('Waiting for hosts to perform host-to-host '
'interface checking.')
driver.wait_for_task_to_task_address_updates(settings.timeout)
if settings.verbose >= 2:
print('Host-to-host interface checking successful.')
# Determine a set of common interfaces for task-to-task communication.
common_intfs = set(driver.task_addresses_for_tasks(0).keys())
for index in range(1, settings.num_hosts):
common_intfs.intersection_update(
driver.task_addresses_for_tasks(index).keys())
if not common_intfs:
raise Exception(
'Unable to find a set of common task-to-task communication '
'interfaces: %s'
% [(index, driver.task_addresses_for_tasks(index))
for index in range(settings.num_hosts)])
return common_intfs
finally:
driver.shutdown()


def _get_common_interfaces(settings, all_host_names, remote_host_names, fn_cache):
'''
Find the set of common and routed interfaces on all the hosts.
:param settings: the object that contains the setting for running horovod
:type settings: Horovod.run.common.util.settings.Settings
:param all_host_names: list of the host names
:type all_host_names: list(string)
:param remote_host_names: list of the remote host names.
:type remote_host_names: list(string)
:param fn_cache: Cache storing the results of checks performed by horovodrun
:type fn_cache: Horovod.run.util.cache.Cache
:return: List of common interfaces
'''
# Skipping interface discovery for LSF cluster as it slows down considerably the job start
if lsf.LSFUtils.using_lsf():
return None

if len(remote_host_names) > 0:
if settings.verbose >= 2:
print('Testing interfaces on all the hosts.')

local_host_names = set(all_host_names) - set(remote_host_names)
# Find the set of common, routed interfaces on all the hosts (remote
# and local) and specify it in the args to be used by NCCL. It is
# expected that the following function will find at least one interface
# otherwise, it will raise an exception.
common_intfs = _driver_fn(all_host_names, local_host_names,
settings, fn_cache=fn_cache)

if settings.verbose >= 2:
print('Interfaces on all the hosts were successfully checked.')
print('Common interface found: ' + ' '.join(common_intfs))

else:
if settings.verbose >= 2:
print('All hosts are local, finding the interfaces '
'with address 127.0.0.1')
# If all the given hosts are local, find the interfaces with address
# 127.0.0.1
common_intfs = set()
for iface, addrs in net_if_addrs().items():
if settings.nic and iface != settings.nic:
continue
for addr in addrs:
if addr.family == AF_INET and addr.address == '127.0.0.1':
common_intfs.add(iface)
break

if len(common_intfs) == 0:
raise ValueError('No interface is found for address 127.0.0.1.')

if settings.verbose >= 2:
print('Local interface found ' + ' '.join(common_intfs))
return common_intfs
Loading

0 comments on commit 58e7de0

Please sign in to comment.