Skip to content

Commit

Permalink
[Fork] Kick the EE PollPoller in prefork (grpc#33916)
Browse files Browse the repository at this point in the history
And add some trace points. This does not solve
[go/event-engine-forkable-prefork-deadlock](http://go/event-engine-forkable-prefork-deadlock),
but is a necessary step.
So ¯\\_(ツ)_/¯.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
  • Loading branch information
yijiem authored Aug 4, 2023
1 parent 6468b8a commit 60c1701
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,7 @@ grpc_cc_library(
"//:config_vars",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
],
)

Expand Down Expand Up @@ -1742,6 +1743,7 @@ grpc_cc_library(
"common_event_engine_closures",
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
Expand Down
16 changes: 15 additions & 1 deletion src/core/lib/event_engine/forkable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
namespace grpc_event_engine {
namespace experimental {

grpc_core::TraceFlag grpc_trace_fork(false, "fork");

namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};
Expand All @@ -58,48 +60,60 @@ void RegisterForkHandlers() {
grpc_core::MutexLock lock(g_mu.get());
if (!std::exchange(g_registered, true)) {
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
GRPC_FORK_TRACE_LOG_STRING("RegisterForkHandlers");
pthread_atfork(PrepareFork, PostforkParent, PostforkChild);
#endif
}
}
};
}

void PrepareFork() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PrepareFork");
grpc_core::MutexLock lock(g_mu.get());
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
}
GRPC_FORK_TRACE_LOG_STRING("PrepareFork finished");
}
}

void PostforkParent() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PostforkParent");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkParent for forkable::%p", forkable);
forkable->PostforkParent();
}
GRPC_FORK_TRACE_LOG_STRING("PostforkParent finished");
}
}

void PostforkChild() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PostforkChild");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkChild for forkable::%p", forkable);
forkable->PostforkChild();
}
GRPC_FORK_TRACE_LOG_STRING("PostforkChild finished");
}
}

void ManageForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Manage forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
g_forkables->push_back(forkable);
}
}

void StopManagingForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Stop managing forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
Expand Down
15 changes: 15 additions & 0 deletions src/core/lib/event_engine/forkable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,24 @@

#include <grpc/support/port_platform.h>

#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"

namespace grpc_event_engine {
namespace experimental {

extern grpc_core::TraceFlag grpc_trace_fork;

#define GRPC_FORK_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fork)) { \
gpr_log(GPR_DEBUG, "[fork] " format, __VA_ARGS__); \
} \
} while (0)

#define GRPC_FORK_TRACE_LOG_STRING(format) GRPC_FORK_TRACE_LOG("%s", format)

// Register fork handlers with the system, enabling fork support.
//
// This provides pthread-based support for fork events. Any objects that
Expand Down
28 changes: 25 additions & 3 deletions src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) {
PollPoller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
InitPollPollerPosix();
Expand Down Expand Up @@ -566,6 +566,9 @@ bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {

void PollPoller::KickExternal(bool ext) {
grpc_core::MutexLock lock(&mu_);
if (closed_) {
return;
}
if (was_kicked_) {
if (ext) {
was_kicked_ext_ = true;
Expand Down Expand Up @@ -610,7 +613,8 @@ PollPoller::PollPoller(Scheduler* scheduler)
was_kicked_(false),
was_kicked_ext_(false),
num_poll_handles_(0),
poll_handles_list_head_(nullptr) {
poll_handles_list_head_(nullptr),
closed_(false) {
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
ForkPollerListAddPoller(this);
Expand All @@ -622,7 +626,8 @@ PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll)
was_kicked_(false),
was_kicked_ext_(false),
num_poll_handles_(0),
poll_handles_list_head_(nullptr) {
poll_handles_list_head_(nullptr),
closed_(false) {
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
ForkPollerListAddPoller(this);
Expand Down Expand Up @@ -829,6 +834,17 @@ void PollPoller::Shutdown() {
Unref();
}

void PollPoller::PrepareFork() { Kick(); }
// TODO(vigneshbabu): implement
void PollPoller::PostforkParent() {}
// TODO(vigneshbabu): implement
void PollPoller::PostforkChild() {}

void PollPoller::Close() {
grpc_core::MutexLock lock(&mu_);
closed_ = true;
}

PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) {
static bool kPollPollerSupported = InitPollPollerPosix();
if (kPollPollerSupported) {
Expand Down Expand Up @@ -875,6 +891,12 @@ PollPoller* MakePollPoller(Scheduler* /*scheduler*/,
return nullptr;
}

void PollPoller::PrepareFork() { grpc_core::Crash("unimplemented"); }
void PollPoller::PostforkParent() { grpc_core::Crash("unimplemented"); }
void PollPoller::PostforkChild() { grpc_core::Crash("unimplemented"); }

void PollPoller::Close() { grpc_core::Crash("unimplemented"); }

void PollPoller::KickExternal(bool /*ext*/) {
grpc_core::Crash("unimplemented");
}
Expand Down
11 changes: 10 additions & 1 deletion src/core/lib/event_engine/posix_engine/ev_poll_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
Expand All @@ -38,7 +39,7 @@ namespace experimental {
class PollEventHandle;

// Definition of poll based poller.
class PollPoller : public PosixEventPoller {
class PollPoller : public PosixEventPoller, public Forkable {
public:
explicit PollPoller(Scheduler* scheduler);
PollPoller(Scheduler* scheduler, bool use_phony_poll);
Expand All @@ -54,6 +55,13 @@ class PollPoller : public PosixEventPoller {
bool CanTrackErrors() const override { return false; }
~PollPoller() override;

// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;

void Close();

private:
void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
Expand Down Expand Up @@ -83,6 +91,7 @@ class PollPoller : public PosixEventPoller {
int num_poll_handles_ ABSL_GUARDED_BY(mu_);
PollEventHandle* poll_handles_list_head_ ABSL_GUARDED_BY(mu_) = nullptr;
std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_ ABSL_GUARDED_BY(mu_);
};

// Return an instance of a poll based poller tied to the specified scheduler.
Expand Down

0 comments on commit 60c1701

Please sign in to comment.