Skip to content

Commit

Permalink
[2][resource reporting] Encapsulate poller and broadcaster into synce…
Browse files Browse the repository at this point in the history
…r in gcs (ray-project#22464)

This PR moves the poller and broadcaster from the GCS server into the Ray syncer.

TODO in the next PR: deprecate the code path for placement group resource reporting and move the broadcaster out of the GCS cluster resource manager.
  • Loading branch information
fishbone authored Mar 2, 2022
1 parent 1752f17 commit 271ed44
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 181 deletions.
59 changes: 13 additions & 46 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@
#include "ray/gcs/gcs_server/gcs_resource_manager.h"

#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_server/ray_syncer.h"
#include "ray/stats/metric_defs.h"

namespace ray {
namespace gcs {

GcsResourceManager::GcsResourceManager(
instrumented_io_context &main_io_service, std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
syncer::RaySyncer *ray_syncer)
: periodical_runner_(main_io_service),
gcs_publisher_(gcs_publisher),
gcs_table_storage_(gcs_table_storage),
max_broadcasting_batch_size_(
RayConfig::instance().resource_broadcast_batch_size()) {}
ray_syncer_(ray_syncer) {}

void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request,
rpc::GetResourcesReply *reply,
Expand Down Expand Up @@ -84,9 +85,7 @@ void GcsResourceManager::HandleUpdateResources(
node_resource_change.set_node_id(node_id.Binary());
node_resource_change.mutable_updated_resources()->insert(changed_resources->begin(),
changed_resources->end());
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_proto_.add_batch()->mutable_change()->Swap(&node_resource_change);

ray_syncer_->Update(std::move(node_resource_change));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(DEBUG) << "Finished updating resources, node id = " << node_id;
};
Expand Down Expand Up @@ -133,11 +132,7 @@ void GcsResourceManager::HandleDeleteResources(
for (const auto &resource_name : resource_names) {
node_resource_change.add_deleted_resources(resource_name);
}
{
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_proto_.add_batch()->mutable_change()->Swap(
&node_resource_change);
}
ray_syncer_->Update(std::move(node_resource_change));

GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Expand Down Expand Up @@ -168,31 +163,21 @@ void GcsResourceManager::HandleGetAllAvailableResources(

void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(data);

if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
UpdateNodeNormalTaskResources(node_id, *resources_data);
UpdateNodeNormalTaskResources(node_id, data);
} else {
if (node_resource_usages_.count(node_id) == 0 ||
resources_data->resources_available_changed()) {
const auto &resource_changed =
MapFromProtobuf(resources_data->resources_available());
if (node_resource_usages_.count(node_id) == 0 || data.resources_available_changed()) {
const auto &resource_changed = MapFromProtobuf(data.resources_available());
SetAvailableResources(node_id, ResourceSet(resource_changed));
}
}

UpdateNodeResourceUsage(node_id, data);

if (resources_data->should_global_gc() || resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed()) {
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_[node_id] = *resources_data;
// Clear the fields that will not be used by raylet.
resources_buffer_[node_id].clear_resource_load();
resources_buffer_[node_id].clear_resource_load_by_shape();
resources_buffer_[node_id].clear_resources_normal_task();
// TODO (iycheng): This will only happen in testing. We'll clean this code path
// in follow up PRs.
if (ray_syncer_ != nullptr) {
ray_syncer_->Update(data);
}
}

Expand Down Expand Up @@ -338,10 +323,6 @@ void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) {
}

void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
{
absl::MutexLock guard(&resource_buffer_mutex_);
resources_buffer_.erase(node_id);
}
node_resource_usages_.erase(node_id);
cluster_scheduling_resources_.erase(node_id);
latest_resources_normal_task_timestamp_.erase(node_id);
Expand Down Expand Up @@ -372,20 +353,6 @@ bool GcsResourceManager::ReleaseResources(const NodeID &node_id,
return true;
}

/// Hand the pending broadcast data over to the caller-supplied `buffer`.
///
/// While holding resource_buffer_mutex_, this swaps resources_buffer_proto_ with
/// `buffer`, then swaps per-node entries out of resources_buffer_ into newly added
/// batch entries of `buffer` and erases those entries from resources_buffer_. The
/// call therefore consumes the internal buffers.
///
/// \param buffer Return parameter. Its previous contents end up in
/// resources_buffer_proto_ as a result of the swap.
void GcsResourceManager::GetResourceUsageBatchForBroadcast(
rpc::ResourceUsageBroadcastData &buffer) {
absl::MutexLock guard(&resource_buffer_mutex_);
// Exchange contents so `buffer` receives the accumulated proto batch.
resources_buffer_proto_.Swap(&buffer);
auto beg = resources_buffer_.begin();
auto ptr = beg;
// `cnt` starts at the number of batch entries already present in `buffer`, so
// those entries count toward max_broadcasting_batch_size_.
// NOTE(review): `cnt` is also compared against resources_buffer_.size() even though
// it does not start at zero, so some per-node entries may be left behind when
// `buffer` already holds entries -- confirm this is intended.
for (size_t cnt = buffer.batch().size();
cnt < max_broadcasting_batch_size_ && cnt < resources_buffer_.size();
++ptr, ++cnt) {
buffer.add_batch()->mutable_data()->Swap(&ptr->second);
}
// Remove the entries whose payloads were just swapped into `buffer`.
resources_buffer_.erase(beg, ptr);
}

void GcsResourceManager::UpdatePlacementGroupLoad(
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load) {
placement_group_load_ = absl::make_optional(placement_group_load);
Expand Down
32 changes: 11 additions & 21 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
namespace syncer {
class RaySyncer;
}
namespace gcs {

/// Gcs resource manager interface.
/// It is responsible for handling node resource related rpc requests and it is used for
/// actor and placement group scheduling. It obtains the available resources of nodes
Expand All @@ -43,7 +45,9 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsResourceManager(instrumented_io_context &main_io_service,
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
// TODO(iycheng): Remove sync from GcsResourceManager
syncer::RaySyncer *ray_syncer = nullptr);

virtual ~GcsResourceManager() {}

Expand Down Expand Up @@ -151,13 +155,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
void UpdatePlacementGroupLoad(
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load);

/// Move the lightweight heartbeat information for broadcast into the buffer. This
/// method MOVES the information, clearing an internal buffer, so it is NOT idempotent.
///
/// \param buffer return parameter
void GetResourceUsageBatchForBroadcast(rpc::ResourceUsageBroadcastData &buffer)
LOCKS_EXCLUDED(resource_buffer_mutex_);

private:
/// Delete the scheduling resources of the specified node.
///
Expand All @@ -171,16 +168,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
/// Newest resource usage of all nodes.
absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;

/// Protect the lightweight heartbeat deltas which are accessed by different threads.
absl::Mutex resource_buffer_mutex_;
// TODO (Alex): This buffer is only needed for the legacy redis based broadcast.
/// A buffer containing the lightweight heartbeats since the last broadcast.
absl::flat_hash_map<NodeID, rpc::ResourcesData> resources_buffer_
GUARDED_BY(resource_buffer_mutex_);
/// A buffer containing the lightweight heartbeats since the last broadcast.
rpc::ResourceUsageBroadcastData resources_buffer_proto_
GUARDED_BY(resource_buffer_mutex_);

/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Storage for GCS tables.
Expand All @@ -196,8 +183,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
absl::flat_hash_map<NodeID, int64_t> latest_resources_normal_task_timestamp_;
/// The resources changed listeners.
std::vector<std::function<void()>> resources_changed_listeners_;
/// Max batch size for broadcasting
size_t max_broadcasting_batch_size_;

/// Debug info.
enum CountType {
Expand All @@ -210,6 +195,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
CountType_MAX = 6,
};
uint64_t counts_[CountType::CountType_MAX] = {0};

// Updates from placement groups need to be reported to the syncer
// so that they can be broadcast to other nodes.
// TODO (iycheng): remove this one once we change how pg is reported.
syncer::RaySyncer *ray_syncer_;
};

} // namespace gcs
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_resource_report_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
Expand Down
76 changes: 43 additions & 33 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "ray/gcs/gcs_server/gcs_job_manager.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
#include "ray/pubsub/publisher.h"
Expand Down Expand Up @@ -119,6 +121,9 @@ void GcsServer::Start() {
}

void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init synchronization service
InitRaySyncer(gcs_init_data);

// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);

Expand Down Expand Up @@ -158,12 +163,6 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init stats handler.
InitStatsHandler();

// Init resource report polling.
InitResourceReportPolling(gcs_init_data);

// Init resource report broadcasting.
InitResourceReportBroadcasting(gcs_init_data);

// Install event listeners.
InstallEventListeners();

Expand Down Expand Up @@ -209,9 +208,7 @@ void GcsServer::Stop() {
// time, causing many nodes die after GCS's failure.
gcs_heartbeat_manager_->Stop();

gcs_resource_report_poller_->Stop();

grpc_based_resource_broadcaster_->Stop();
ray_syncer_->Stop();

// Shutdown the rpc server
rpc_server_.Shutdown();
Expand Down Expand Up @@ -256,7 +253,7 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
main_service_, gcs_publisher_, gcs_table_storage_);
main_service_, gcs_publisher_, gcs_table_storage_, ray_syncer_.get());

// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
Expand Down Expand Up @@ -403,24 +400,38 @@ void GcsServer::StoreGcsServerAddressInRedis() {
RAY_LOG(INFO) << "Finished setting gcs server address: " << address;
}

void GcsServer::InitResourceReportPolling(const GcsInitData &gcs_init_data) {
gcs_resource_report_poller_.reset(new GcsResourceReportPoller(
raylet_client_pool_, [this](const rpc::ResourcesData &report) {
gcs_resource_manager_->UpdateFromResourceReport(report);
}));
void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
/*
The current synchronization flow is:
syncer::poller -> gcs_resource_manager -> syncer::buffer -> syncer::broadcast
Ideally, we should make it:
syncer::poller --> gcs_resource_manager
|-> syncer::buffer -> syncer::broadcast
But right now, placement group resource update is preventing us from doing this.
TODO (iycheng): Update placement group resource reporting and update it to the
right flow.
*/
auto on_update = [this](const rpc::ResourcesData &report) {
main_service_.dispatch(
[this, report]() mutable {
gcs_resource_manager_->UpdateFromResourceReport(std::move(report));
},
"UpdateResourceReport");
};

gcs_resource_report_poller_->Initialize(gcs_init_data);
gcs_resource_report_poller_->Start();
}
auto gcs_resource_report_poller = std::make_unique<GcsResourceReportPoller>(
raylet_client_pool_, std::move(on_update));

void GcsServer::InitResourceReportBroadcasting(const GcsInitData &gcs_init_data) {
grpc_based_resource_broadcaster_ = std::make_unique<GrpcBasedResourceBroadcaster>(
raylet_client_pool_, [this](rpc::ResourceUsageBroadcastData &buffer) {
gcs_resource_manager_->GetResourceUsageBatchForBroadcast(buffer);
});
gcs_resource_report_poller->Initialize(gcs_init_data);

grpc_based_resource_broadcaster_->Initialize(gcs_init_data);
grpc_based_resource_broadcaster_->Start();
auto grpc_based_resource_broadcaster =
std::make_unique<GrpcBasedResourceBroadcaster>(raylet_client_pool_);
grpc_based_resource_broadcaster->Initialize(gcs_init_data);
ray_syncer_ = std::make_unique<syncer::RaySyncer>(
main_service_, std::move(grpc_based_resource_broadcaster),
std::move(gcs_resource_report_poller));
ray_syncer_->Start();
}

void GcsServer::InitStatsHandler() {
Expand Down Expand Up @@ -477,7 +488,8 @@ void GcsServer::InitRuntimeEnvManager() {
int protocol_len = protocol_end_pos - protocol_pos;
auto protocol = plugin_uri.substr(protocol_pos, protocol_len);
if (protocol != "gcs") {
// Some URIs do not correspond to files in the GCS. Skip deletion for these.
// Some URIs do not correspond to files in the GCS. Skip deletion for
// these.
callback(true);
} else {
auto uri = plugin_uri.substr(protocol_pos);
Expand Down Expand Up @@ -507,8 +519,7 @@ void GcsServer::InstallEventListeners() {
gcs_placement_group_manager_->OnNodeAdd(NodeID::FromBinary(node->node_id()));
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
gcs_resource_report_poller_->HandleNodeAdded(*node);
grpc_based_resource_broadcaster_->HandleNodeAdded(*node);
ray_syncer_->AddNode(*node);
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
Expand All @@ -520,8 +531,7 @@ void GcsServer::InstallEventListeners() {
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id, node_ip_address);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
gcs_resource_report_poller_->HandleNodeRemoved(*node);
grpc_based_resource_broadcaster_->HandleNodeRemoved(*node);
ray_syncer_->RemoveNode(*node);
});

// Install worker event listener.
Expand Down Expand Up @@ -551,8 +561,8 @@ void GcsServer::InstallEventListeners() {
gcs_resource_manager_->AddResourcesChangedListener([this] {
main_service_.post(
[this] {
// Because resources have been changed, we need to try to schedule the pending
// actors.
// Because resources have been changed, we need to try to schedule the
// pending actors.
gcs_actor_manager_->SchedulePendingActors();
},
"GcsServer.SchedulePendingActors");
Expand Down Expand Up @@ -586,7 +596,7 @@ std::string GcsServer::GetDebugState() const {
<< gcs_publisher_->DebugString() << "\n\n"
<< runtime_env_manager_->DebugString() << "\n\n";

stream << grpc_based_resource_broadcaster_->DebugString();
stream << ray_syncer_->DebugString();
return stream.str();
}

Expand Down
Loading

0 comments on commit 271ed44

Please sign in to comment.