Skip to content

Commit

Permalink
[memory test] add benchmarks with xDS enabled (grpc#34785)
Browse files Browse the repository at this point in the history
Initial results:

```
default: call/client: 15038.38208
default: call/server: 12873.64608
default: call/xds_client: 15850.04544
default: call/xds_server: 13177.15968
default: channel/client: 27747.9424
default: channel/server: 19939.328
default: channel/xds_client: 45272.2688
default: channel/xds_server: 20805.632
```

b/297028827
  • Loading branch information
markdroth authored Oct 27, 2023
1 parent 341c52e commit e0a5190
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 56 deletions.
3 changes: 3 additions & 0 deletions test/core/memory_usage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ grpc_cc_binary(
"//:gpr",
"//:grpc",
"//src/core:channel_args",
"//src/core:xds_enabled_server",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
Expand Down Expand Up @@ -162,6 +163,7 @@ grpc_cc_test(
"//:subprocess",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
"//test/cpp/end2end/xds:xds_utils",
],
)

Expand All @@ -182,5 +184,6 @@ grpc_cc_binary(
"//:subprocess",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
"//test/cpp/end2end/xds:xds_utils",
],
)
7 changes: 5 additions & 2 deletions test/core/memory_usage/callback_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/match.h"

#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
Expand Down Expand Up @@ -160,12 +161,14 @@ int main(int argc, char** argv) {
std::chrono::milliseconds(1)));
}

const char* prefix = "";
if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
printf("---------Client channel stats--------\n");
printf("client channel memory usage: %f bytes per channel\n",
printf("%sclient channel memory usage: %f bytes per channel\n", prefix,
static_cast<double>(peak_client_memory - before_client_memory) / size *
1024);
printf("---------Server channel stats--------\n");
printf("server channel memory usage: %f bytes per channel\n",
printf("%sserver channel memory usage: %f bytes per channel\n", prefix,
static_cast<double>(peak_server_memory - before_server_memory) / size *
1024);
gpr_log(GPR_INFO, "Client Done");
Expand Down
20 changes: 13 additions & 7 deletions test/core/memory_usage/callback_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>
#include <grpcpp/xds_server_builder.h>

#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h"

ABSL_FLAG(std::string, bind, "", "Bind host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
ABSL_FLAG(bool, use_xds, false, "Use xDS");

class ServerCallbackImpl final
: public grpc::testing::BenchmarkService::CallbackService {
public:
Expand Down Expand Up @@ -69,9 +74,6 @@ class ServerCallbackImpl final
// TODO(chennancy) Add graceful shutdown
static void sigint_handler(int /*x*/) { _exit(0); }

ABSL_FLAG(std::string, bind, "", "Bind host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");

int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
char* fake_argv[1];
Expand All @@ -90,7 +92,11 @@ int main(int argc, char** argv) {
// Get initial process memory usage before creating server
long before_server_create = GetMemUsage();
ServerCallbackImpl callback_server(before_server_create);
grpc::ServerBuilder builder;

grpc::XdsServerBuilder xds_builder;
grpc::ServerBuilder normal_builder;
grpc::ServerBuilder* builder =
absl::GetFlag(FLAGS_use_xds) ? &xds_builder : &normal_builder;

// Set the authentication mechanism.
std::shared_ptr<grpc::ServerCredentials> creds =
Expand All @@ -99,11 +105,11 @@ int main(int argc, char** argv) {
gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
// TODO (chennancy) Add in secure credentials
}
builder.AddListeningPort(server_address, creds);
builder.RegisterService(&callback_server);
builder->AddListeningPort(server_address, creds);
builder->RegisterService(&callback_server);

// Set up the server to start accepting requests.
std::shared_ptr<grpc::Server> server(builder.BuildAndStart());
std::shared_ptr<grpc::Server> server(builder->BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.c_str());

// Keep the program running until the server shuts down.
Expand Down
13 changes: 11 additions & 2 deletions test/core/memory_usage/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/match.h"

#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
Expand All @@ -41,6 +42,7 @@

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h"

Expand Down Expand Up @@ -158,6 +160,11 @@ static MemStats send_snapshot_request(int call_idx, grpc_slice call_type) {
(void*)nullptr, nullptr));
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);

gpr_log(GPR_INFO, "Call %d status %d (%s)", call_idx, calls[call_idx].status,
std::string(grpc_core::StringViewFromSlice(calls[call_idx].details))
.c_str());

GPR_ASSERT(response_payload_recv != nullptr);
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, response_payload_recv);
grpc_slice response = grpc_byte_buffer_reader_readall(&reader);
Expand Down Expand Up @@ -282,14 +289,16 @@ int main(int argc, char** argv) {
grpc_completion_queue_destroy(cq);
grpc_shutdown_blocking();

const char* prefix = "";
if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
printf("---------client stats--------\n");
printf("client call memory usage: %f bytes per call\n",
printf("%sclient call memory usage: %f bytes per call\n", prefix,
static_cast<double>(client_calls_inflight.rss -
client_benchmark_calls_start.rss) /
benchmark_iterations * 1024);

printf("---------server stats--------\n");
printf("server call memory usage: %f bytes per call\n",
printf("%sserver call memory usage: %f bytes per call\n", prefix,
static_cast<double>(server_calls_inflight.rss -
server_benchmark_calls_start.rss) /
benchmark_iterations * 1024);
Expand Down
121 changes: 109 additions & 12 deletions test/core/memory_usage/memory_usage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
//
//

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include <algorithm>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -32,15 +35,26 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "google/protobuf/wrappers.pb.h"

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>

#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gpr/subprocess.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/env.h"
#include "src/proto/grpc/testing/xds/v3/cluster.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/xds/xds_server.h"
#include "test/cpp/end2end/xds/xds_utils.h"

using grpc::testing::XdsResourceUtils;

ABSL_FLAG(std::string, benchmark_names, "call,channel",
"Which benchmark to run"); // Default all benchmarks in order to
Expand All @@ -51,6 +65,9 @@ ABSL_FLAG(std::string, scenario_config, "insecure",
"secure (Use SSL credentials on server)");
ABSL_FLAG(bool, memory_profiling, false,
"Run memory profiling"); // TODO (chennancy) Connect this flag
ABSL_FLAG(bool, use_xds, false, "Use xDS");

// TODO(roth, ctiller): Add support for multiple addresses per channel.

class Subprocess {
public:
Expand All @@ -74,35 +91,46 @@ class Subprocess {
};

// per-call memory usage benchmark
int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags,
int RunCallBenchmark(int port, char* root,
std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
int status;
int port = grpc_pick_unused_port_or_die();

// start the server
gpr_log(GPR_INFO, "starting server");
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_server",
gpr_subprocess_binary_extension()),
"--grpc_experiments",
std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind",
grpc_core::JoinHostPort("::", port)};
grpc_core::LocalIpAndPort(port)};
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
// Add scenario-specific server flags to the end of the server_flags
absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
Subprocess svr(server_flags);
gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());

// Wait one second before starting client to give the server a chance
// to start up.
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));

// start the client
gpr_log(GPR_INFO, "starting client");
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_client",
gpr_subprocess_binary_extension()),
"--target",
grpc_core::JoinHostPort("localhost", port),
absl::GetFlag(FLAGS_use_xds)
? absl::StrCat("xds:", XdsResourceUtils::kServerName)
: grpc_core::LocalIpAndPort(port),
"--grpc_experiments",
std::string(grpc_core::ConfigVars::Get().Experiments()),
absl::StrCat("--warmup=", 10000),
absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))};
// Add scenario-specific client flags to the end of the client_flags
absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
Subprocess cli(client_flags);
gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
// wait for completion
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
Expand All @@ -114,32 +142,38 @@ int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags,
}

// Per-channel benchmark
int RunChannelBenchmark(char* root) {
int RunChannelBenchmark(int port, char* root) {
// TODO(chennancy) Add the scenario specific flags
int status;
int port = grpc_pick_unused_port_or_die();

// start the server
gpr_log(GPR_INFO, "starting server");
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_callback_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::JoinHostPort("::", port)};
"--bind", grpc_core::LocalIpAndPort(port)};
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
Subprocess svr(server_flags);
gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());

// Wait one second before starting client to avoid possible race condition
// of client sending an RPC before the server is set up
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));

// start the client
gpr_log(GPR_INFO, "starting client");
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_callback_client",
gpr_subprocess_binary_extension()),
"--target",
grpc_core::JoinHostPort("localhost", port),
absl::GetFlag(FLAGS_use_xds)
? absl::StrCat("xds:", XdsResourceUtils::kServerName)
: grpc_core::LocalIpAndPort(port),
"--nosecure",
absl::StrCat("--server_pid=", svr.GetPID()),
absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
Subprocess cli(client_flags);
gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
// wait for completion
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
Expand All @@ -149,17 +183,78 @@ int RunChannelBenchmark(char* root) {
return svr.Join() == 0 ? 0 : 2;
}

struct XdsServer {
std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service;
std::unique_ptr<grpc::Server> server;
};

XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
XdsServer xds_server;
int xds_server_port = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port);
// Generate xDS bootstrap and set the env var.
std::string bootstrap =
grpc::testing::XdsBootstrapBuilder()
.SetDefaultServer(absl::StrCat("localhost:", xds_server_port))
.SetXdsChannelCredentials("insecure")
.Build();
grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap);
gpr_log(GPR_INFO, "xDS bootstrap: %s", bootstrap.c_str());
// Create ADS service.
xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
xds_server.ads_service->Start();
// Populate xDS resources.
XdsResourceUtils::SetListenerAndRouteConfiguration(
xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
XdsResourceUtils::DefaultRouteConfig());
auto cluster = XdsResourceUtils::DefaultCluster();
cluster.mutable_circuit_breakers()
->add_thresholds()
->mutable_max_requests()
->set_value(std::numeric_limits<uint32_t>::max());
xds_server.ads_service->SetCdsResource(cluster);
xds_server.ads_service->SetEdsResource(
XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
{XdsResourceUtils::EdsResourceArgs::Locality(
"here",
{XdsResourceUtils::EdsResourceArgs::Endpoint(server_port)})})));
XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
server_port, XdsResourceUtils::DefaultServerRouteConfig());
// Create and start server.
gpr_log(GPR_INFO, "starting xDS server...");
grpc::ServerBuilder builder;
builder.RegisterService(xds_server.ads_service.get());
builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port),
grpc::InsecureServerCredentials());
xds_server.server = builder.BuildAndStart();
gpr_log(GPR_INFO, "xDS server started");
return xds_server;
}

int RunBenchmark(char* root, absl::string_view benchmark,
std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str());
int server_port = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "server port: %d", server_port);
XdsServer xds_server;
if (absl::GetFlag(FLAGS_use_xds)) {
xds_server = StartXdsServerAndConfigureBootstrap(server_port);
}
int retval;
if (benchmark == "call") {
return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags);
retval = RunCallBenchmark(server_port, root, server_scenario_flags,
client_scenario_flags);
} else if (benchmark == "channel") {
return RunChannelBenchmark(root);
retval = RunChannelBenchmark(server_port, root);
} else {
gpr_log(GPR_INFO, "Not a valid benchmark name");
return 4;
retval = 4;
}
if (xds_server.server != nullptr) xds_server.server->Shutdown();
gpr_log(GPR_INFO, "done running benchmark");
return retval;
}

int main(int argc, char** argv) {
Expand Down Expand Up @@ -199,10 +294,12 @@ int main(int argc, char** argv) {
// Run all benchmarks listed (Multiple benchmarks usually only for default
// scenario)
auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ',');
grpc_init();
for (const auto& benchmark : benchmarks) {
int r = RunBenchmark(root, benchmark, it_scenario->second.server,
it_scenario->second.client);
if (r != 0) return r;
}
grpc_shutdown();
return 0;
}
Loading

0 comments on commit e0a5190

Please sign in to comment.