Skip to content

Commit

Permalink
[EventEngine] Port GrpcPolledFdFactoryPosix fix to EE (grpc#34025)
Browse files Browse the repository at this point in the history
Port grpc#33871 to EE's
GrpcPolledFdFactoryPosix.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
  • Loading branch information
yijiem authored Aug 11, 2023
1 parent 4acb7d3 commit 67ad297
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 16 deletions.
21 changes: 14 additions & 7 deletions src/core/lib/event_engine/ares_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ AresResolver::CreateAresResolver(
std::shared_ptr<EventEngine> event_engine) {
ares_options opts = {};
opts.flags |= ARES_FLAG_STAYOPEN;
if (g_event_engine_grpc_ares_test_only_force_tcp) {
opts.flags |= ARES_FLAG_USEVC;
}
ares_channel channel;
int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS);
if (status != ARES_SUCCESS) {
Expand All @@ -168,6 +171,7 @@ AresResolver::CreateAresResolver(
absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status)));
}
event_engine_grpc_ares_test_only_inject_config(&channel);
polled_fd_factory->ConfigureAresChannelLocked(channel);
if (!dns_server.empty()) {
absl::Status status = SetRequestDNSServer(dns_server, &channel);
if (!status.ok()) {
Expand All @@ -194,7 +198,7 @@ void AresResolver::Orphan() {
}
for (const auto& fd_node : fd_node_list_) {
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("request: %p shutdown fd: %s", this,
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
fd_node->polled_fd->ShutdownLocked(
absl::CancelledError("AresResolver::Orphan"));
Expand Down Expand Up @@ -351,9 +355,10 @@ void AresResolver::CheckSocketsLocked() {
fd_node_list_.begin(), fd_node_list_.end(),
[sock = socks[i]](const auto& node) { return node->as == sock; });
if (iter == fd_node_list_.end()) {
GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p new fd: %d", this,
socks[i]);
new_list.push_back(std::make_unique<FdNode>(
socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i])));
GRPC_ARES_RESOLVER_TRACE_LOG("request:%p new fd: %d", this, socks[i]);
} else {
new_list.splice(new_list.end(), fd_node_list_, iter);
}
Expand All @@ -368,15 +373,15 @@ void AresResolver::CheckSocketsLocked() {
// new data arrives and c-ares hasn't read all the data in the
// previous ares_process_fd.
GRPC_ARES_RESOLVER_TRACE_LOG(
"request:%p schedule read directly on: %d", this, fd_node->as);
"resolver:%p schedule read directly on: %d", this, fd_node->as);
event_engine_->Run(
[self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
fd_node]() mutable {
self->OnReadable(fd_node, absl::OkStatus());
});
} else {
// Otherwise register with the poller for readable event.
GRPC_ARES_RESOLVER_TRACE_LOG("request:%p notify read on: %d", this,
GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify read on: %d", this,
fd_node->as);
fd_node->polled_fd->RegisterForOnReadableLocked(
[self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
Expand All @@ -389,7 +394,7 @@ void AresResolver::CheckSocketsLocked() {
// has not been registered with this socket.
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fd_node->writable_registered) {
GRPC_ARES_RESOLVER_TRACE_LOG("request:%p notify write on: %d", this,
GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify write on: %d", this,
fd_node->as);
fd_node->writable_registered = true;
fd_node->polled_fd->RegisterForOnWriteableLocked(
Expand All @@ -407,13 +412,13 @@ void AresResolver::CheckSocketsLocked() {
while (!fd_node_list_.empty()) {
FdNode* fd_node = fd_node_list_.front().get();
if (!fd_node->already_shutdown) {
GRPC_ARES_RESOLVER_TRACE_LOG("request: %p shutdown fd: %s", this,
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
fd_node->polled_fd->GetName());
fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
fd_node->already_shutdown = true;
}
if (!fd_node->readable_registered && !fd_node->writable_registered) {
GRPC_ARES_RESOLVER_TRACE_LOG("request: %p delete fd: %s", this,
GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this,
fd_node->polled_fd->GetName());
fd_node_list_.pop_front();
} else {
Expand Down Expand Up @@ -702,4 +707,6 @@ void noop_inject_channel_config(ares_channel* /*channel*/) {}
void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) =
noop_inject_channel_config;

bool g_event_engine_grpc_ares_test_only_force_tcp = false;

#endif // GRPC_ARES == 1
3 changes: 3 additions & 0 deletions src/core/lib/event_engine/ares_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,8 @@ class AresResolver : public grpc_core::InternallyRefCounted<AresResolver> {
extern void (*event_engine_grpc_ares_test_only_inject_config)(
ares_channel* channel);

// Exposed in this header for C-core tests only
extern bool g_event_engine_grpc_ares_test_only_force_tcp;

#endif // GRPC_ARES == 1
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_ARES_RESOLVER_H
89 changes: 87 additions & 2 deletions src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@

#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)

#include <string.h>
// IWYU pragma: no_include <ares_build.h>

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#include <string>
#include <unordered_set>
#include <utility>

#include <ares.h>
Expand All @@ -36,6 +41,7 @@
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/error.h"

namespace grpc_event_engine {
Expand Down Expand Up @@ -93,16 +99,95 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller)
: poller_(poller) {}

~GrpcPolledFdFactoryPosix() override {
for (auto& fd : owned_fds_) {
close(fd);
}
}

GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as) override {
owned_fds_.insert(as);
return new GrpcPolledFdPosix(
as,
poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors()));
}

void ConfigureAresChannelLocked(ares_channel /*channel*/) override {}
void ConfigureAresChannelLocked(ares_channel channel) override {
ares_set_socket_functions(channel, &kSockFuncs, this);
ares_set_socket_configure_callback(
channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr);
}

private:
/// Overridden socket API for c-ares
static ares_socket_t Socket(int af, int type, int protocol,
void* /*user_data*/) {
return socket(af, type, protocol);
}

/// Overridden connect API for c-ares
static int Connect(ares_socket_t as, const struct sockaddr* target,
ares_socklen_t target_len, void* /*user_data*/) {
return connect(as, target, target_len);
}

/// Overridden writev API for c-ares
static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov,
int iovec_count, void* /*user_data*/) {
return writev(as, iov, iovec_count);
}

/// Overridden recvfrom API for c-ares
static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
int flags, struct sockaddr* from,
ares_socklen_t* from_len, void* /*user_data*/) {
return recvfrom(as, data, data_len, flags, from, from_len);
}

/// Overridden close API for c-ares
static int Close(ares_socket_t as, void* user_data) {
GrpcPolledFdFactoryPosix* self =
static_cast<GrpcPolledFdFactoryPosix*>(user_data);
if (self->owned_fds_.find(as) == self->owned_fds_.end()) {
// c-ares owns this fd, grpc has never seen it
return close(as);
}
return 0;
}

/// Because we're using socket API overrides, c-ares won't
/// perform its typical configuration on the socket. See
/// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018.
/// So we use the configure socket callback override and copy default
/// settings that c-ares would normally apply on posix platforms:
/// - non-blocking
/// - cloexec flag
/// - disable nagle
static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) {
// clang-format off
#define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; }
// clang-format on
PosixSocketWrapper sock(fd);
RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
RETURN_IF_ERROR(sock.SetSocketCloexec(1));
if (type == SOCK_STREAM) {
RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
}
return 0;
}

const struct ares_socket_functions kSockFuncs = {
&GrpcPolledFdFactoryPosix::Socket /* socket */,
&GrpcPolledFdFactoryPosix::Close /* close */,
&GrpcPolledFdFactoryPosix::Connect /* connect */,
&GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */,
&GrpcPolledFdFactoryPosix::WriteV /* writev */,
};

PosixEventPoller* poller_;
// fds that are used/owned by grpc - we (grpc) will close them rather than
// c-ares
std::unordered_set<ares_socket_t> owned_fds_;
};

} // namespace experimental
Expand Down
1 change: 1 addition & 0 deletions test/cpp/naming/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/core:ares_resolver",
"//src/core:channel_args",
"//test/core/end2end:cq_verifier",
"//test/core/util:fake_udp_and_tcp_server",
Expand Down
26 changes: 19 additions & 7 deletions test/cpp/naming/cancel_ares_query_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/event_engine/ares_resolver.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/string.h"
Expand Down Expand Up @@ -442,21 +443,32 @@ TEST_F(CancelDuringAresQuery, TestQueryFailsBecauseTcpServerClosesSocket) {
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::
CloseSocketUponReceivingBytesFromPeer);
// TODO(yijiem): make this test work with the EE DNS resolver by supporting
// this test flag to force TCP in the EE DNS resolver.
if (grpc_core::IsEventEngineDnsEnabled()) return;
g_grpc_ares_test_only_force_tcp = true;
if (grpc_core::IsEventEngineDnsEnabled()) {
g_event_engine_grpc_ares_test_only_force_tcp = true;
} else {
g_grpc_ares_test_only_force_tcp = true;
}
grpc_status_code expected_status_code = GRPC_STATUS_UNAVAILABLE;
std::string expected_error_message_substring =
absl::StrCat("DNS resolution failed for ", kFakeName);
std::string expected_error_message_substring;
if (grpc_core::IsEventEngineDnsEnabled()) {
expected_error_message_substring =
absl::StrCat("errors resolving ", kFakeName);
} else {
expected_error_message_substring =
absl::StrCat("DNS resolution failed for ", kFakeName);
}
// Don't really care about the deadline - we should quickly hit a DNS
// resolution failure.
gpr_timespec rpc_deadline = grpc_timeout_seconds_to_deadline(100);
int dns_query_timeout_ms = -1; // don't set query timeout
TestCancelDuringActiveQuery(expected_status_code,
expected_error_message_substring, rpc_deadline,
dns_query_timeout_ms, fake_dns_server.port());
g_grpc_ares_test_only_force_tcp = false;
if (grpc_core::IsEventEngineDnsEnabled()) {
g_event_engine_grpc_ares_test_only_force_tcp = false;
} else {
g_grpc_ares_test_only_force_tcp = false;
}
}

} // namespace
Expand Down

0 comments on commit 67ad297

Please sign in to comment.