Skip to content

Commit

Permalink
[test] Fix http_proxy end2end test fixture (grpc#34838)
Browse files Browse the repository at this point in the history
Built upon @Vignesh2208 's work in grpc#33156

This adds ref counting to the http_proxy fixture object, fixing test
flakes identified by the introduction of EventEngine listeners. Proxy
objects were either being deleted twice, or sometimes not at all,
resulting in two different sorts of flakes.
  • Loading branch information
drfloob authored Nov 1, 2023
1 parent 15037d7 commit c85cdfe
Showing 1 changed file with 50 additions and 25 deletions.
75 changes: 50 additions & 25 deletions test/core/end2end/fixtures/http_proxy_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <limits.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -67,22 +67,42 @@

struct grpc_end2end_http_proxy {
grpc_end2end_http_proxy()
: server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) {
gpr_ref_init(&users, 1);
combiner = grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine());
}
: combiner(grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine())) {}
std::string proxy_name;
std::atomic<bool> is_shutdown{false};
std::atomic<size_t> users{1};
grpc_core::Thread thd;
grpc_tcp_server* server;
const grpc_channel_args* channel_args;
gpr_mu* mu;
grpc_tcp_server* server = nullptr;
const grpc_channel_args* channel_args = nullptr;
gpr_mu* mu = nullptr;
std::vector<grpc_pollset*> pollset;
gpr_refcount users;

grpc_core::Combiner* combiner;
grpc_core::Combiner* combiner = nullptr;
};

namespace {

// Sometimes, on_accept may be called after thread_main has returned, and the
// proxy will have already been destroyed. This value is reset every time a
// proxy fixture is created, and it prevents a crash due to a repeated unref.
std::atomic<bool> proxy_destroyed{false};

void proxy_ref(grpc_end2end_http_proxy* proxy) { proxy->users.fetch_add(1); }

// Returns the remaining number of outstanding refs
size_t proxy_unref(grpc_end2end_http_proxy* proxy) {
if (proxy_destroyed.load()) return -1;
size_t ref_count = proxy->users.fetch_sub(1) - 1;
if (ref_count == 0) {
proxy_destroyed.store(true);
GRPC_COMBINER_UNREF(proxy->combiner, "test");
delete proxy;
}
return ref_count;
}

} // namespace

//
// Connection handling
//
Expand Down Expand Up @@ -139,8 +159,6 @@ static void proxy_connection_ref(proxy_connection* conn,
static void proxy_connection_unref(proxy_connection* conn,
const char* /*reason*/) {
if (gpr_unref(&conn->refcount)) {
gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint,
conn->server_endpoint);
grpc_endpoint_destroy(conn->client_endpoint);
if (conn->server_endpoint != nullptr) {
grpc_endpoint_destroy(conn->server_endpoint);
Expand All @@ -154,7 +172,7 @@ static void proxy_connection_unref(proxy_connection* conn,
grpc_slice_buffer_destroy(&conn->server_write_buffer);
grpc_http_parser_destroy(&conn->http_parser);
grpc_http_request_destroy(&conn->http_request);
gpr_unref(&conn->proxy->users);
proxy_unref(conn->proxy);
gpr_free(conn);
}
}
Expand Down Expand Up @@ -589,9 +607,15 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
proxy_ref(proxy);
if (proxy->is_shutdown.load()) {
grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown"));
grpc_endpoint_destroy(endpoint);
proxy_unref(proxy);
return;
}
// Instantiate proxy_connection.
proxy_connection* conn = grpc_core::Zalloc<proxy_connection>();
gpr_ref(&proxy->users);
conn->client_endpoint = endpoint;
conn->proxy = proxy;
gpr_ref_init(&conn->refcount, 1);
Expand Down Expand Up @@ -623,22 +647,24 @@ static void thread_main(void* arg) {
grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
grpc_core::ExecCtx exec_ctx;
do {
gpr_ref(&proxy->users);
proxy_ref(proxy);
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(proxy->mu);
GRPC_LOG_IF_ERROR("grpc_pollset_work",
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::Timestamp::Now() +
grpc_core::Duration::Seconds(1)));
GRPC_LOG_IF_ERROR(
"grpc_pollset_work",
grpc_pollset_work(proxy->pollset[0], &worker,
grpc_core::Timestamp::Now() +
grpc_core::Duration::Milliseconds(100)));
gpr_mu_unlock(proxy->mu);
grpc_core::ExecCtx::Get()->Flush();
} while (!gpr_unref(&proxy->users));
} while (proxy_unref(proxy) > 1 || !proxy->is_shutdown.load());
}

grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
const grpc_channel_args* args) {
grpc_core::ExecCtx exec_ctx;
grpc_end2end_http_proxy* proxy = new grpc_end2end_http_proxy();
proxy_destroyed.store(false);
// Construct proxy address.
const int proxy_port = grpc_pick_unused_port_or_die();
proxy->proxy_name = grpc_core::JoinHostPort("localhost", proxy_port);
Expand Down Expand Up @@ -684,7 +710,7 @@ static void destroy_pollset(void* arg, grpc_error_handle /*error*/) {
}

void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
gpr_unref(&proxy->users); // Signal proxy thread to shutdown.
proxy->is_shutdown.store(true);
grpc_core::ExecCtx exec_ctx;
proxy->thd.Join();
grpc_tcp_server_shutdown_listeners(proxy->server);
Expand All @@ -693,8 +719,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
grpc_pollset_shutdown(proxy->pollset[0],
GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset[0],
grpc_schedule_on_exec_ctx));
GRPC_COMBINER_UNREF(proxy->combiner, "test");
delete proxy;
proxy_unref(proxy);
}

const char* grpc_end2end_http_proxy_get_proxy_name(
Expand Down

0 comments on commit c85cdfe

Please sign in to comment.