Skip to content

Commit

Permalink
[Fork] Add ObjectGroupForkHandler fork-handling system (grpc#33733)
Browse files Browse the repository at this point in the history
<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: Bradley Hess <[email protected]>
Co-authored-by: AJ Heller <[email protected]>
  • Loading branch information
3 people authored Nov 8, 2023
1 parent f87ce8b commit d3828eb
Show file tree
Hide file tree
Showing 25 changed files with 585 additions and 178 deletions.
60 changes: 60 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1456,9 +1456,7 @@ grpc_cc_library(
hdrs = [
"lib/event_engine/forkable.h",
],
external_deps = ["absl/base:core_headers"],
deps = [
"no_destruct",
"//:config_vars",
"//:gpr",
"//:gpr_platform",
Expand Down Expand Up @@ -1601,6 +1599,7 @@ grpc_cc_library(
"event_engine_trace",
"event_engine_work_queue",
"forkable",
"no_destruct",
"notification",
"time",
"useful",
Expand Down Expand Up @@ -1664,6 +1663,7 @@ grpc_cc_library(
],
deps = [
"event_engine_poller",
"forkable",
"posix_event_engine_closure",
"//:event_engine_base_hdrs",
"//:gpr_platform",
Expand Down Expand Up @@ -1796,7 +1796,6 @@ grpc_cc_library(
deps = [
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
Expand Down Expand Up @@ -1834,7 +1833,6 @@ grpc_cc_library(
"common_event_engine_closures",
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
Expand All @@ -1859,7 +1857,9 @@ grpc_cc_library(
],
external_deps = ["absl/strings"],
deps = [
"forkable",
"iomgr_port",
"no_destruct",
"posix_event_engine_event_poller",
"posix_event_engine_poller_posix_epoll1",
"posix_event_engine_poller_posix_poll",
Expand Down Expand Up @@ -2073,8 +2073,10 @@ grpc_cc_library(
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"forkable",
"init_internally",
"iomgr_port",
"no_destruct",
"posix_event_engine_base_hdrs",
"posix_event_engine_closure",
"posix_event_engine_endpoint",
Expand Down
99 changes: 40 additions & 59 deletions src/core/lib/event_engine/forkable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,98 +26,79 @@
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"

#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"

namespace grpc_event_engine {
namespace experimental {

grpc_core::TraceFlag grpc_trace_fork(false, "fork");

namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};

// This must be ordered because there are ordering dependencies between
// certain fork handlers.
grpc_core::NoDestruct<std::vector<Forkable*>> g_forkables ABSL_GUARDED_BY(g_mu);

bool IsForkEnabled() {
static bool enabled = grpc_core::ConfigVars::Get().EnableForkSupport();
return enabled;
}
} // namespace

Forkable::Forkable() { ManageForkable(this); }

Forkable::~Forkable() { StopManagingForkable(this); }

void RegisterForkHandlers() {
if (IsForkEnabled()) {
grpc_core::MutexLock lock(g_mu.get());
if (!std::exchange(g_registered, true)) {
void ObjectGroupForkHandler::RegisterForkable(
std::shared_ptr<Forkable> forkable, GRPC_UNUSED void (*prepare)(void),
GRPC_UNUSED void (*parent)(void), GRPC_UNUSED void (*child)(void)) {
GPR_ASSERT(!is_forking_);
forkables_.emplace_back(forkable);
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
GRPC_FORK_TRACE_LOG_STRING("RegisterForkHandlers");
pthread_atfork(PrepareFork, PostforkParent, PostforkChild);
#endif
}
if (!std::exchange(registered_, true)) {
pthread_atfork(prepare, parent, child);
}
#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
}

void PrepareFork() {
void ObjectGroupForkHandler::Prefork() {
if (IsForkEnabled()) {
GPR_ASSERT(!std::exchange(is_forking_, true));
GRPC_FORK_TRACE_LOG_STRING("PrepareFork");
grpc_core::MutexLock lock(g_mu.get());
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PrepareFork();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PrepareFork finished");
}
}

void PostforkParent() {
void ObjectGroupForkHandler::PostforkParent() {
if (IsForkEnabled()) {
GPR_ASSERT(is_forking_);
GRPC_FORK_TRACE_LOG_STRING("PostforkParent");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkParent for forkable::%p", forkable);
forkable->PostforkParent();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PostforkParent();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PostforkParent finished");
is_forking_ = false;
}
}

void PostforkChild() {
void ObjectGroupForkHandler::PostforkChild() {
if (IsForkEnabled()) {
GPR_ASSERT(is_forking_);
GRPC_FORK_TRACE_LOG_STRING("PostforkChild");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkChild for forkable::%p", forkable);
forkable->PostforkChild();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PostforkChild();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PostforkChild finished");
}
}

void ManageForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Manage forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
g_forkables->push_back(forkable);
}
}

void StopManagingForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Stop managing forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
g_forkables->erase(iter);
is_forking_ = false;
}
}

Expand Down
60 changes: 33 additions & 27 deletions src/core/lib/event_engine/forkable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include <grpc/support/port_platform.h>

#include <memory>
#include <vector>

#include <grpc/support/log.h>

#include "src/core/lib/debug/trace.h"
Expand All @@ -34,41 +37,44 @@ extern grpc_core::TraceFlag grpc_trace_fork;

#define GRPC_FORK_TRACE_LOG_STRING(format) GRPC_FORK_TRACE_LOG("%s", format)

// Register fork handlers with the system, enabling fork support.
//
// This provides pthread-based support for fork events. Any objects that
// implement Forkable can register themselves with this system using
// ManageForkable, and their respective methods will be called upon fork.
//
// This should be called once upon grpc_initialization.
void RegisterForkHandlers();

// Global callback for pthread_atfork's *prepare argument
void PrepareFork();
// Global callback for pthread_atfork's *parent argument
void PostforkParent();
// Global callback for pthread_atfork's *child argument
void PostforkChild();

// An interface to be implemented by EventEngines that wish to have managed fork
// support.
// support. The child class must guarantee that those methods are thread-safe.
class Forkable {
public:
Forkable();
virtual ~Forkable();
virtual ~Forkable() = default;
virtual void PrepareFork() = 0;
virtual void PostforkParent() = 0;
virtual void PostforkChild() = 0;
};

// Add Forkables from the set of objects that are supported.
// Upon fork, each forkable will have its respective fork hooks called on
// the thread that invoked the fork.
//
// Relative ordering of fork callback operations is not guaranteed.
void ManageForkable(Forkable* forkable);
// Remove a forkable from the managed set.
void StopManagingForkable(Forkable* forkable);
// ObjectGroupForkHandler is meant to be used as a static object in each
// translation unit where Forkables are created and registered with the
// ObjectGroupForkHandler. It essentially provides storage for Forkables'
// instances (as a vector of weak pointers) and helper methods that are meant to
// be invoked inside the fork handlers (see pthread_atfork(3)). The idea is to
// have different Forkables (e.g. PosixEventPoller) to store their instances
// (e.g. a PosixEventPoller object) in a single place separated from other
// Forkables (a sharded approach). Forkables need to register their pthread fork
// handlers and manage the relative ordering themselves. This object is
// thread-unsafe.
class ObjectGroupForkHandler {
public:
// Registers a Forkable with this ObjectGroupForkHandler, the Forkable must be
// created as a shared pointer.
void RegisterForkable(std::shared_ptr<Forkable> forkable,
GRPC_UNUSED void (*prepare)(void),
GRPC_UNUSED void (*parent)(void),
GRPC_UNUSED void (*child)(void));

void Prefork();
void PostforkParent();
void PostforkChild();

private:
GRPC_UNUSED bool registered_ = false;
bool is_forking_ = false;
std::vector<std::weak_ptr<Forkable> > forkables_;
};

} // namespace experimental
} // namespace grpc_event_engine
Expand Down
Loading

0 comments on commit d3828eb

Please sign in to comment.