Skip to content

Commit

Permalink
Merge pull request grpc#16044 from ncteisen/interop-soak-test
Browse files Browse the repository at this point in the history
Add Two New Soak Interop Tests
  • Loading branch information
ncteisen authored Jul 18, 2018
2 parents 0418b3a + 21fa790 commit 2d5e0f1
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 34 deletions.
19 changes: 19 additions & 0 deletions doc/interop-test-descriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,25 @@ Status: TODO
This test verifies that a client sending faster than a server can drain sees
pushback (i.e., attempts to send succeed only after appropriate delays).
### Experimental Tests
These tests are not yet standardized, and are not yet implemented in all
languages. Therefore they are not part of our interop matrix.
#### rpc_soak
The client performs many large_unary RPCs in sequence over the same channel.
The number of RPCs is configured by the experimental flag, `soak_iterations`.
#### channel_soak
The client performs many large_unary RPCs in sequence. Before each RPC, it
tears down and rebuilds the channel. The number of RPCs is configured by
the experimental flag, `soak_iterations`.
This tests puts stress on several gRPC components; the resolver, the load
balancer, and the RPC hotpath.
### TODO Tests
#### High priority:
Expand Down
16 changes: 14 additions & 2 deletions test/cpp/interop/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ DEFINE_string(
"all : all test cases;\n"
"cancel_after_begin : cancel stream after starting it;\n"
"cancel_after_first_response: cancel on first response;\n"
"channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each time;\n"
"client_compressed_streaming : compressed request streaming with "
"client_compressed_unary : single compressed request;\n"
"client_streaming : request streaming with single response;\n"
Expand All @@ -60,6 +61,7 @@ DEFINE_string(
"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
"ping_pong : full-duplex streaming;\n"
"response streaming;\n"
"rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
"server_compressed_streaming : single request with compressed "
"server_compressed_unary : single compressed response;\n"
"server_streaming : single request with response streaming;\n"
Expand All @@ -83,6 +85,10 @@ DEFINE_bool(do_not_abort_on_transient_failures, false,
"test is retried in case of transient failures (and currently the "
"interop tests are not retried even if this flag is set to true)");

DEFINE_int32(soak_iterations, 1000,
"number of iterations to use for the two soak tests; rpc_soak and "
"channel_soak");

using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
using grpc::testing::UpdateActions;
Expand All @@ -91,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
true,
grpc::testing::ChannelCreationFunc channel_creation_func =
std::bind(&CreateChannelForTestCase, FLAGS_test_case);
grpc::testing::InteropClient client(channel_creation_func, true,
FLAGS_do_not_abort_on_transient_failures);

std::unordered_map<grpc::string, std::function<bool()>> actions;
Expand Down Expand Up @@ -151,6 +158,11 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
actions["cacheable_unary"] =
std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
actions["channel_soak"] =
std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client,
FLAGS_soak_iterations);
actions["rpc_soak"] = std::bind(&grpc::testing::InteropClient::DoRpcSoakTest,
&client, FLAGS_soak_iterations);

UpdateActions(&actions);

Expand Down
64 changes: 44 additions & 20 deletions test/cpp/interop/interop_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
}
} // namespace

InteropClient::ServiceStub::ServiceStub(const std::shared_ptr<Channel>& channel,
bool new_stub_every_call)
: channel_(channel), new_stub_every_call_(new_stub_every_call) {
InteropClient::ServiceStub::ServiceStub(
ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
: channel_creation_func_(channel_creation_func),
channel_(channel_creation_func_()),
new_stub_every_call_(new_stub_every_call) {
// If new_stub_every_call is false, then this is our chance to initialize
// stub_. (see Get())
if (!new_stub_every_call) {
stub_ = TestService::NewStub(channel);
stub_ = TestService::NewStub(channel_);
}
}

Expand All @@ -100,27 +102,17 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() {
return unimplemented_service_stub_.get();
}

void InteropClient::ServiceStub::Reset(
const std::shared_ptr<Channel>& channel) {
channel_ = channel;

// Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
// the stub_ since the next call to Get() will create a new stub
if (new_stub_every_call_) {
stub_.reset();
} else {
stub_ = TestService::NewStub(channel);
void InteropClient::ServiceStub::ResetChannel() {
channel_ = channel_creation_func_();
if (!new_stub_every_call_) {
stub_ = TestService::NewStub(channel_);
}
}

void InteropClient::Reset(const std::shared_ptr<Channel>& channel) {
serviceStub_.Reset(std::move(channel));
}

InteropClient::InteropClient(const std::shared_ptr<Channel>& channel,
InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures)
: serviceStub_(std::move(channel), new_stub_every_test_case),
: serviceStub_(channel_creation_func, new_stub_every_test_case),
do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}

bool InteropClient::AssertStatusOk(const Status& s,
Expand Down Expand Up @@ -1028,6 +1020,38 @@ bool InteropClient::DoCustomMetadata() {
return true;
}

bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) {
gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
GPR_ASSERT(soak_iterations > 0);
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < soak_iterations; ++i) {
if (!PerformLargeUnary(&request, &response)) {
gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i);
return false;
}
}
gpr_log(GPR_DEBUG, "rpc_soak test done.");
return true;
}

bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) {
gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
soak_iterations);
GPR_ASSERT(soak_iterations > 0);
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < soak_iterations; ++i) {
serviceStub_.ResetChannel();
if (!PerformLargeUnary(&request, &response)) {
gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i);
return false;
}
}
gpr_log(GPR_DEBUG, "channel_soak test done.");
return true;
}

bool InteropClient::DoUnimplementedService() {
gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");

Expand Down
20 changes: 16 additions & 4 deletions test/cpp/interop/interop_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ typedef std::function<void(const InteropClientContextInspector&,
const SimpleRequest*, const SimpleResponse*)>
CheckerFn;

typedef std::function<std::shared_ptr<Channel>(void)> ChannelCreationFunc;

class InteropClient {
public:
/// If new_stub_every_test_case is true, a new TestService::Stub object is
/// created for every test case
/// If do_not_abort_on_transient_failures is true, abort() is not called in
/// case of transient failures (like connection failures)
explicit InteropClient(const std::shared_ptr<Channel>& channel,
explicit InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures);
~InteropClient() {}
Expand All @@ -67,6 +69,14 @@ class InteropClient {
bool DoUnimplementedMethod();
bool DoUnimplementedService();
bool DoCacheableUnary();

// The following interop test are not yet part of the interop spec, and are
// not implemented cross-language. They are considered experimental for now,
// but at some point in the future, might be codified and implemented in all
// languages
bool DoChannelSoakTest(int32_t soak_iterations);
bool DoRpcSoakTest(int32_t soak_iterations);

// Auth tests.
// username is a string containing the user email
bool DoJwtTokenCreds(const grpc::string& username);
Expand All @@ -83,15 +93,17 @@ class InteropClient {
public:
// If new_stub_every_call = true, pointer to a new instance of
// TestServce::Stub is returned by Get() everytime it is called
ServiceStub(const std::shared_ptr<Channel>& channel,
ServiceStub(ChannelCreationFunc channel_creation_func,
bool new_stub_every_call);

TestService::Stub* Get();
UnimplementedService::Stub* GetUnimplementedServiceStub();

void Reset(const std::shared_ptr<Channel>& channel);
// forces channel to be recreated.
void ResetChannel();

private:
ChannelCreationFunc channel_creation_func_;
std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
std::shared_ptr<Channel> channel_;
Expand All @@ -109,8 +121,8 @@ class InteropClient {
bool AssertStatusCode(const Status& s, StatusCode expected_code,
const grpc::string& optional_debug_string);
bool TransientFailureOrAbort();
ServiceStub serviceStub_;

ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_;
};
Expand Down
6 changes: 3 additions & 3 deletions test/cpp/interop/stress_interop_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const {

StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel,
ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
interop_client_(new InteropClient(channel, false,
channel_creation_func_(channel_creation_func),
interop_client_(new InteropClient(channel_creation_func_, false,
do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
Expand Down
4 changes: 2 additions & 2 deletions test/cpp/interop/stress_interop_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class WeightedRandomTestSelector {
class StressTestInteropClient {
public:
StressTestInteropClient(int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel,
ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures);
Expand All @@ -105,7 +105,7 @@ class StressTestInteropClient {

int test_id_;
const grpc::string& server_address_;
std::shared_ptr<Channel> channel_;
ChannelCreationFunc channel_creation_func_;
std::unique_ptr<InteropClient> interop_client_;
const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_;
Expand Down
11 changes: 8 additions & 3 deletions test/cpp/interop/stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,20 @@ int main(int argc, char** argv) {
channel_idx++) {
gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
channel_idx);
std::shared_ptr<grpc::Channel> channel = grpc::CreateTestChannel(
grpc::testing::ChannelCreationFunc channel_creation_func = std::bind(
static_cast<std::shared_ptr<grpc::Channel> (*)(
const grpc::string&, const grpc::string&,
grpc::testing::transport_security, bool)>(
grpc::CreateTestChannel),
*it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);

// Create stub(s) for each channel
for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
stub_idx++) {
clients.emplace_back(new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures));
++thread_idx, *it, channel_creation_func, test_selector,
FLAGS_test_duration_secs, FLAGS_sleep_duration_ms,
FLAGS_do_not_abort_on_transient_failures));

bool is_already_created = false;
// QpsGauge name
Expand Down

0 comments on commit 2d5e0f1

Please sign in to comment.