Skip to content

Commit

Permalink
[gcs/ha] Fix cpp tests related to redis removal (ray-project#21628)
Browse files Browse the repository at this point in the history
This PR fixed cpp tests and also make ray cpp able to pass.
  • Loading branch information
fishbone authored Jan 19, 2022
1 parent 74d4e7c commit 82103bf
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 66 deletions.
8 changes: 7 additions & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,18 @@
//streaming:all
- bash streaming/src/test/run_streaming_queue_test.sh

- label: ":cpp: Worker"
- label: ":cpp: Ray CPP Worker"
conditions: [ "RAY_CI_CPP_AFFECTED" ]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- ./ci/travis/ci.sh test_cpp

- label: ":redis: Ray CPP Worker"
conditions: [ "RAY_CI_CPP_AFFECTED" ]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- RAY_bootstrap_with_gcs=1 RAY_gcs_grpc_based_pubsub=1 RAY_gcs_storage=memory ./ci/travis/ci.sh test_cpp

- label: ":cpp: Tests"
conditions: [ "RAY_CI_CORE_CPP_AFFECTED" ]
commands:
Expand Down
17 changes: 10 additions & 7 deletions cpp/src/ray/config_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "config_internal.h"

#include <boost/dll/runtime_symbol_info.hpp>
#include <charconv>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
Expand Down Expand Up @@ -62,7 +63,7 @@ namespace internal {

void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
if (!config.address.empty()) {
SetRedisAddress(config.address);
SetBootstrapAddress(config.address);
}
run_mode = config.local_mode ? RunMode::SINGLE_PROCESS : RunMode::CLUSTER;
if (!config.code_search_path.empty()) {
Expand All @@ -84,7 +85,7 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
absl::SkipEmpty());
}
if (!FLAGS_ray_address.CurrentValue().empty()) {
SetRedisAddress(FLAGS_ray_address.CurrentValue());
SetBootstrapAddress(FLAGS_ray_address.CurrentValue());
}
// Don't rewrite `ray_redis_password` when it is not set in the command line.
if (FLAGS_ray_redis_password.CurrentValue() !=
Expand Down Expand Up @@ -118,12 +119,12 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
startup_token = absl::GetFlag<int64_t>(FLAGS_startup_token);
}
if (worker_type == WorkerType::DRIVER && run_mode == RunMode::CLUSTER) {
if (redis_ip.empty()) {
if (bootstrap_ip.empty()) {
auto ray_address_env = std::getenv("RAY_ADDRESS");
if (ray_address_env) {
RAY_LOG(DEBUG) << "Initialize Ray cluster address to \"" << ray_address_env
<< "\" from environment variable \"RAY_ADDRESS\".";
SetRedisAddress(ray_address_env);
SetBootstrapAddress(ray_address_env);
}
}
if (code_search_path.empty()) {
Expand All @@ -145,11 +146,13 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
}
};

void ConfigInternal::SetRedisAddress(const std::string address) {
void ConfigInternal::SetBootstrapAddress(std::string_view address) {
auto pos = address.find(':');
RAY_CHECK(pos != std::string::npos);
redis_ip = address.substr(0, pos);
redis_port = std::stoi(address.substr(pos + 1, address.length()));
bootstrap_ip = address.substr(0, pos);
auto ret = std::from_chars(address.data() + pos + 1, address.data() + address.size(),
bootstrap_port);
RAY_CHECK(ret.ec == std::errc());
}
} // namespace internal
} // namespace ray
7 changes: 4 additions & 3 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>
#include <string>
#include <string_view>

#include "ray/core_worker/common.h"

Expand All @@ -33,9 +34,9 @@ class ConfigInternal {

RunMode run_mode = RunMode::SINGLE_PROCESS;

std::string redis_ip;
std::string bootstrap_ip;

int redis_port = 6379;
int bootstrap_port = 6379;

std::string redis_password = "5241590000000000";

Expand Down Expand Up @@ -66,7 +67,7 @@ class ConfigInternal {

void Init(RayConfig &config, int argc, char **argv);

void SetRedisAddress(const std::string address);
void SetBootstrapAddress(std::string_view address);

ConfigInternal(ConfigInternal const &) = delete;

Expand Down
22 changes: 14 additions & 8 deletions cpp/src/ray/runtime/native_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "./object/native_object_store.h"
#include "./object/object_store.h"
#include "./task/native_task_submitter.h"
#include "ray/common/ray_config.h"

namespace ray {
namespace internal {
Expand All @@ -28,19 +29,24 @@ NativeRayRuntime::NativeRayRuntime() {
task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter());
task_executor_ = std::make_unique<TaskExecutor>();

auto redis_ip = ConfigInternal::Instance().redis_ip;
if (redis_ip.empty()) {
redis_ip = GetNodeIpAddress();
auto bootstrap_address = ConfigInternal::Instance().bootstrap_ip;
if (bootstrap_address.empty()) {
bootstrap_address = GetNodeIpAddress();
}
bootstrap_address =
bootstrap_address + ":" + std::to_string(ConfigInternal::Instance().bootstrap_port);
if (::RayConfig::instance().bootstrap_with_gcs()) {
global_state_accessor_ =
ProcessHelper::GetInstance().CreateGlobalStateAccessor(bootstrap_address);
} else {
global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor(
bootstrap_address, ConfigInternal::Instance().redis_password);
}
std::string redis_address =
redis_ip + ":" + std::to_string(ConfigInternal::Instance().redis_port);
global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor(
redis_address, ConfigInternal::Instance().redis_password);
}

ActorID NativeRayRuntime::GetCurrentActorID() {
return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
}

} // namespace internal
} // namespace ray
} // namespace ray
46 changes: 31 additions & 15 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <boost/algorithm/string.hpp>

#include "ray/common/ray_config.h"
#include "ray/util/process.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/gcs.pb.h"
Expand Down Expand Up @@ -48,6 +49,15 @@ void ProcessHelper::StopRayNode() {
return;
}

std::unique_ptr<ray::gcs::GlobalStateAccessor> ProcessHelper::CreateGlobalStateAccessor(
const std::string &gcs_address) {
ray::gcs::GcsClientOptions client_options(gcs_address);
auto global_state_accessor =
std::make_unique<ray::gcs::GlobalStateAccessor>(client_options);
RAY_CHECK(global_state_accessor->Connect()) << "Failed to connect to GCS.";
return global_state_accessor;
}

std::unique_ptr<ray::gcs::GlobalStateAccessor> ProcessHelper::CreateGlobalStateAccessor(
const std::string &redis_address, const std::string &redis_password) {
std::vector<std::string> address;
Expand All @@ -63,30 +73,34 @@ std::unique_ptr<ray::gcs::GlobalStateAccessor> ProcessHelper::CreateGlobalStateA
}

void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) {
std::string redis_ip = ConfigInternal::Instance().redis_ip;
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER && redis_ip.empty()) {
redis_ip = "127.0.0.1";
StartRayNode(ConfigInternal::Instance().redis_port,
ConfigInternal::Instance().redis_password,
std::string bootstrap_ip = ConfigInternal::Instance().bootstrap_ip;
int bootstrap_port = ConfigInternal::Instance().bootstrap_port;

if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER &&
bootstrap_ip.empty()) {
bootstrap_ip = "127.0.0.1";
StartRayNode(bootstrap_port, ConfigInternal::Instance().redis_password,
ConfigInternal::Instance().head_args);
}
if (redis_ip == "127.0.0.1") {
redis_ip = GetNodeIpAddress();
if (bootstrap_ip == "127.0.0.1") {
bootstrap_ip = GetNodeIpAddress();
}

std::string redis_address =
redis_ip + ":" + std::to_string(ConfigInternal::Instance().redis_port);
std::string bootstrap_address = bootstrap_ip + ":" + std::to_string(bootstrap_port);
std::string node_ip = ConfigInternal::Instance().node_ip_address;
if (node_ip.empty()) {
if (!ConfigInternal::Instance().redis_ip.empty()) {
node_ip = GetNodeIpAddress(redis_address);
if (!bootstrap_ip.empty()) {
node_ip = GetNodeIpAddress(bootstrap_address);
} else {
node_ip = GetNodeIpAddress();
}
}

std::unique_ptr<ray::gcs::GlobalStateAccessor> global_state_accessor =
CreateGlobalStateAccessor(redis_address, ConfigInternal::Instance().redis_password);
::RayConfig::instance().bootstrap_with_gcs()
? CreateGlobalStateAccessor(bootstrap_address)
: CreateGlobalStateAccessor(bootstrap_address,
ConfigInternal::Instance().redis_password);
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER) {
std::string node_to_connect;
auto status =
Expand All @@ -113,8 +127,10 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
}

gcs::GcsClientOptions gcs_options =
gcs::GcsClientOptions(redis_ip, ConfigInternal::Instance().redis_port,
ConfigInternal::Instance().redis_password);
::RayConfig::instance().bootstrap_with_gcs()
? gcs::GcsClientOptions(bootstrap_address)
: gcs::GcsClientOptions(bootstrap_ip, ConfigInternal::Instance().bootstrap_port,
ConfigInternal::Instance().redis_password);

CoreWorkerOptions options;
options.worker_type = ConfigInternal::Instance().worker_type;
Expand Down Expand Up @@ -152,7 +168,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)

void ProcessHelper::RayStop() {
CoreWorkerProcess::Shutdown();
if (ConfigInternal::Instance().redis_ip.empty()) {
if (ConfigInternal::Instance().bootstrap_ip.empty()) {
StopRayNode();
}
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/ray/util/process_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class ProcessHelper {
std::unique_ptr<ray::gcs::GlobalStateAccessor> CreateGlobalStateAccessor(
const std::string &redis_address, const std::string &redis_password);

std::unique_ptr<ray::gcs::GlobalStateAccessor> CreateGlobalStateAccessor(
const std::string &gcs_address);

ProcessHelper(ProcessHelper const &) = delete;
void operator=(ProcessHelper const &) = delete;

Expand Down
11 changes: 6 additions & 5 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1578,8 +1578,9 @@ def start_raylet(redis_address,

if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
cpp_worker_command = build_cpp_worker_command(
"", redis_address, plasma_store_name, raylet_name, redis_password,
session_dir, log_dir, node_ip_address)
"", gcs_address
if use_gcs_for_bootstrap() else redis_address, plasma_store_name,
raylet_name, redis_password, session_dir, log_dir, node_ip_address)
else:
cpp_worker_command = []

Expand Down Expand Up @@ -1778,14 +1779,14 @@ def build_java_worker_command(
return command


def build_cpp_worker_command(cpp_worker_options, redis_address,
def build_cpp_worker_command(cpp_worker_options, bootstrap_address,
plasma_store_name, raylet_name, redis_password,
session_dir, log_dir, node_ip_address):
"""This method assembles the command used to start a CPP worker.
Args:
cpp_worker_options (list): The command options for CPP worker.
redis_address (str): Redis address of GCS.
bootstrap_address (str): The bootstrap address of the cluster.
plasma_store_name (str): The name of the plasma store socket to connect
to.
raylet_name (str): The name of the raylet socket to create.
Expand All @@ -1802,7 +1803,7 @@ def build_cpp_worker_command(cpp_worker_options, redis_address,
f"--ray_plasma_store_socket_name={plasma_store_name}",
f"--ray_raylet_socket_name={raylet_name}",
"--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
f"--ray_address={redis_address}",
f"--ray_address={bootstrap_address}",
f"--ray_redis_password={redis_password}",
f"--ray_session_dir={session_dir}",
f"--ray_logs_dir={log_dir}",
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ class GcsClientOptions {
password_(password),
enable_sync_conn_(enable_sync_conn),
enable_async_conn_(enable_async_conn),
enable_subscribe_conn_(enable_subscribe_conn) {}
enable_subscribe_conn_(enable_subscribe_conn) {
RAY_LOG(DEBUG) << "Connect to gcs server via redis: " << redis_ip << ":"
<< redis_port;
}

/// Constructor of GcsClientOptions from gcs address
///
Expand All @@ -63,6 +66,7 @@ class GcsClientOptions {
RAY_CHECK(address.size() == 2);
gcs_address_ = address[0];
gcs_port_ = std::stoi(address[1]);
RAY_LOG(DEBUG) << "Connect to gcs server via address: " << gcs_address;
}

GcsClientOptions() {}
Expand Down
Loading

0 comments on commit 82103bf

Please sign in to comment.