Merge pull request yahoo#503 from yahoo/leewyang_update_gpu
revised gpu allocation code
elyast authored Feb 19, 2020
2 parents 147882b + d4cf9cc commit 5bb7e9d
Showing 3 changed files with 72 additions and 60 deletions.
3 changes: 3 additions & 0 deletions tensorflowonspark/TFCluster.py
@@ -201,6 +201,9 @@ def timeout_handler(signum, frame):
         if len(jobs) == 0:
           break
 
+    # stop reservation server
+    self.server.stop()
+
   def tensorboard_url(self):
     """Utility function to get the Tensorboard URL"""
     for node in self.cluster_info:
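For orientation, a minimal driver-side sketch of how this change surfaces to applications (main_fun and args are hypothetical placeholders, and the TFCluster.run argument list is abbreviated and may vary by version): shutdown() now also stops the driver-side reservation server instead of leaving it listening after the TF jobs complete.

    from tensorflowonspark import TFCluster

    cluster = TFCluster.run(sc, main_fun, args, num_executors, num_ps,
                            tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW)
    cluster.shutdown()  # waits for TF jobs to finish, then stops the reservation server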
93 changes: 60 additions & 33 deletions tensorflowonspark/TFSparkNode.py
@@ -146,11 +146,65 @@ def _mapfn(iter):
     executor_id = i
 
     # check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node
-    if version.parse(pyspark.__version__).base_version < version.parse('3.0.0').base_version:
-      if gpu_info.is_gpu_available():
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus)
+    # note: for Spark 3+ w/ GPU allocation, the required number of GPUs should be guaranteed by the resource manager
+    def _get_gpus(cluster_spec=None):
+      gpus = []
+      is_k8s = 'SPARK_EXECUTOR_POD_IP' in os.environ
+
+      # handle explicitly configured tf_args.num_gpus
+      if 'num_gpus' in tf_args:
+        requested_gpus = tf_args.num_gpus
+        user_requested = True
+      else:
+        requested_gpus = 0
+        user_requested = False
+
+      # first, try the Spark 3 resources API, returning all visible GPUs
+      # note: the num_gpus arg is only used (if supplied) to limit/truncate the visible devices
+      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
+        from pyspark import TaskContext
+        context = TaskContext()
+        if 'gpu' in context.resources():
+          # get all GPUs assigned by the resource manager
+          gpus = context.resources()['gpu'].addresses
+          logger.info("Spark gpu resources: {}".format(gpus))
+          if user_requested:
+            if requested_gpus < len(gpus):
+              # override/truncate list, if explicitly configured
+              logger.warning("Requested {} GPU(s), but {} available".format(requested_gpus, len(gpus)))
+              gpus = gpus[:requested_gpus]
+          else:
+            # implicitly requested by Spark 3
+            requested_gpus = len(gpus)
+
+      # if not in a K8s pod and GPUs are available, just use the original allocation code (defaulting to 1 GPU if available)
+      # note: for K8s, there is a bug in the Nvidia device_plugin which can show GPUs for non-GPU pods that are hosted on GPU nodes
+      if not is_k8s and gpu_info.is_gpu_available() and not gpus:
+        # default to one GPU if not specified explicitly
+        requested_gpus = max(1, requested_gpus) if not user_requested else requested_gpus
+        if requested_gpus > 0:
+          if cluster_spec:
+            # compute my index relative to other nodes on the same host (for GPU allocation)
+            my_addr = cluster_spec[job_name][task_index]
+            my_host = my_addr.split(':')[0]
+            flattened = [v for sublist in cluster_spec.values() for v in sublist]
+            local_peers = [p for p in flattened if p.startswith(my_host)]
+            my_index = local_peers.index(my_addr)
+          else:
+            my_index = 0
+
+          # try to allocate a GPU
+          gpus = gpu_info.get_gpus(requested_gpus, my_index, format=gpu_info.AS_LIST)
+
+      if user_requested and len(gpus) < requested_gpus:
+        raise Exception("Unable to allocate {} GPU(s) from available GPUs: {}".format(requested_gpus, gpus))
+
+      gpus_to_use = ','.join(gpus)
+      if gpus:
+        logger.info("Requested {} GPU(s), setting CUDA_VISIBLE_DEVICES={}".format(requested_gpus if user_requested else len(gpus), gpus_to_use))
+        os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+
+    # try GPU allocation at executor startup, so we can fail fast if unsuccessful
+    _get_gpus()
 
     # assign TF job/task based on provided cluster_spec template (or use default/null values)
     job_name = 'default'
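To make the host-local indexing above concrete, here is a small self-contained illustration with a toy cluster_spec (the addresses are hypothetical; the arithmetic is the same as in _get_gpus):

    cluster_spec = {'ps': ['10.0.0.1:2222'], 'worker': ['10.0.0.1:2223', '10.0.0.2:2222']}
    job_name, task_index = 'worker', 0

    my_addr = cluster_spec[job_name][task_index]   # '10.0.0.1:2223'
    my_host = my_addr.split(':')[0]                # '10.0.0.1'
    flattened = [v for sublist in cluster_spec.values() for v in sublist]
    local_peers = [p for p in flattened if p.startswith(my_host)]
    my_index = local_peers.index(my_addr)          # 1, i.e. the second TF node on this host

Two TF nodes sharing a host thus receive distinct, non-overlapping GPU slices from gpu_info.get_gpus().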
@@ -309,34 +363,7 @@ def _mapfn(iter):

     # reserve GPU(s) again, just before launching TF process (in case situation has changed)
     # and setup CUDA_VISIBLE_DEVICES accordingly
-    if gpu_info.is_gpu_available():
-
-      gpus_to_use = None
-      # For Spark 3+, try to get GPU resources from TaskContext first
-      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
-        from pyspark import TaskContext
-        context = TaskContext()
-        if 'gpu' in context.resources():
-          # use ALL GPUs assigned by resource manager
-          gpus = context.resources()['gpu'].addresses
-          num_gpus = len(gpus)
-          gpus_to_use = ','.join(gpus)
-
-      if not gpus_to_use:
-        # compute my index relative to other nodes on the same host (for GPU allocation)
-        my_addr = cluster_spec[job_name][task_index]
-        my_host = my_addr.split(':')[0]
-        flattened = [v for sublist in cluster_spec.values() for v in sublist]
-        local_peers = [p for p in flattened if p.startswith(my_host)]
-        my_index = local_peers.index(my_addr)
-
-        # default to one GPU if not specified explicitly
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
-
-      gpu_str = "GPUs" if num_gpus > 1 else "GPU"
-      logger.info("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
-      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+    _get_gpus(cluster_spec=cluster_spec)
 
     # create a context object to hold metadata for TF
     ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
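For reference, a minimal standalone sketch of the Spark 3 resources API that _get_gpus() consults (assumes PySpark 3.x with executors launched under spark.executor.resource.gpu.amount; pin_spark_gpus is an illustrative name, not part of this codebase):

    import os
    from pyspark import TaskContext

    def pin_spark_gpus():
      # only meaningful inside a Spark task; TaskContext.get() returns None on the driver
      context = TaskContext.get()
      if context is not None and 'gpu' in context.resources():
        addresses = context.resources()['gpu'].addresses  # e.g. ['0', '1']
        os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(addresses)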
36 changes: 9 additions & 27 deletions tensorflowonspark/gpu_info.py
@@ -7,39 +7,16 @@
 from __future__ import nested_scopes
 from __future__ import print_function
 
-import ctypes as ct
 import logging
-import platform
 import random
 import subprocess
 import time
 
 logger = logging.getLogger(__name__)
 
 MAX_RETRIES = 3 #: Maximum retries to allocate GPUs
 
 
-def _get_gpu():
-  """*DEPRECATED*. Allocates first available GPU using cudaSetDevice(), or returns 0 otherwise."""
-  # Note: this code executes, but Tensorflow subsequently complains that the "current context was not created by the StreamExecutor cuda_driver API"
-  system = platform.system()
-  if system == "Linux":
-    libcudart = ct.cdll.LoadLibrary("libcudart.so")
-  elif system == "Darwin":
-    libcudart = ct.cdll.LoadLibrary("libcudart.dylib")
-  elif system == "Windows":
-    libcudart = ct.windll.LoadLibrary("libcudart.dll")
-  else:
-    raise NotImplementedError("Cannot identify system.")
-
-  device_count = ct.c_int()
-  libcudart.cudaGetDeviceCount(ct.byref(device_count))
-  gpu = 0
-  for i in range(device_count.value):
-    if (0 == libcudart.cudaSetDevice(i) and 0 == libcudart.cudaFree(0)):
-      gpu = i
-      break
-  return gpu
+AS_STRING = 'string'
+AS_LIST = 'list'

def is_gpu_available():
@@ -51,7 +28,7 @@ def is_gpu_available():
     return False
 
 
-def get_gpus(num_gpu=1, worker_index=-1):
+def get_gpus(num_gpu=1, worker_index=-1, format=AS_STRING):
   """Get list of free GPUs according to nvidia-smi.
 
   This will retry for ``MAX_RETRIES`` times until the requested number of GPUs are available.
@@ -112,7 +89,12 @@ def parse_gpu(gpu_str):
   proposed_gpus = free_gpus[worker_index * num_gpu:(worker_index * num_gpu + num_gpu)]
   logger.info("Proposed GPUs: {}".format(proposed_gpus))
 
-  return ','.join(str(x) for x in proposed_gpus)
+  if format == AS_STRING:
+    return ','.join(str(x) for x in proposed_gpus)
+  elif format == AS_LIST:
+    return proposed_gpus
+  else:
+    raise Exception("Unknown GPU format")
 
 
 # Function to get the gpu information
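A short usage sketch of the new format parameter (get_gpus shells out to nvidia-smi, so this assumes a host with free GPUs; the example values are illustrative):

    from tensorflowonspark import gpu_info

    gpus = gpu_info.get_gpus(num_gpu=2, worker_index=0, format=gpu_info.AS_LIST)  # e.g. ['0', '1']
    gpu_str = gpu_info.get_gpus(num_gpu=2, worker_index=0)                        # default AS_STRING, e.g. '0,1'

The list form lets callers like TFSparkNode truncate or join the result themselves, while the default string form stays backwards-compatible.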
