diff --git a/.bazelrc b/.bazelrc index a5c9f009f236b0..0491eb500c7a71 100644 --- a/.bazelrc +++ b/.bazelrc @@ -92,7 +92,6 @@ build:sycl_nodouble --config=sycl build:sycl_trisycl --define=using_trisycl=true # Options extracted from configure script -build:gdr --define=with_gdr_support=true build:ngraph --define=with_ngraph_support=true build:numa --define=with_numa_support=true diff --git a/configure.py b/configure.py index 63aa32c8de75cd..743198eea47c1b 100644 --- a/configure.py +++ b/configure.py @@ -1503,7 +1503,6 @@ def main(): 'details.') config_info_line('mkl', 'Build with MKL support.') config_info_line('monolithic', 'Config for mostly static monolithic build.') - config_info_line('gdr', 'Build with GDR support.') config_info_line('ngraph', 'Build with Intel nGraph support.') config_info_line('numa', 'Build with NUMA support.') config_info_line( diff --git a/tensorflow/BUILD b/tensorflow/BUILD index 5113eafda489fb..ed1fb3a46181f2 100644 --- a/tensorflow/BUILD +++ b/tensorflow/BUILD @@ -309,12 +309,6 @@ config_setting( visibility = ["//visibility:public"], ) -config_setting( - name = "with_gdr_support", - define_values = {"with_gdr_support": "true"}, - visibility = ["//visibility:public"], -) - config_setting( name = "with_numa_support", define_values = {"with_numa_support": "true"}, diff --git a/tensorflow/contrib/gdr/BUILD b/tensorflow/contrib/gdr/BUILD deleted file mode 100644 index 1eead8bff44985..00000000000000 --- a/tensorflow/contrib/gdr/BUILD +++ /dev/null @@ -1,132 +0,0 @@ -# Description: -# GPU Direct RDMA Out-of-Band Tensor transport for TensorFlow. - -# For platform specific build config -load( - "//tensorflow/core/platform:default/build_config.bzl", - "tf_proto_library_cc", -) - -package( - default_visibility = [ - "//tensorflow:__subpackages__", - ], - licenses = ["notice"], # Apache 2.0 -) - -exports_files(["LICENSE"]) - -filegroup( - name = "c_srcs", - data = glob([ - "**/*.cc", - "**/*.h", - ]), -) - -tf_proto_library_cc( - name = "gdr_proto", - srcs = ["gdr.proto"], - cc_api_version = 2, - visibility = [ - "//tensorflow:__subpackages__", - ], -) - -cc_library( - name = "gdr_memory_manager", - srcs = ["gdr_memory_manager.cc"], - hdrs = ["gdr_memory_manager.h"], - linkopts = select({ - "//tensorflow:with_gdr_support": [ - "-libverbs", - "-lrdmacm", - ], - "//conditions:default": [], - }), - deps = [ - ":gdr_proto_cc", - "//tensorflow/core:core_cpu_internal", - "//tensorflow/core:framework", - "//tensorflow/core:gpu_runtime", - "//tensorflow/core:lib", - "//tensorflow/core:lib_internal", - ], -) - -cc_library( - name = "gdr_worker", - srcs = ["gdr_worker.cc"], - hdrs = ["gdr_worker.h"], - deps = [ - ":gdr_memory_manager", - "//tensorflow/core:core_cpu_internal", - "//tensorflow/core:framework", - "//tensorflow/core:lib", - "//tensorflow/core:lib_internal", - "//tensorflow/core/distributed_runtime:graph_mgr", - "//tensorflow/core/distributed_runtime:recent_request_ids", - "//tensorflow/core/distributed_runtime:rendezvous_mgr_interface", - "//tensorflow/core/distributed_runtime:worker", - "//tensorflow/core/distributed_runtime:worker_cache", - "//tensorflow/core/distributed_runtime:worker_session", - "//tensorflow/core/distributed_runtime/rpc:grpc_call", - "//tensorflow/core/distributed_runtime/rpc:grpc_tensor_coding", - "//tensorflow/core/distributed_runtime/rpc:grpc_util", - "//tensorflow/core/distributed_runtime/rpc:grpc_worker_service", - ], -) - -cc_library( - name = "gdr_rendezvous_mgr", - srcs = ["gdr_rendezvous_mgr.cc"], - hdrs = ["gdr_rendezvous_mgr.h"], - deps = 
[ - ":gdr_memory_manager", - "//tensorflow/core:core_cpu_internal", - "//tensorflow/core:framework", - "//tensorflow/core:lib", - "//tensorflow/core/distributed_runtime:base_rendezvous_mgr", - "//tensorflow/core/distributed_runtime:request_id", - "//tensorflow/core/distributed_runtime:tensor_coding", - "//tensorflow/core/distributed_runtime:worker_cache", - "//tensorflow/core/distributed_runtime:worker_env", - "//tensorflow/core/distributed_runtime:worker_interface", - ], -) - -cc_library( - name = "gdr_collective_executor_mgr", - srcs = ["gdr_collective_executor_mgr.cc"], - hdrs = ["gdr_collective_executor_mgr.h"], - deps = [ - ":gdr_memory_manager", - "//tensorflow/core:core_cpu_internal", - "//tensorflow/core:framework", - "//tensorflow/core:lib_internal", - "//tensorflow/core/distributed_runtime:cancellable_call", - "//tensorflow/core/distributed_runtime:collective_param_resolver_distributed", - "//tensorflow/core/distributed_runtime:device_resolver_distributed", - "//tensorflow/core/distributed_runtime:request_id", - "//tensorflow/core/distributed_runtime:rpc_collective_executor_mgr", - "//tensorflow/core/distributed_runtime:worker_cache", - ], -) - -cc_library( - name = "gdr_server_lib", - srcs = ["gdr_server_lib.cc"], - hdrs = ["gdr_server_lib.h"], - linkstatic = 1, # Seems to be needed since alwayslink is broken in bazel - deps = [ - ":gdr_collective_executor_mgr", - ":gdr_memory_manager", - ":gdr_rendezvous_mgr", - ":gdr_worker", - "//tensorflow/core:core_cpu_internal", - "//tensorflow/core/distributed_runtime:collective_param_resolver_distributed", - "//tensorflow/core/distributed_runtime:device_resolver_distributed", - "//tensorflow/core/distributed_runtime/rpc:grpc_server_lib", - ], - alwayslink = 1, -) diff --git a/tensorflow/contrib/gdr/README.md b/tensorflow/contrib/gdr/README.md deleted file mode 100644 index 711adc865f37fc..00000000000000 --- a/tensorflow/contrib/gdr/README.md +++ /dev/null @@ -1,131 +0,0 @@ -Introduction -=== - -This is an implementation of GDR out-of-band transport for TensorFlow distributed runtime, complementary to current gRPC transport. It uses gRPC as control plane to setup rendezvous for each tensor transmission, and utilizes [GPU Direct RDMA](https://developer.nvidia.com/gpudirect) whenever possible to transmit tensors in remote GPU memory through network interface card (NIC), bypassing host memory and CPU entirely. It gracefully falls back to ordinary RDMA or even gRPC when GDR is not available. - -Design -=== - -The GDR out-of-band transport is designed to avoid any unnecessary memory copies, especially for large tensors (>100MB). That typically requires registration of tensor buffers to NIC in an ad-hoc manner, which is rather slow as described in the design trade-off of the verbs runtime. The verbs runtime thus chooses to manage its own NIC-registered buffers and copy the tensors from/to those buffers for every single tensor transfer. - -We show that, however, such design trade-off is not always relevant. In this patch, we manage both computation and communication buffers in a unified manner. By pre-registration of large buffers to NIC and allocating small tensors from the buffer pool using a BFC allocator, it is possible to avoid both ad-hoc buffer registration and memory copies all together. - -For the actual tensor transport, we rely on gRPC to transmit the [remote buffer information](gdr.proto). 
This greatly simplifies our design, and there are only 2 types of RDMA messages: a single READ to retrieve the tensor data (bypassing the remote CPU), and an invalidate, issued as a WRITE with IMM, to release the tensor buffer on the remote side. The remote side only polls for the invalidate message and `Unref`s the tensor buffers that have been read by its peer.
-
-Environment
-===
-
-To fully utilize GDR, the target environment has to meet 3 conditions:
-
-1. There is an RDMA capable device with the corresponding [OFED package](https://www.openfabrics.org/index.php/overview.html) installed (detailed information is available from your [Infiniband/RoCE](http://www.mellanox.com/page/products_dyn?product_family=116)/[iWarp](http://www.chelsio.com/gpudirect-rdma/) vendor), which can be verified with `ibv_devinfo`, e.g.
-
-```
-$ ibv_devinfo
-hca_id: mlx4_0
-        transport:              InfiniBand (0)
-        fw_ver:                 2.40.7000
-        node_guid:              248a:0703:00f6:3370
-        sys_image_guid:         248a:0703:00f6:3370
-        vendor_id:              0x02c9
-        vendor_part_id:         4099
-        hw_ver:                 0x1
-        board_id:               MT_1090110023
-        phys_port_cnt:          2
-                Device ports:
-                        port:   1
-                                state:          PORT_ACTIVE (4)
-                                max_mtu:        4096 (5)
-                                active_mtu:     1024 (3)
-                                sm_lid:         0
-                                port_lid:       0
-                                port_lmc:       0x00
-                                link_layer:     Ethernet
-
-                        port:   2
-                                state:          PORT_ACTIVE (4)
-                                max_mtu:        4096 (5)
-                                active_mtu:     1024 (3)
-                                sm_lid:         0
-                                port_lid:       0
-                                port_lmc:       0x00
-                                link_layer:     Ethernet
-```
-
-2. There is a GDR capable GPU, i.e. one of Fermi, Kepler, or a later architecture, with the [corresponding driver](http://docs.nvidia.com/cuda/gpudirect-rdma/index.html) installed. The PCI-e topology can be confirmed with `nvidia-smi topo -m`. For example, in the following topology, `GPU2` and `GPU3` are adjacent to `mlx4_0`, and tensors on these devices can benefit from GDR in the current implementation.
-
-```
-$ nvidia-smi topo -m
-        GPU0    GPU1    GPU2    GPU3    mlx4_0  CPU Affinity
-GPU0     X      PHB     SOC     SOC     SOC     0-5
-GPU1    PHB      X      SOC     SOC     SOC     0-5
-GPU2    SOC     SOC      X      PHB     PHB     6-11
-GPU3    SOC     SOC     PHB      X      PHB     6-11
-mlx4_0  SOC     SOC     PHB     PHB      X
-
-Legend:
-
-  X   = Self
-  SOC = Connection traversing PCIe as well as the SMP link between CPU sockets (e.g. QPI)
-  PHB = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
-  PXB = Connection traversing multiple PCIe switches (without traversing the PCIe Host Bridge)
-  PIX = Connection traversing a single PCIe switch
-  NV# = Connection traversing a bonded set of # NVLinks
-```
-
-3. The [`nv_peer_mem`](https://github.com/Mellanox/nv_peer_memory) kernel module is installed.
-
-How to build and run in GDR mode
-===
-
-To test it out in a GDR capable environment, enable GDR in your configure script.
-
-```
-Do you wish to build TensorFlow with GDR support? [y/N]: y
-GDR support will be enabled for TensorFlow.
-```
-
-Change your `protocol` to `grpc+gdr` to enable GDR in your deployment.
-
-```
-server = tf.train.Server(cluster, job_name="local", task_index=0, protocol='grpc+gdr') # default protocol is 'grpc'
-```
-
-Currently the out-of-band transport service listens on the same IP address and port as specified for gRPC.
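As a concrete illustration of the deployment step above, here is a minimal sketch of a two-task cluster that selects the GDR transport. The hostnames, ports, and job layout are placeholders, and the snippet assumes the TF 1.x `tf.train.ClusterSpec`/`tf.train.Server` API already shown in this document.

```
import tensorflow as tf

# Hypothetical two-worker cluster; replace the host:port pairs with the
# RDMA-reachable addresses of your own machines.
cluster = tf.train.ClusterSpec({
    "worker": ["worker0.example.com:5001", "worker1.example.com:5001"],
})

# Passing protocol='grpc+gdr' selects the GDR-enabled server instead of the
# plain gRPC one; the out-of-band RDMA service reuses the same host:port as
# the gRPC endpoint.
server = tf.train.Server(cluster,
                         job_name="worker",
                         task_index=0,  # use task_index=1 on the second machine
                         protocol="grpc+gdr")

# Block and serve requests; run the same script with task_index=1 on the peer.
server.join()
```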
-
-A successful initialization looks like this:
-
-```
-2017-08-05 19:10:38.601718: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:0) -> (device: 0, name: Tesla K40m, pci bus id: 0000:02:00.0)
-2017-08-05 19:10:38.601728: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:1) -> (device: 1, name: Tesla K40m, pci bus id: 0000:03:00.0)
-2017-08-05 19:10:38.601736: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:2) -> (device: 2, name: Tesla K40m, pci bus id: 0000:82:00.0)
-2017-08-05 19:10:38.601742: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1045] Creating TensorFlow device (/gpu:3) -> (device: 3, name: Tesla K40m, pci bus id: 0000:83:00.0)
-2017-08-05 19:10:39.591026: I tensorflow/contrib/gdr/gdr_memory_manager.cc:235] RDMA server is listening on 10.40.2.200:5001
-2017-08-05 19:10:39.591071: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cuda_host_bfc
-2017-08-05 19:10:39.591083: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cpu_pool
-2017-08-05 19:10:39.591095: I tensorflow/contrib/gdr/gdr_memory_manager.cc:285] Instrumenting CPU allocator cpu_rdma_bfc
-2017-08-05 19:10:39.591278: I tensorflow/contrib/gdr/gdr_memory_manager.cc:78] NUMA node for device: mlx4_0 is 1
-2017-08-05 19:10:39.740253: I tensorflow/contrib/gdr/gdr_memory_manager.cc:296] Instrumenting GPU allocator with bus_id 2
-```
-
-The last line suggests that the GPUs with bus id 2 (mapped to pci bus ids prefixed with 0000:8), i.e. `/gpu:2` and `/gpu:3` in this case, will benefit from GDR and host memory bypass.
-
-Caveats
-===
-
-In the current implementation, only tensors that reside in host memory, or in the memory of a GPU adjacent to an RDMA capable NIC, use direct RDMA as their transport. When RDMA is available but GDR is not, a temporary tensor copy in host memory is used as the RDMA source/destination (and copied from/to the target device). When no RDMA device is present, the runtime falls back to the original gRPC transport. While it is theoretically possible to mix GDR enabled TF with non-GDR deployments in the same job, make sure the environment is properly set up so that GDR mode is enabled whenever possible (i.e. do not fall back to gRPC when it is not absolutely necessary).
-
-In the original design (as in the reference), tensor buffers are only registered
-with the NIC when we can determine that the tensor will be either a source of a
-Send or a sink of a Recv across a physical machine boundary. However, to
-implement such precise allocation, we would need to change all the devices to
-possibly return a NIC compatible allocator. As GDR is currently in contrib, we
-would like to avoid unnecessary code disruption to the TF core, so we allocate
-all tensors from NIC-registered buffers using a BFC allocator. This behavior is
-similar to the effect of enabling the extra GPU option `force_gpu_compatible`,
-which allocates all host tensors in GPU-registered buffers regardless of
-whether they will be transferred from/to GPUs or not.
-
-Reference
-===
-
-Bairen Yi, Jiacheng Xia, Li Chen, and Kai Chen. 2017. Towards Zero Copy Dataflows using RDMA. In Proceedings of SIGCOMM Posters and Demos'17, Los Angeles, CA, USA, August 22-24, 2017, 3 pages.
https://doi.org/10.1145/3123878.3131975 diff --git a/tensorflow/contrib/gdr/gdr.proto b/tensorflow/contrib/gdr/gdr.proto deleted file mode 100644 index bd438787c3374b..00000000000000 --- a/tensorflow/contrib/gdr/gdr.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; - -package tensorflow; -option cc_enable_arenas = true; - -message RemoteMemoryRegion { - string host = 1; - string port = 2; - uint64 addr = 3; - uint32 rkey = 4; - uint32 tensor_key = 5; -} diff --git a/tensorflow/contrib/gdr/gdr_collective_executor_mgr.cc b/tensorflow/contrib/gdr/gdr_collective_executor_mgr.cc deleted file mode 100644 index 4988ce6d2fe15d..00000000000000 --- a/tensorflow/contrib/gdr/gdr_collective_executor_mgr.cc +++ /dev/null @@ -1,162 +0,0 @@ -/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ -#include "tensorflow/contrib/gdr/gdr_collective_executor_mgr.h" - -#include "tensorflow/core/common_runtime/base_collective_executor.h" -#include "tensorflow/core/common_runtime/collective_executor_mgr.h" -#include "tensorflow/core/common_runtime/collective_rma_local.h" -#include "tensorflow/core/common_runtime/dma_helper.h" -#include "tensorflow/core/distributed_runtime/cancellable_call.h" -#include "tensorflow/core/distributed_runtime/collective_param_resolver_distributed.h" -#include "tensorflow/core/distributed_runtime/device_resolver_distributed.h" -#include "tensorflow/core/distributed_runtime/request_id.h" -#include "tensorflow/core/distributed_runtime/worker_cache.h" -#include "tensorflow/core/lib/random/random.h" -#include "tensorflow/core/platform/unbounded_work_queue.h" - -namespace tensorflow { - -class WorkerCacheInterface; - -namespace { - -class RecvBufCall : public CancellableCall { - public: - RecvBufCall(int64 step_id, const string& peer_device, const string& peer_task, - const string& key, Device* to_device, - DeviceContext* to_device_ctx, - const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor, - const DeviceLocality& client_locality, - const DeviceAttributes& server_attributes, - CancellationManager* cancel_mgr, WorkerCacheInterface* wc) - : CancellableCall(cancel_mgr, peer_task, wc) { - req_.set_step_id(step_id); - req_.set_buf_rendezvous_key(key); - *req_.mutable_client_locality() = client_locality; - *req_.mutable_server_locality() = server_attributes.locality(); - req_.set_num_bytes(to_tensor->TotalBytes()); - req_.set_buf_ptr(reinterpret_cast(DMAHelper::base(to_tensor))); - req_.set_src_device(peer_device); - req_.set_src_incarnation(server_attributes.incarnation()); - req_.set_dst_device(to_device->name()); - req_.set_request_id(GetUniqueRequestId()); - } - - ~RecvBufCall() override {} - - void IssueCall(const StatusCallback& done) override { - wi_->RecvBufAsync(&opts_, &req_, &resp_, done); - } - - RecvBufRequest req_; - RecvBufResponse resp_; -}; - -class CollectiveRemoteAccessDistributed : public CollectiveRemoteAccessLocal { - public: - 
CollectiveRemoteAccessDistributed( - const DeviceMgr* dev_mgr, DeviceResolverInterface* dev_resolver, - std::shared_ptr work_queue, - WorkerCacheInterface* worker_cache, int64 step_id, - RemoteMemoryManager* remote_memory_manager) - : CollectiveRemoteAccessLocal(dev_mgr, dev_resolver, work_queue, step_id), - worker_cache_(worker_cache), - remote_memory_manager_(remote_memory_manager) {} - - ~CollectiveRemoteAccessDistributed() override {} - - void RecvFromPeer(const string& peer_device, const string& peer_task, - bool peer_is_local, const string& key, Device* to_device, - DeviceContext* to_device_ctx, - const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor, - const DeviceLocality& client_locality, - int dev_to_dev_stream_index, - const StatusCallback& done) override { - if (peer_is_local) { - CollectiveRemoteAccessLocal::RecvFromPeer( - peer_device, peer_task, peer_is_local, key, to_device, to_device_ctx, - to_alloc_attr, to_tensor, client_locality, dev_to_dev_stream_index, - done); - return; - } - - // State that needs to be threaded through a couple of async calls - // in order to make this function completely non-blocking. - struct State { - DeviceAttributes server_attributes; - std::unique_ptr call; - }; - State* state = new State; - - // Logic to be executed on the RecvBufAsync callback. - auto recv_buf_callback = [this, state, peer_task, to_device, to_alloc_attr, - to_device_ctx, to_tensor, done](const Status& s) { - if (s.ok()) { - remote_memory_manager_->TensorFromTransportOptions( - to_tensor, state->call->resp_.transport_options(), to_device, - to_device_ctx, to_alloc_attr.on_host(), done); - } else if (errors::IsFailedPrecondition(s)) { - dev_resolver_->ClearTask(peer_task); - done(s); - } - - delete state; - }; - - // Logic to execute once we have the device attributes for the server-side - // device. - auto dev_attributes_callback = [this, state, peer_device, peer_task, key, - to_device, to_device_ctx, to_alloc_attr, - to_tensor, client_locality, - recv_buf_callback](const Status& s) { - if (!s.ok()) { - recv_buf_callback(s); - } else { - state->call.reset(new RecvBufCall( - step_id_, peer_device, peer_task, key, to_device, to_device_ctx, - to_alloc_attr, to_tensor, client_locality, state->server_attributes, - &cancel_mgr_, worker_cache_)); - state->call->Start(recv_buf_callback); - } - }; - - dev_resolver_->GetDeviceAttributesAsync(peer_device, peer_task, - &state->server_attributes, - dev_attributes_callback); - } - - void StartAbort(const Status& s) override { - CollectiveRemoteAccessLocal::StartAbort(s); - cancel_mgr_.StartCancel(); - } - - protected: - WorkerCacheInterface* worker_cache_; // Not owned - CancellationManager cancel_mgr_; - RemoteMemoryManager* remote_memory_manager_; -}; - -} // namespace - -CollectiveExecutor* GdrCollectiveExecutorMgr::Create(int64 step_id) { - CollectiveRemoteAccessDistributed* rma = - new CollectiveRemoteAccessDistributed(dev_mgr_, dev_resolver_.get(), - work_queue_, worker_cache_, step_id, - remote_memory_manager_); - return new BaseCollectiveExecutor(this, rma, step_id, dev_mgr_, - &gpu_ring_order_); -} - -} // namespace tensorflow diff --git a/tensorflow/contrib/gdr/gdr_collective_executor_mgr.h b/tensorflow/contrib/gdr/gdr_collective_executor_mgr.h deleted file mode 100644 index 1417e51e82c310..00000000000000 --- a/tensorflow/contrib/gdr/gdr_collective_executor_mgr.h +++ /dev/null @@ -1,56 +0,0 @@ -/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ -#ifndef TENSORFLOW_CONTRIB_GDR_GDR_COLLECTIVE_EXECUTOR_MGR_H_ -#define TENSORFLOW_CONTRIB_GDR_GDR_COLLECTIVE_EXECUTOR_MGR_H_ - -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" -#include "tensorflow/core/distributed_runtime/collective_param_resolver_distributed.h" -#include "tensorflow/core/distributed_runtime/device_resolver_distributed.h" -#include "tensorflow/core/distributed_runtime/rpc_collective_executor_mgr.h" -#include "tensorflow/core/framework/collective.h" - -namespace tensorflow { -class ConfigProto; -class DeviceMgr; -class WorkerCacheInterface; -class StepSequenceRequest; -class StepSequenceResponse; - -// An implementation of CollectiveExecutorMgr for a distributed environment -// that uses WorkerInterface::RecvBufAsync to route data transfers over RDMA. -class GdrCollectiveExecutorMgr : public RpcCollectiveExecutorMgr { - public: - GdrCollectiveExecutorMgr( - const ConfigProto& config, const DeviceMgr* dev_mgr, - std::unique_ptr dev_resolver, - std::unique_ptr param_resolver, - WorkerCacheInterface* worker_cache, const string& task_name, - RemoteMemoryManager* remote_memory_manager) - : RpcCollectiveExecutorMgr(config, dev_mgr, std::move(dev_resolver), - std::move(param_resolver), worker_cache, - task_name), - remote_memory_manager_(remote_memory_manager) {} - - ~GdrCollectiveExecutorMgr() override {} - - protected: - virtual CollectiveExecutor* Create(int64 step_id) override; - - private: - RemoteMemoryManager* remote_memory_manager_; // Not owned. -}; - -} // namespace tensorflow -#endif // TENSORFLOW_CONTRIB_GDR_GDR_COLLECTIVE_EXECUTOR_MGR_H_ diff --git a/tensorflow/contrib/gdr/gdr_memory_manager.cc b/tensorflow/contrib/gdr/gdr_memory_manager.cc deleted file mode 100644 index 9b8e832fd96c89..00000000000000 --- a/tensorflow/contrib/gdr/gdr_memory_manager.cc +++ /dev/null @@ -1,622 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#ifdef TENSORFLOW_USE_GDR - -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "tensorflow/contrib/gdr/gdr.pb.h" -#include "tensorflow/core/common_runtime/device.h" -#include "tensorflow/core/common_runtime/dma_helper.h" -#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h" -#include "tensorflow/core/common_runtime/process_state.h" -#include "tensorflow/core/lib/core/status.h" -#include "tensorflow/core/lib/random/random.h" -#include "tensorflow/core/platform/macros.h" -#include "tensorflow/core/platform/mutex.h" -#include "tensorflow/core/platform/numa.h" - -namespace tensorflow { - -namespace { - -bool IsGDRAvailable() { -#if defined(__APPLE__) - return false; -#elif defined(PLATFORM_WINDOWS) - return false; -#else - std::ifstream ifs("/proc/modules"); - string line; - while (std::getline(ifs, line)) { - auto sep = line.find(' '); - CHECK_NE(sep, std::string::npos); - if (line.substr(0, sep) == "nv_peer_mem") { - return true; - } - } - return false; -#endif -} - -int TryToReadNumaNode(ibv_device* device) { -#if defined(__APPLE__) - LOG(INFO) << "OS X does not support NUMA - returning NUMA node 0"; - return port::kNUMANoAffinity; -#elif defined(PLATFORM_WINDOWS) - // Windows support for NUMA is not currently implemented. Return node 0. - return port::kNUMANoAffinity; -#else - auto filename = string(device->ibdev_path) + "/device/numa_node"; - - std::ifstream ifs(filename.c_str()); - string content; - const auto& ret = std::getline(ifs, content); - if (!ret) { - return port::kNUMANoAffinity; - } - - int32 value; - if (strings::safe_strto32(content, &value)) { - if (value < 0) { - return port::kNUMANoAffinity; - } - LOG(INFO) << "NUMA node for device: " << device->name << " is " << value; - return value; - } - return port::kNUMANoAffinity; -#endif -} - -void EndpointDeleter(rdma_cm_id* id) { - if (id) { - rdma_destroy_ep(id); - } -} - -void MRDeleter(ibv_mr* mr) { - if (mr) { - rdma_dereg_mr(mr); - } -} - -using RdmaEndpointPtr = std::unique_ptr; - -using MemoryRegionPtr = std::unique_ptr; - -class GdrMemoryManager : public RemoteMemoryManager { - public: - GdrMemoryManager(const string& host, const string& port); - - virtual ~GdrMemoryManager() {} - - virtual Status Init() override; - - virtual void Run() override; - - virtual void Stop() override; - - virtual void TransportOptionsFromTensor( - ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) override; - - virtual void TensorFromTransportOptions( - Tensor* tensor, const ::google::protobuf::Any& transport_options, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) override; - - protected: - Status CreateEndpoint(const string& host, const string& port, - RdmaEndpointPtr& endpoint); - - static bool Comparator(const void* ptr, const MemoryRegionPtr& other) { - return ptr < reinterpret_cast(other->addr) + other->length; - } - - ibv_mr* FindMemoryRegion(const Tensor* tensor); - - void InsertMemoryRegion(void* addr, size_t length, - const std::string& allocator_name); - - void EvictMemoryRegion(void* addr, size_t length); - - private: - const string host_; - const string port_; - RdmaEndpointPtr listening_; - std::atomic stopped_; - int numa_node_; - - // Server side endpoints - // Accessed sequentially in 
Run() so not protected by lock - std::list server_clients_; - - using TensorKey = uint32_t; - std::atomic next_key_; - - // Server side on-the-fly tensor buffers - mutex buf_mu_; - std::map tensor_buffers_ GUARDED_BY(buf_mu_); - - // Client side endpoints - mutex client_mu_; - std::map, RdmaEndpointPtr> clients_ - GUARDED_BY(client_mu_); - - // Client side callbacks - mutex callback_mu_; - std::map tensor_callbacks_ - GUARDED_BY(callback_mu_); - - // Managed memory regions - mutex alloc_mu_; - std::vector mrs_ GUARDED_BY(alloc_mu_); - - TF_DISALLOW_COPY_AND_ASSIGN(GdrMemoryManager); -}; - -GdrMemoryManager::GdrMemoryManager(const string& host, const string& port) - : host_(host), - port_(port), - listening_(nullptr, EndpointDeleter), - stopped_(true), - next_key_(static_cast(random::New64())) {} - -Status GdrMemoryManager::Init() { - rdma_addrinfo* addrinfo; - rdma_addrinfo hints = {}; - hints.ai_port_space = RDMA_PS_TCP; - hints.ai_flags = RAI_PASSIVE; - if (rdma_getaddrinfo(const_cast(host_.c_str()), - const_cast(port_.c_str()), &hints, &addrinfo)) { - return errors::Unavailable(strerror(errno), ": ", "cannot resolve rdma://", - host_, ":", port_); - } - - ibv_qp_init_attr init_attr = {}; - init_attr.qp_type = IBV_QPT_RC; - init_attr.cap.max_recv_wr = 1024; - init_attr.cap.max_send_wr = 1; - init_attr.cap.max_recv_sge = 1; - init_attr.cap.max_send_sge = 1; - - // Create listening endpoint - rdma_cm_id* id; - if (rdma_create_ep(&id, addrinfo, nullptr, &init_attr)) { - return errors::Unavailable(strerror(errno), ": ", "cannot bind to rdma://", - host_, ":", port_); - } - listening_.reset(id); - rdma_freeaddrinfo(addrinfo); - - // Listen without backlog - if (rdma_listen(listening_.get(), 0)) { - return errors::Unavailable(strerror(errno), ": ", - "cannot listen on rdma://", host_, ":", port_); - } - LOG(INFO) << "RDMA server is listening on " << host_ << ":" << port_; - - if (listening_->verbs == nullptr) { - return errors::Unimplemented( - "Unsupported address ", host_, ":", port_, - " as it does not bind to a particular RDMA device"); - } - - int flags = fcntl(listening_->channel->fd, F_GETFL, 0); - if (fcntl(listening_->channel->fd, F_SETFL, flags | O_NONBLOCK)) { - return errors::Unavailable(strerror(errno), ": ", - "cannot set server to non-blocking mode"); - } - - numa_node_ = TryToReadNumaNode(listening_->verbs->device); - - SubAllocator::Visitor alloc_visitor = [this](void* ptr, int numa_node, - size_t num_bytes) { - VLOG(2) << "Registering RDMA capable memory region on numa_node " - << numa_node; - InsertMemoryRegion(ptr, num_bytes, strings::StrCat("CPU:", numa_node)); - }; - SubAllocator::Visitor free_visitor = [this](void* ptr, int numa_node, - size_t num_bytes) { - VLOG(2) << "De-registering RDMA capable memory region on numa_node " - << numa_node; - EvictMemoryRegion(ptr, num_bytes); - }; - ProcessState::singleton()->AddCPUAllocVisitor(alloc_visitor); - ProcessState::singleton()->AddCPUFreeVisitor(free_visitor); - LOG(INFO) << "Instrumenting CPU allocator(s)"; - - for (int numa_idx = 0; numa_idx < port::NUMANumNodes(); ++numa_idx) { - GPUProcessState::singleton()->AddGpuHostAllocVisitor(numa_idx, - alloc_visitor); - GPUProcessState::singleton()->AddGpuHostFreeVisitor(numa_idx, free_visitor); - } - - if (IsGDRAvailable()) { - SubAllocator::Visitor cuda_alloc_visitor = [this](void* ptr, int gpu_id, - size_t num_bytes) { - VLOG(2) << "Registering RDMA capable memory region on GPU " << gpu_id; - InsertMemoryRegion(ptr, num_bytes, strings::StrCat("GPU:", gpu_id)); - }; - 
GPUProcessState::singleton()->AddGPUAllocVisitor(numa_node_, - cuda_alloc_visitor); - LOG(INFO) << "Instrumenting GPU allocator for NUMA " << numa_node_; - } - - return Status::OK(); -} - -void GdrMemoryManager::Run() { - stopped_ = false; - while (!stopped_) { - rdma_cm_id* id = nullptr; - // Accept incoming connections - if (!rdma_get_request(listening_.get(), &id)) { - if (!rdma_accept(id, nullptr)) { - LOG(INFO) << "Accepted new RDMA connection"; - for (int i = 0; i < 1024; i++) { - if (rdma_post_recvv(id, nullptr, nullptr, 0)) { - LOG(ERROR) << strerror(errno) << ": rdma_post_recvv failed"; - EndpointDeleter(id); - continue; - } - } - server_clients_.push_back({id, EndpointDeleter}); - } - } - // Polling server side work completions - for (const auto& client : server_clients_) { - ibv_wc wc[32]; - int ret = ibv_poll_cq(client->recv_cq, 32, wc); - if (ret < 0) { - LOG(ERROR) << "ibv_poll_cq failed"; - continue; - } - for (int i = 0; i < ret; i++) { - if (wc[i].opcode != IBV_WC_RECV_RDMA_WITH_IMM) { - LOG(ERROR) << "Received unknown operation " << wc[i].opcode; - } - if (wc[i].status != 0) { - LOG(ERROR) << ibv_wc_status_str(wc[i].status); - } - TensorKey tensor_key = ntohl(wc[i].imm_data); - - if (rdma_post_recvv(client.get(), nullptr, nullptr, 0)) { - perror("rdma_post_recvv"); - LOG(ERROR) << "rdma_post_recvv failed"; - } - - mutex_lock l(buf_mu_); - auto iter = tensor_buffers_.find(tensor_key); - if (iter == std::end(tensor_buffers_)) { - LOG(ERROR) << "Cannot find tensor buffer for tensor key " - << tensor_key; - } else { - const TensorBuffer* buffer = iter->second; - buffer->Unref(); - tensor_buffers_.erase(iter); - } - } - } - // Polling client side work completions - if (client_mu_.try_lock()) { - for (const auto& client : clients_) { - ibv_wc wc[32]; - int ret = ibv_poll_cq(client.second->send_cq, 32, wc); - for (int i = 0; i < ret; i++) { - Status s; - if (wc[i].status) { - s = errors::Unavailable(ibv_wc_status_str(wc[i].status)); - } else { - s = Status::OK(); - } - TensorKey key = wc[i].wr_id; - - ibv_send_wr wr = {}; - wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; - wr.imm_data = htonl(key); - ibv_send_wr* bad_wr; - if (ibv_post_send(client.second->qp, &wr, &bad_wr)) { - LOG(ERROR) << strerror(errno) - << ": ibv_post_send failed for tensor_key " << key; - } - - mutex_lock l(callback_mu_); - auto iter = tensor_callbacks_.find(key); - if (iter != std::end(tensor_callbacks_)) { - iter->second(s); - tensor_callbacks_.erase(iter); - } else { - LOG(WARNING) << "Cannot find client callback with tensor key " - << key; - } - } - } - client_mu_.unlock(); - } - } -} - -void GdrMemoryManager::Stop() { stopped_ = true; } - -void GdrMemoryManager::TransportOptionsFromTensor( - ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) { - ibv_mr* mr = FindMemoryRegion(&tensor); - const TensorBuffer* buffer = DMAHelper::buffer(&tensor); - - Tensor* copy = nullptr; - - if (mr == nullptr) { - AllocatorAttributes alloc_attrs; - alloc_attrs.set_gpu_compatible(true); - alloc_attrs.set_nic_compatible(true); - alloc_attrs.set_on_host(true); - Allocator* alloc = device->GetAllocator(alloc_attrs); - copy = new Tensor(alloc, tensor.dtype(), tensor.shape()); - - mr = FindMemoryRegion(copy); - buffer = DMAHelper::buffer(copy); - if (mr == nullptr) { - done(errors::Unavailable("Cannot find pinned memory region")); - delete copy; - return; - } - } - - TensorKey tensor_key = next_key_++; - buffer->Ref(); - { - 
mutex_lock l(buf_mu_); - tensor_buffers_.insert(std::make_pair(tensor_key, buffer)); - } - - RemoteMemoryRegion remote_mr; - remote_mr.set_host(host_); - remote_mr.set_port(port_); - remote_mr.set_addr(reinterpret_cast(buffer->data())); - remote_mr.set_rkey(mr->rkey); - remote_mr.set_tensor_key(tensor_key); - mutable_transport_options->PackFrom(remote_mr); - - if (copy && device->tensorflow_gpu_device_info() && !on_host) { - device_context->CopyDeviceTensorToCPU(&tensor, "" /* tensor_name */, device, - copy, [done, copy](const Status& s) { - done(s); - delete copy; - }); - return; - } else if (copy) { - std::memcpy(buffer->data(), DMAHelper::buffer(&tensor)->data(), - buffer->size()); - done(Status::OK()); - delete copy; // OK to delete; we have reffed the underlying TensorBuffer - } else { - done(Status::OK()); - } -} - -void GdrMemoryManager::TensorFromTransportOptions( - Tensor* tensor, const ::google::protobuf::Any& transport_options, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) { - RemoteMemoryRegion remote_mr; - if (!transport_options.UnpackTo(&remote_mr)) { - done(errors::NotFound("No RDMA transport options found")); - return; - } - - rdma_cm_id* id = nullptr; - { - decltype(clients_)::iterator iter; - bool success; - mutex_lock l(client_mu_); - std::tie(iter, success) = clients_.insert( - std::make_pair(std::make_pair(remote_mr.host(), remote_mr.port()), - RdmaEndpointPtr(nullptr, EndpointDeleter))); - if (success || iter->second.get() == nullptr) { - Status s = - CreateEndpoint(remote_mr.host(), remote_mr.port(), iter->second); - if (!s.ok()) { - done(s); - return; - } - } - id = iter->second.get(); - } - - ibv_mr* mr = FindMemoryRegion(tensor); - const TensorBuffer* buffer = DMAHelper::buffer(tensor); - - const Tensor* copy = nullptr; - - if (mr == nullptr) { - AllocatorAttributes alloc_attrs; - alloc_attrs.set_gpu_compatible(true); - alloc_attrs.set_nic_compatible(true); - alloc_attrs.set_on_host(true); - Allocator* alloc = device->GetAllocator(alloc_attrs); - copy = new Tensor(alloc, tensor->dtype(), tensor->shape()); - - mr = FindMemoryRegion(copy); - buffer = DMAHelper::buffer(copy); - if (mr == nullptr) { - done(errors::Unavailable("Cannot find pinned memory region")); - delete copy; - return; - } - } - - uint64_t start = Env::Default()->NowMicros(); - - TensorKey tensor_key = remote_mr.tensor_key(); - - StatusCallback callback = [done, copy, device, device_context, on_host, - tensor, start, tensor_key](const Status& s) { - if (!s.ok()) { - done(s); - if (copy) { - delete copy; - } - return; - } - - VLOG(2) << "RDMA of tensor " << tensor_key << " of size " - << DMAHelper::buffer(tensor)->size() << " took " - << (Env::Default()->NowMicros() - start) << " micros"; - - if (copy && device->tensorflow_gpu_device_info() && !on_host) { - device_context->CopyCPUTensorToDevice(copy, device, tensor, - [done, copy](const Status& s) { - done(s); - delete copy; - }); - } else if (copy) { - std::memcpy(DMAHelper::buffer(tensor)->data(), - DMAHelper::buffer(copy)->data(), - DMAHelper::buffer(copy)->size()); - done(s); - delete copy; - } else { - done(s); - } - }; - - { - mutex_lock l(callback_mu_); - if (tensor_callbacks_.find(tensor_key) == std::end(tensor_callbacks_)) { - tensor_callbacks_.insert(std::make_pair(tensor_key, std::move(callback))); - } else { - done(errors::Unavailable("Received duplicated tensor key")); - if (copy) { - delete copy; - } - return; - } - } - - if (rdma_post_read(id, reinterpret_cast(tensor_key), buffer->data(), - 
buffer->size(), mr, IBV_SEND_SIGNALED, remote_mr.addr(), - remote_mr.rkey())) { - done(errors::Unavailable(strerror(errno), ": ", "rdma_post_read failed")); - { - mutex_lock l(callback_mu_); - auto iter = tensor_callbacks_.find(tensor_key); - if (iter != std::end(tensor_callbacks_)) { - tensor_callbacks_.erase(iter); - } - } - if (copy) { - delete copy; - } - } -} - -Status GdrMemoryManager::CreateEndpoint(const string& host, const string& port, - RdmaEndpointPtr& endpoint) { - rdma_addrinfo* addrinfo; - rdma_addrinfo hints = {}; - hints.ai_port_space = RDMA_PS_TCP; - if (rdma_getaddrinfo(const_cast(host.c_str()), - const_cast(port.c_str()), &hints, &addrinfo)) { - return errors::InvalidArgument( - strerror(errno), ": ", "cannot connect to rdma://", host, ":", port); - } - - ibv_qp_init_attr init_attr = {}; - init_attr.qp_type = IBV_QPT_RC; - init_attr.cap.max_recv_wr = 1; - init_attr.cap.max_send_wr = 1024; - init_attr.cap.max_recv_sge = 1; - init_attr.cap.max_send_sge = 1; - - rdma_cm_id* id; - if (rdma_create_ep(&id, addrinfo, nullptr, &init_attr)) { - rdma_freeaddrinfo(addrinfo); - return errors::Unavailable(strerror(errno), ": ", - "cannot create endpoint to rdma://", host, ":", - port); - } - rdma_freeaddrinfo(addrinfo); - - if (rdma_connect(id, nullptr)) { - rdma_destroy_ep(id); - return errors::Unavailable(strerror(errno), ": ", - "cannot connect to rdma://", host, ":", port); - } - - LOG(INFO) << "RDMA endpoint connected to rdma://" << host << ":" << port; - endpoint = RdmaEndpointPtr(id, EndpointDeleter); - return Status::OK(); -} - -ibv_mr* GdrMemoryManager::FindMemoryRegion(const Tensor* tensor) { - const void* addr = DMAHelper::buffer(tensor)->data(); - mutex_lock l(alloc_mu_); - auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator); - if (iter == std::end(mrs_) || iter->get()->addr > addr) { - return nullptr; - } else { - return iter->get(); - } -} - -void GdrMemoryManager::InsertMemoryRegion(void* addr, size_t length, - const std::string& allocator_name) { - if (length == 0) return; - ibv_mr* mr = rdma_reg_read(listening_.get(), addr, length); - if (mr != nullptr) { - mutex_lock l(alloc_mu_); - auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator); - mrs_.insert(iter, {mr, &MRDeleter}); - } else { - LOG(WARNING) << "Cannot register memory region allocated by " - << allocator_name; - } -} - -void GdrMemoryManager::EvictMemoryRegion(void* addr, size_t length) { - if (length == 0) return; - mutex_lock l(alloc_mu_); - auto iter = std::upper_bound(mrs_.begin(), mrs_.end(), addr, &Comparator); - if (iter != std::end(mrs_) && iter->get()->addr == addr) { - mrs_.erase(iter); - } else { - LOG(WARNING) << "Failed to de-register memory region"; - } -} - -} // namespace - -RemoteMemoryManager* CreateRemoteMemoryManager(const string& host, - const string& port) { - return new GdrMemoryManager(host, port); -} - -} // namespace tensorflow - -#endif // TENSORFLOW_USE_GDR diff --git a/tensorflow/contrib/gdr/gdr_memory_manager.h b/tensorflow/contrib/gdr/gdr_memory_manager.h deleted file mode 100644 index c85886863ee59b..00000000000000 --- a/tensorflow/contrib/gdr/gdr_memory_manager.h +++ /dev/null @@ -1,60 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_CONTRIB_GDR_GDR_MEMORY_MANAGER_H_ -#define TENSORFLOW_CONTRIB_GDR_GDR_MEMORY_MANAGER_H_ - -#include "google/protobuf/any.pb.h" -#include "tensorflow/core/lib/core/status.h" - -namespace tensorflow { - -class Device; -class DeviceContext; -class Tensor; - -// Abstract interface that handles out-of-band tensor transport. -// -// The transport options are encoded into a protocol buffer and transmitted via -// some other communication channels like RPC. -// See RecvTensorRequest in tensorflow/core/protobuf/worker.proto -class RemoteMemoryManager { - public: - virtual ~RemoteMemoryManager() {} - virtual Status Init() = 0; - virtual void Run() = 0; - virtual void Stop() = 0; - - // Encodes the tensor information to an arbitrary protocol buffer - // The protocol buffer needs to be transmitted via some other channel - virtual void TransportOptionsFromTensor( - ::google::protobuf::Any* mutable_transport_options, const Tensor& tensor, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) = 0; - - // Retrieve the tensor from the encoded protocol buffer - // Note that the tensor has to be allocated, but not initialized - virtual void TensorFromTransportOptions( - Tensor* tensor, const ::google::protobuf::Any& transport_options, - Device* device, DeviceContext* device_context, bool on_host, - StatusCallback done) = 0; -}; - -RemoteMemoryManager* CreateRemoteMemoryManager(const string& host, - const string& port); - -} // namespace tensorflow - -#endif // TENSORFLOW_CONTRIB_GDR_GDR_MEMORY_MANAGER_H_ diff --git a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc deleted file mode 100644 index 51f6201005aa1f..00000000000000 --- a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.cc +++ /dev/null @@ -1,216 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#include "tensorflow/contrib/gdr/gdr_rendezvous_mgr.h" - -#include "google/protobuf/any.pb.h" -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" -#include "tensorflow/core/common_runtime/device.h" -#include "tensorflow/core/common_runtime/device_mgr.h" -#include "tensorflow/core/common_runtime/process_util.h" -#include "tensorflow/core/distributed_runtime/request_id.h" -#include "tensorflow/core/distributed_runtime/tensor_coding.h" -#include "tensorflow/core/distributed_runtime/worker_cache.h" -#include "tensorflow/core/distributed_runtime/worker_interface.h" -#include "tensorflow/core/framework/types.h" -#include "tensorflow/core/lib/core/errors.h" -#include "tensorflow/core/lib/strings/numbers.h" -#include "tensorflow/core/lib/strings/str_util.h" -#include "tensorflow/core/platform/logging.h" -#include "tensorflow/core/platform/macros.h" -#include "tensorflow/core/platform/types.h" - -namespace tensorflow { - -namespace { - -class GdrRecvTensorCall : public BaseRecvTensorCall { - public: - GdrRecvTensorCall(WorkerInterface* wi, Device* dst_device, - RemoteMemoryManager* remote_memory_manager, - const Rendezvous::Args& recv_args, int64 step_id, - StringPiece key) - : wi_(wi), - dst_device_(dst_device), - remote_memory_manager_(remote_memory_manager), - recv_args_(recv_args) { - req_.set_step_id(step_id); - req_.set_rendezvous_key(key.data(), key.size()); - req_.set_request_id(GetUniqueRequestId()); - } - - ~GdrRecvTensorCall() override {} - - void Start(std::function recv_done) override { - req_.set_dma_ok(true); - resp_.InitAlloc(dst_device_, recv_args_.alloc_attrs); - StatusCallback cb = [this, recv_done](const Status& s) { - bool dma_ok = resp_.metadata().has_transport_options(); - if (s.ok() && tensor().TotalBytes() > 1024 && (!is_dead()) && dma_ok) { - auto transport_options = resp_.metadata().transport_options(); - const bool on_host = recv_args_.alloc_attrs.on_host(); - remote_memory_manager_->TensorFromTransportOptions( - const_cast(&tensor()), transport_options, dst_device_, - recv_args_.device_context, on_host, - [this, recv_done](const Status& s) { - if (!s.ok()) { - mutex_lock l(mu_); - status_.Update(s); - } - recv_done(); - }); - return; - } - if (!s.ok()) { - mutex_lock l(mu_); - status_.Update(s); - } - recv_done(); - }; - wi_->RecvTensorAsync(&opts_, &req_, &resp_, std::move(cb)); - } - - void StartAbort(const Status& s) override { - { - mutex_lock l(mu_); - status_.Update(s); - } - opts_.StartCancel(); - } - - Status status() const override { - mutex_lock l(mu_); - return status_; - } - - const Tensor& tensor() const { return resp_.tensor(); } - - bool is_dead() const { return resp_.metadata().is_dead(); } - - Device* dst_device() const { return dst_device_; } - - const Rendezvous::Args& recv_args() const { return recv_args_; } - - private: - WorkerInterface* wi_; - Device* dst_device_; - RemoteMemoryManager* remote_memory_manager_; - CallOptions opts_; - RecvTensorRequest req_; - TensorResponse resp_; - Rendezvous::Args recv_args_; - - mutable mutex mu_; - Status status_ GUARDED_BY(mu_); - - TF_DISALLOW_COPY_AND_ASSIGN(GdrRecvTensorCall); -}; - -class GdrRemoteRendezvous : public BaseRemoteRendezvous { - public: - GdrRemoteRendezvous(const WorkerEnv* env, int64 step_id, - RemoteMemoryManager* remote_memory_manager) - : BaseRemoteRendezvous(env, step_id), - remote_memory_manager_(remote_memory_manager) {} - - protected: - void RecvFromRemoteAsync(const Rendezvous::ParsedKey& parsed, 
- const Rendezvous::Args& recv_args, - DoneCallback done) override { - CHECK(is_initialized()); - - string src_worker; - string src_rel_device; - if (!DeviceNameUtils::SplitDeviceName(parsed.src_device, &src_worker, - &src_rel_device)) { - Status s = errors::Internal(parsed.src_device, - " is invalid remote source device."); - done(s, Args(), recv_args, Tensor{}, false); - return; - } - - WorkerSession* sess = session(); - WorkerInterface* rwi = sess->worker_cache->GetOrCreateWorker(src_worker); - if (rwi == nullptr) { - Status s = errors::Internal("No worker known as ", src_worker); - done(s, Args(), recv_args, Tensor{}, false); - return; - } - - Device* dst_device; - Status s = sess->device_mgr()->LookupDevice(parsed.dst_device, &dst_device); - if (!s.ok()) { - sess->worker_cache->ReleaseWorker(src_worker, rwi); - done(s, Args(), recv_args, Tensor{}, false); - return; - } - - // Prepare a RecvTensor call that can handle being aborted. - GdrRecvTensorCall* call = - new GdrRecvTensorCall(rwi, dst_device, remote_memory_manager_, - recv_args, step_id_, parsed.FullKey()); - - // Record "call" in active_ so that it can be aborted cleanly. - RegisterCall(call, recv_args); - - // RendezvousMgr already aborted, shouldn't send RPC call any more - if (!call->status().ok()) { - // NOTE: `*session()` can potentially be deleted before we return from - // `call->done()(...)`, so we must release the worker before calling the - // callback. - session()->worker_cache->ReleaseWorker(src_worker, rwi); - done(call->status(), Args(), Args(), Tensor(), false); - delete call; - return; - } - - // Start "call". - Ref(); - call->Start([this, call, src_worker, rwi, done]() { - // Removes "call" from active_. Prevent StartAbort(). - DeregisterCall(call); - // If StartAbort was called prior to DeregisterCall, then the - // current status should be bad. - Status s = call->status(); - // NOTE: `*session()` can potentially be deleted before we return from - // `call->done()(...)`, so we must release the worker before calling the - // callback. - session()->worker_cache->ReleaseWorker(src_worker, rwi); - done(s, Args(), call->recv_args(), call->tensor(), call->is_dead()); - delete call; - Unref(); - }); - } - - private: - ~GdrRemoteRendezvous() override {} - - RemoteMemoryManager* remote_memory_manager_; - - TF_DISALLOW_COPY_AND_ASSIGN(GdrRemoteRendezvous); -}; - -} // namespace - -GdrRendezvousMgr::GdrRendezvousMgr(const WorkerEnv* env, - RemoteMemoryManager* remote_memory_manager) - : BaseRendezvousMgr(env), remote_memory_manager_(remote_memory_manager) {} - -BaseRemoteRendezvous* GdrRendezvousMgr::Create(int64 step_id, - const WorkerEnv* worker_env) { - return new GdrRemoteRendezvous(worker_env, step_id, remote_memory_manager_); -} - -} // end namespace tensorflow diff --git a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h b/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h deleted file mode 100644 index 47a36efdb7ccc7..00000000000000 --- a/tensorflow/contrib/gdr/gdr_rendezvous_mgr.h +++ /dev/null @@ -1,42 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_CONTRIB_GDR_GDR_RENDEZVOUS_MGR_H_ -#define TENSORFLOW_CONTRIB_GDR_GDR_RENDEZVOUS_MGR_H_ - -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" -#include "tensorflow/core/distributed_runtime/base_rendezvous_mgr.h" -#include "tensorflow/core/distributed_runtime/worker_env.h" -#include "tensorflow/core/platform/macros.h" - -namespace tensorflow { - -class GdrRendezvousMgr : public BaseRendezvousMgr { - public: - explicit GdrRendezvousMgr(const WorkerEnv* env, - RemoteMemoryManager* remote_memory_manager); - - protected: - BaseRemoteRendezvous* Create(int64 step_id, const WorkerEnv* worker_env); - - private: - RemoteMemoryManager* remote_memory_manager_; // Not owned - - TF_DISALLOW_COPY_AND_ASSIGN(GdrRendezvousMgr); -}; - -} // end namespace tensorflow - -#endif // TENSORFLOW_CONTRIB_GDR_GDR_RENDEZVOUS_MGR_H_ diff --git a/tensorflow/contrib/gdr/gdr_server_lib.cc b/tensorflow/contrib/gdr/gdr_server_lib.cc deleted file mode 100644 index c39cc0f9bcecc2..00000000000000 --- a/tensorflow/contrib/gdr/gdr_server_lib.cc +++ /dev/null @@ -1,155 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#include "tensorflow/contrib/gdr/gdr_server_lib.h" - -#include "grpc/support/alloc.h" -#include "tensorflow/contrib/gdr/gdr_collective_executor_mgr.h" -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" -#include "tensorflow/contrib/gdr/gdr_rendezvous_mgr.h" -#include "tensorflow/contrib/gdr/gdr_worker.h" -#include "tensorflow/core/common_runtime/collective_rma_local.h" -#include "tensorflow/core/distributed_runtime/collective_param_resolver_distributed.h" -#include "tensorflow/core/distributed_runtime/device_resolver_distributed.h" - -namespace tensorflow { - -GdrServer::GdrServer(const ServerDef& server_def, Env* env) - : GrpcServer(server_def, env) { - string host; - string port; - for (const auto& job : server_def.cluster().job()) { - if (job.name() == server_def.job_name()) { - auto iter = job.tasks().find(server_def.task_index()); - if (iter != job.tasks().end()) { - const std::vector hostname_port = - str_util::Split(iter->second, ':'); - if (hostname_port.size() == 2) { - host = hostname_port[0]; - port = hostname_port[1]; - } - } - } - } - remote_memory_manager_ = std::unique_ptr( - CreateRemoteMemoryManager(host, port)); -} - -GdrServer::~GdrServer() {} - -Status GdrServer::Init() { - RendezvousMgrCreationFunction rendezvous_mgr_func = - [this](const WorkerEnv* env) { - return new GdrRendezvousMgr(env, remote_memory_manager_.get()); - }; - WorkerCreationFunction worker_func = [this](WorkerEnv* env, - const ConfigProto& config) { - return std::unique_ptr( - new GdrWorker(env, config, remote_memory_manager_.get())); - }; - CollectiveMgrCreationFunction collective_mgr_func = - [this](const ConfigProto& config, const WorkerEnv* env, - WorkerCacheInterface* worker_cache) { - string unused; - string default_worker_name; - DeviceNameUtils::SplitDeviceName( - env->device_mgr->ListDevices()[0]->name(), &default_worker_name, - &unused); - - std::unique_ptr dev_resolver( - new DeviceResolverDistributed(env->device_mgr, worker_cache, - default_worker_name)); - std::unique_ptr param_resolver( - new CollectiveParamResolverDistributed( - config, env->device_mgr, dev_resolver.get(), worker_cache, - default_worker_name)); - return new GdrCollectiveExecutorMgr( - config, env->device_mgr, std::move(dev_resolver), - std::move(param_resolver), worker_cache, default_worker_name, - remote_memory_manager_.get()); - }; - TF_RETURN_IF_ERROR(remote_memory_manager_->Init()); - - GrpcServerOptions opts; - opts.rendezvous_mgr_func = rendezvous_mgr_func; - opts.collective_mgr_func = collective_mgr_func; - opts.worker_func = worker_func; - return GrpcServer::Init(opts); -} - -Status GdrServer::Start() { - { - mutex_lock l(mu_); - gdr_thread_.reset(worker_env()->env->StartThread( - ThreadOptions(), "TF_gdr_service", - [this] { remote_memory_manager_->Run(); })); - } - return GrpcServer::Start(); -} - -Status GdrServer::Stop() { - remote_memory_manager_->Stop(); - return GrpcServer::Stop(); -} - -Status GdrServer::Join() { - { - mutex_lock l(mu_); - gdr_thread_.reset(); - } - return GrpcServer::Join(); -} - -/* static */ -Status GdrServer::Create(const ServerDef& server_def, Env* env, - std::unique_ptr* out_server) { - std::unique_ptr ret( - new GdrServer(server_def, env == nullptr ? 
Env::Default() : env)); - TF_RETURN_IF_ERROR(ret->Init()); - *out_server = std::move(ret); - return Status::OK(); -} - -namespace { - -class GdrServerFactory : public ServerFactory { - public: - bool AcceptsOptions(const ServerDef& server_def) override { - return server_def.protocol() == "grpc+gdr"; - } - - Status NewServer(const ServerDef& server_def, - std::unique_ptr* out_server) override { - return GdrServer::Create(server_def, Env::Default(), out_server); - } -}; - -// Registers a `ServerFactory` for `GdrServer` instances. -class GdrServerRegistrar { - public: - GdrServerRegistrar() { - gpr_allocation_functions alloc_fns; - memset(&alloc_fns, 0, sizeof(alloc_fns)); - alloc_fns.malloc_fn = port::Malloc; - alloc_fns.realloc_fn = port::Realloc; - alloc_fns.free_fn = port::Free; - gpr_set_allocation_functions(alloc_fns); - ServerFactory::Register("GDR_SERVER", new GdrServerFactory()); - } -}; -static GdrServerRegistrar registrar; - -} // namespace -} // namespace tensorflow diff --git a/tensorflow/contrib/gdr/gdr_server_lib.h b/tensorflow/contrib/gdr/gdr_server_lib.h deleted file mode 100644 index efa2390d332279..00000000000000 --- a/tensorflow/contrib/gdr/gdr_server_lib.h +++ /dev/null @@ -1,52 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_CONTRIB_GDR_GDR_SERVER_LIB_H_ -#define TENSORFLOW_CONTRIB_GDR_GDR_SERVER_LIB_H_ - -#include "tensorflow/contrib/gdr/gdr_memory_manager.h" -#include "tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h" - -namespace tensorflow { - -class GdrServer : public GrpcServer { - protected: - GdrServer(const ServerDef& server_def, Env* env); - - public: - static Status Create(const ServerDef& server_def, Env* env, - std::unique_ptr* out_server); - - virtual ~GdrServer() override; - - virtual Status Start() override; - - virtual Status Stop() override; - - virtual Status Join() override; - - protected: - Status Init(); - - private: - mutex mu_; - - std::unique_ptr remote_memory_manager_; - std::unique_ptr gdr_thread_ GUARDED_BY(mu_); -}; - -} // namespace tensorflow - -#endif // TENSORFLOW_CONTRIB_GDR_GDR_SERVER_LIB_H_ diff --git a/tensorflow/contrib/gdr/gdr_worker.cc b/tensorflow/contrib/gdr/gdr_worker.cc deleted file mode 100644 index c08d2d1781c112..00000000000000 --- a/tensorflow/contrib/gdr/gdr_worker.cc +++ /dev/null @@ -1,189 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#include "tensorflow/contrib/gdr/gdr_worker.h"
-
-#include "tensorflow/core/common_runtime/buf_rendezvous.h"
-#include "tensorflow/core/common_runtime/device.h"
-#include "tensorflow/core/common_runtime/device_mgr.h"
-#include "tensorflow/core/common_runtime/dma_helper.h"
-#include "tensorflow/core/common_runtime/process_util.h"
-#include "tensorflow/core/common_runtime/step_stats_collector.h"
-#include "tensorflow/core/distributed_runtime/graph_mgr.h"
-#include "tensorflow/core/distributed_runtime/rendezvous_mgr_interface.h"
-#include "tensorflow/core/distributed_runtime/rpc/grpc_call.h"
-#include "tensorflow/core/distributed_runtime/rpc/grpc_tensor_coding.h"
-#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
-#include "tensorflow/core/distributed_runtime/worker.h"
-#include "tensorflow/core/distributed_runtime/worker_cache.h"
-#include "tensorflow/core/distributed_runtime/worker_session.h"
-#include "tensorflow/core/framework/cancellation.h"
-#include "tensorflow/core/framework/collective.h"
-#include "tensorflow/core/framework/tensor.h"
-#include "tensorflow/core/lib/core/errors.h"
-#include "tensorflow/core/platform/logging.h"
-#include "tensorflow/core/platform/tracing.h"
-
-namespace tensorflow {
-
-GdrWorker::GdrWorker(WorkerEnv* worker_env, const ConfigProto& config,
-                     RemoteMemoryManager* remote_memory_manager)
-    : GrpcWorker(worker_env, config),
-      remote_memory_manager_(remote_memory_manager),
-      recent_request_ids_(100000) {}
-
-void GdrWorker::GrpcRecvTensorAsync(CallOptions* opts,
-                                    const RecvTensorRequest* request,
-                                    ::grpc::ByteBuffer* response,
-                                    StatusCallback done) {
-  Status s = recent_request_ids_.TrackUnique(
-      request->request_id(), "RecvTensor (GdrWorker)", *request);
-  if (!s.ok()) {
-    done(s);
-    return;
-  }
-
-  const int64 step_id = request->step_id();
-  const string& key = request->rendezvous_key();
-  TRACEPRINTF("RecvTensor: %lld %s", step_id, key.c_str());
-  Rendezvous::ParsedKey parsed;
-  s = Rendezvous::ParseKey(key, &parsed);
-  Device* src_dev = nullptr;
-  if (s.ok()) {
-    s = PrepareRecvTensor(parsed, &src_dev);
-  }
-  if (!s.ok()) {
-    done(s);
-    return;
-  }
-
-  // Request the tensor associated with the rendezvous key. Any time
-  // while waiting for the tensor to be produced, up until the start
-  // of execution of the callback lambda body below, an RPC
-  // cancellation should abort the rendezvous.
-  opts->SetCancelCallback([this, step_id]() { AbortStep(step_id); });
-  const bool dma_ok = request->dma_ok();
-  env_->rendezvous_mgr->RecvLocalAsync(
-      step_id, parsed,
-      [this, opts, response, done, src_dev, request, dma_ok](
-          const Status& status, const Rendezvous::Args& send_args,
-          const Rendezvous::Args&, const Tensor& val, const bool is_dead) {
-        opts->ClearCancelCallback();
-        if (status.ok()) {
-          // DMA can only be used for Tensors that do not fall into
-          // the following three odd edge cases: 1) a zero-size
-          // buffer, 2) a dead tensor which has an uninit value, and
-          // 3) the tensor has the on_host allocation attribute,
-          // i.e. it's in CPU RAM *independent of its assigned
-          // device type*.
-          const bool on_host = send_args.alloc_attrs.on_host();
-          if (val.TotalBytes() > 1024 && (!is_dead) &&
-              DMAHelper::CanUseDMA(&val) && dma_ok) {
-            // DMA cases.
-            RecvTensorResponse* proto = new RecvTensorResponse;
-            proto->set_is_dead(is_dead);
-            proto->set_send_start_micros(Env::Default()->NowMicros());
-            TensorProto* tensor_proto = proto->mutable_tensor();
-            tensor_proto->set_dtype(val.dtype());
-            val.shape().AsProto(tensor_proto->mutable_tensor_shape());
-            auto transport_options = proto->mutable_transport_options();
-            remote_memory_manager_->TransportOptionsFromTensor(
-                transport_options, val, src_dev, send_args.device_context,
-                on_host, [proto, done, response](const Status& s) {
-                  if (s.ok()) {
-                    grpc::EncodeRecvTensorResponseToByteBuffer(*proto,
-                                                               response);
-                    done(Status::OK());
-                  } else {
-                    done(s);
-                  }
-                  delete proto;
-                });
-          } else {
-            // Non-DMA cases.
-            if (src_dev->tensorflow_gpu_device_info() && (!on_host)) {
-              DeviceContext* send_dev_context = send_args.device_context;
-              AllocatorAttributes alloc_attrs;
-              alloc_attrs.set_gpu_compatible(true);
-              alloc_attrs.set_on_host(true);
-              Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
-              Tensor* copy = new Tensor(alloc, val.dtype(), val.shape());
-              CHECK(send_dev_context)
-                  << "send dev name: " << src_dev->name()
-                  << " gpu_info: " << src_dev->tensorflow_gpu_device_info();
-              // "val" is on an accelerator device. Uses the device_context to
-              // fill the copy on host.
-              StatusCallback copy_ready = [response, done, copy,
-                                           is_dead](const Status& s) {
-                // The value is now ready to be returned on the wire.
-                grpc::EncodeTensorToByteBuffer(is_dead, *copy, false, response);
-                done(s);
-                delete copy;
-              };
-
-              send_dev_context->CopyDeviceTensorToCPU(
-                  &val, request->rendezvous_key(), src_dev, copy, copy_ready);
-            } else {
-              grpc::EncodeTensorToByteBuffer(is_dead, val, false, response);
-              done(Status::OK());
-            }
-          }
-        } else {
-          // !s.ok()
-          done(status);
-        }
-      });
-}
-
-void GdrWorker::RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
-                             RecvBufResponse* response, StatusCallback done) {
-  // This is an RDMA enabled implementation augmenting grpc.
-  Status s = recent_request_ids_.TrackUnique(request->request_id(),
-                                             "RecvBuf (GdrWorker)", *request);
-  if (!s.ok()) {
-    done(s);
-    return;
-  }
-
-  CollectiveExecutor::Handle ce_handle(
-      env_->collective_executor_mgr->FindOrCreate(request->step_id()), true);
-  CollectiveRemoteAccess* rma = ce_handle.get()->remote_access();
-  rma->buf_rendezvous()->ConsumeBuf(
-      request->buf_rendezvous_key(), request->src_device(),
-      request->src_incarnation(),
-      [this, request, response, done](const Status& status,
-                                      BufRendezvous::Hook* hook) {
-        Status s = status;
-        if (s.ok()) {
-          if (!DMAHelper::CanUseDMA(hook->prod_value)) {
-            s = errors::Internal("Tensor value for key ",
-                                 request->buf_rendezvous_key(),
-                                 " is not of a type supported by RecvBuf");
-          }
-        }
-        if (s.ok()) {
-          remote_memory_manager_->TransportOptionsFromTensor(
-              response->mutable_transport_options(), *hook->prod_value,
-              hook->prod_dev, hook->prod_ctx, hook->prod_attr.on_host(),
-              [this, response, done, hook](const Status& s) {
-                response->set_send_start_micros(env_->env->NowMicros());
-                done(s);
-                BufRendezvous::DoneWithHook(hook);
-              });
-        }
-      });
-}
-
-}  // namespace tensorflow
diff --git a/tensorflow/contrib/gdr/gdr_worker.h b/tensorflow/contrib/gdr/gdr_worker.h
deleted file mode 100644
index 9a85cfd4263ad8..00000000000000
--- a/tensorflow/contrib/gdr/gdr_worker.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-==============================================================================*/
-
-#ifndef TENSORFLOW_CONTRIB_GDR_GDR_WORKER_H_
-#define TENSORFLOW_CONTRIB_GDR_GDR_WORKER_H_
-
-#include "tensorflow/contrib/gdr/gdr_memory_manager.h"
-
-#include "tensorflow/core/distributed_runtime/recent_request_ids.h"
-#include "tensorflow/core/distributed_runtime/rpc/grpc_worker_service.h"
-
-namespace tensorflow {
-
-class GdrWorker : public GrpcWorker {
- public:
-  GdrWorker(WorkerEnv* env, const ConfigProto& config,
-            RemoteMemoryManager* remote_memory_manager);
-
-  // Serve the RecvTensorRequest but omit the tensor content and transmit it
-  // out-of-band using GPU Direct RDMA whenever possible.
-  // If it's not possible, it falls back to gRPC in-band tensor transport by
-  // encoding the tensor content into the grpc::ByteBuffer.
-  // The RecvTensorResponse will carry the necessary information for RDMA.
-  virtual void GrpcRecvTensorAsync(CallOptions* opts,
-                                   const RecvTensorRequest* request,
-                                   ::grpc::ByteBuffer* response,
-                                   StatusCallback done) override;
-
-  virtual void RecvBufAsync(CallOptions* opts, const RecvBufRequest* request,
-                            RecvBufResponse* response,
-                            StatusCallback done) override;
-
- private:
-  RemoteMemoryManager* remote_memory_manager_;  // Not owned
-  RecentRequestIds recent_request_ids_;
-};
-
-}  // namespace tensorflow
-
-#endif  // TENSORFLOW_CONTRIB_GDR_GDR_WORKER_H_
diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD
index 131c24088a3c26..18703df898962d 100644
--- a/tensorflow/core/BUILD
+++ b/tensorflow/core/BUILD
@@ -113,7 +113,6 @@ load(
     "tf_additional_device_tracer_cuda_deps",
     "tf_additional_device_tracer_deps",
     "tf_additional_device_tracer_test_flags",
-    "tf_additional_gdr_lib_defines",
     "tf_additional_human_readable_json_deps",
     "tf_additional_lib_defines",
     "tf_additional_lib_deps",
@@ -2422,8 +2421,7 @@ LIB_INTERNAL_PUBLIC_HEADERS = [
 LIB_INTERNAL_DEFINES = (
     tf_additional_lib_defines() + [
         "TF_USE_SNAPPY",
-    ] + tf_additional_gdr_lib_defines() +
-    tf_additional_numa_lib_defines()
+    ] + tf_additional_numa_lib_defines()
 )
 
 cc_library(
diff --git a/tensorflow/core/platform/default/build_config.bzl b/tensorflow/core/platform/default/build_config.bzl
index ae5708785b878a..322eaa5bf958fb 100644
--- a/tensorflow/core/platform/default/build_config.bzl
+++ b/tensorflow/core/platform/default/build_config.bzl
@@ -731,12 +731,6 @@ def tf_lib_proto_compiler_deps():
         "@com_google_protobuf//:protoc_lib",
     ]
-def tf_additional_gdr_lib_defines():
-    return select({
-        "//tensorflow:with_gdr_support": ["TENSORFLOW_USE_GDR"],
-        "//conditions:default": [],
-    })
-
 def tf_additional_numa_lib_defines():
     return select({
         "//tensorflow:with_numa_support": ["TENSORFLOW_USE_NUMA"],
         "//conditions:default": [],
diff --git a/tensorflow/core/platform/default/build_config_root.bzl b/tensorflow/core/platform/default/build_config_root.bzl
index 448e17b82cbe56..2c0f73c8e905df 100644
--- a/tensorflow/core/platform/default/build_config_root.bzl
+++ b/tensorflow/core/platform/default/build_config_root.bzl
@@ -40,14 +40,6 @@ def tf_additional_license_deps():
         "//conditions:default": [],
     })
 
-def tf_additional_gdr_deps():
-    return select({
-        str(Label("//tensorflow:with_gdr_support")): [
-            str(Label("//tensorflow/contrib/gdr:gdr_server_lib")),
-        ],
-        "//conditions:default": [],
-    })
-
 # Include specific extra dependencies when building statically, or
 # another set of dependencies otherwise. If "macos" is provided, that
 # dependency list is used when using the framework_shared_object config
diff --git a/tensorflow/python/BUILD b/tensorflow/python/BUILD
index c69014d77f79f1..c8a380ea40daf1 100644
--- a/tensorflow/python/BUILD
+++ b/tensorflow/python/BUILD
@@ -26,7 +26,7 @@ load("//tensorflow:tensorflow.bzl", "tf_py_wrap_cc")
 load("//tensorflow:tensorflow.bzl", "cuda_py_test")
 load("//tensorflow:tensorflow.bzl", "cuda_py_tests")
 load("//tensorflow/core/platform:default/build_config.bzl", "pyx_library", "tf_additional_all_protos", "tf_additional_cupti_test_flags", "tf_additional_lib_deps", "tf_proto_library", "tf_proto_library_py", "tf_protos_grappler")  # @unused
-load("//tensorflow/core/platform:default/build_config_root.bzl", "if_static", "tf_additional_gdr_deps", "tf_additional_plugin_deps")
+load("//tensorflow/core/platform:default/build_config_root.bzl", "if_static", "tf_additional_plugin_deps")
 load("//tensorflow/python:build_defs.bzl", "tf_gen_op_wrapper_private_py")
 load(
     "//third_party/ngraph:build_defs.bzl",
@@ -5057,8 +5057,7 @@ tf_py_wrap_cc(
         "//tensorflow/lite/toco/python:toco_python_api",
         "//tensorflow/python/eager:pywrap_tfe_lib",
     ] + (tf_additional_lib_deps() +
-         tf_additional_plugin_deps() +
-         tf_additional_gdr_deps()) + if_ngraph([
+         tf_additional_plugin_deps()) + if_ngraph([
        "@ngraph_tf//:ngraph_tf",
     ]),
 )