Skip to content

Commit

Permalink
[client_channel] ensure that subchannels are always destroyed inside …
Browse files Browse the repository at this point in the history
…the WorkSerializer (grpc#34077)

- add debug-only `WorkSerializer::IsRunningInWorkSerializer()` method
and use it in client_channel to verify that subchannels are destroyed in
the `WorkSerializer`
  - note: this mechanism uses `std::thread::id`, so I had to exclude
    work_serializer.cc from the core_banned_constructs check
- fix `WorkSerializer::Run()` to unref the callback before releasing
ownership of the `WorkSerializer`, so that any refs captured by the
`std::function<>` will be released before releasing ownership
- fix the WRR timer callback to hop into the `WorkSerializer` to release
its ref to the picker, since that transitively releases refs to
subchannels
- fix subchannel connectivity state notifications to unref the watcher
inside the `WorkSerializer`, since the watcher often transitively holds
refs to subchannels
  • Loading branch information
markdroth authored Aug 21, 2023
1 parent fdf3f5a commit 1e818c9
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 18 deletions.
9 changes: 6 additions & 3 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
++it->second;
}
}
GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
chand_->subchannel_wrappers_.insert(this);
}

Expand All @@ -522,6 +523,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
"chand=%p: destroying subchannel wrapper %p for subchannel %p",
chand_, this, subchannel_.get());
}
GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
Expand Down Expand Up @@ -615,15 +617,16 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
DEBUG_LOCATION);
}

void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) override {
void OnConnectivityStateChange(
RefCountedPtr<ConnectivityStateWatcherInterface> self,
grpc_connectivity_state state, const absl::Status& status) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO,
"chand=%p: connectivity change for subchannel wrapper %p "
"subchannel %p; hopping into work_serializer",
parent_->chand_, parent_.get(), parent_->subchannel_.get());
}
Ref().release(); // ref owned by lambda
self.release(); // Held by callback.
parent_->chand_->work_serializer_->Run(
[this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
*parent_->chand_->work_serializer_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,11 @@ class HealthProducer::ConnectivityWatcher
explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
: producer_(std::move(producer)) {}

void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) override {
void OnConnectivityStateChange(
RefCountedPtr<ConnectivityStateWatcherInterface> self,
grpc_connectivity_state state, const absl::Status& status) override {
producer_->OnConnectivityStateChange(state, status);
self.reset();
}

grpc_pollset_set* interested_parties() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ class OrcaProducer::ConnectivityWatcher
grpc_pollset_set_destroy(interested_parties_);
}

void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status&) override {
void OnConnectivityStateChange(
RefCountedPtr<ConnectivityStateWatcherInterface> self,
grpc_connectivity_state state, const absl::Status&) override {
producer_->OnConnectivityStateChange(state);
self.reset();
}

grpc_pollset_set* interested_parties() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
// Start timer.
WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(), [self = std::move(self)]() mutable {
config_->weight_update_period(),
[self = std::move(self),
work_serializer = wrr_->work_serializer()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
{
Expand All @@ -621,8 +623,8 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
self->BuildSchedulerAndStartTimerLocked();
}
}
// Release ref before ExecCtx goes out of scope.
self.reset();
// Release the picker ref inside the WorkSerializer.
work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION);
});
}

Expand Down
12 changes: 8 additions & 4 deletions src/core/ext/filters/client_channel/subchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,10 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
grpc_connectivity_state state, const absl::Status& status) {
for (const auto& p : watchers_) {
subchannel_->work_serializer_.Schedule(
[watcher = p.second->Ref(), state, status]() {
watcher->OnConnectivityStateChange(state, status);
[watcher = p.second->Ref(), state, status]() mutable {
auto* watcher_ptr = watcher.get();
watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
status);
},
DEBUG_LOCATION);
}
Expand Down Expand Up @@ -527,8 +529,10 @@ void Subchannel::WatchConnectivityState(
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
work_serializer_.Schedule(
[watcher = watcher->Ref(), state = state_, status = status_]() {
watcher->OnConnectivityStateChange(state, status);
[watcher = watcher->Ref(), state = state_, status = status_]() mutable {
auto* watcher_ptr = watcher.get();
watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
status);
},
DEBUG_LOCATION);
watcher_list_.AddWatcherLocked(std::move(watcher));
Expand Down
10 changes: 8 additions & 2 deletions src/core/ext/filters/client_channel/subchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,14 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Invoked whenever the subchannel's connectivity state changes.
// There will be only one invocation of this method on a given watcher
// instance at any given time.
virtual void OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) = 0;
// A ref to the watcher is passed in here so that the implementation
// can unref it in the appropriate synchronization context (e.g.,
// inside a WorkSerializer).
// TODO(roth): Figure out a cleaner way to guarantee that the ref is
// released in the right context.
virtual void OnConnectivityStateChange(
RefCountedPtr<ConnectivityStateWatcherInterface> self,
grpc_connectivity_state state, const absl::Status& status) = 0;

virtual grpc_pollset_set* interested_parties() = 0;
};
Expand Down
36 changes: 36 additions & 0 deletions src/core/lib/gprpp/work_serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

#include <grpc/support/log.h>
Expand All @@ -47,6 +48,12 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
void DrainQueue();
void Orphan() override;

#ifndef NDEBUG
bool RunningInWorkSerializer() const {
return std::this_thread::get_id() == current_thread_;
}
#endif

private:
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
Expand Down Expand Up @@ -86,6 +93,9 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
// orphaned.
std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
MultiProducerSingleConsumerQueue queue_;
#ifndef NDEBUG
std::thread::id current_thread_;
#endif
};

void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
Expand All @@ -102,10 +112,17 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
callback();
// Delete the callback while still holding the WorkSerializer, so
// that any refs being held by the callback via lambda captures will
// be destroyed inside the WorkSerializer.
callback = nullptr;
DrainQueueOwned();
} else {
// Another thread is holding the WorkSerializer, so decrement the ownership
Expand Down Expand Up @@ -158,6 +175,9 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0) {
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned();
} else {
Expand Down Expand Up @@ -186,6 +206,12 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
}
if (GetSize(prev_ref_pair) == 2) {
// Queue drained. Give up ownership but only if queue remains empty.
#ifndef NDEBUG
// Reset current_thread_ before giving up ownership to avoid TSAN
// race. If we don't wind up giving up ownership, we'll set this
// again below before we pull the next callback out of the queue.
current_thread_ = std::thread::id();
#endif
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
std::memory_order_acq_rel)) {
Expand All @@ -200,6 +226,10 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
delete this;
return;
}
#ifndef NDEBUG
// Didn't wind up giving up ownership, so set current_thread_ again.
current_thread_ = std::this_thread::get_id();
#endif
}
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
Expand Down Expand Up @@ -244,4 +274,10 @@ void WorkSerializer::Schedule(std::function<void()> callback,

void WorkSerializer::DrainQueue() { impl_->DrainQueue(); }

#ifndef NDEBUG
bool WorkSerializer::RunningInWorkSerializer() const {
return impl_->RunningInWorkSerializer();
}
#endif

} // namespace grpc_core
5 changes: 5 additions & 0 deletions src/core/lib/gprpp/work_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class ABSL_LOCKABLE WorkSerializer {
// Drains the queue of callbacks.
void DrainQueue();

#ifndef NDEBUG
// Returns true if the current thread is running in the WorkSerializer.
bool RunningInWorkSerializer() const;
#endif

private:
class WorkSerializerImpl;

Expand Down
37 changes: 37 additions & 0 deletions test/core/gprpp/work_serializer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,43 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
}
}

#ifndef NDEBUG
TEST(WorkSerializerTest, RunningInWorkSerializer) {
grpc_core::WorkSerializer work_serializer1;
grpc_core::WorkSerializer work_serializer2;
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
}
#endif

} // namespace

int main(int argc, char** argv) {
Expand Down
4 changes: 2 additions & 2 deletions tools/run_tests/sanity/cpp_banned_constructs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ cd "$(dirname "$0")/../../.."
grep -EIrn \
'std::(mutex|condition_variable|lock_guard|unique_lock|thread)' \
include/grpc include/grpcpp src/core src/cpp | \
grep -Ev include/grpcpp/impl/sync.h | \
grep -Ev 'include/grpcpp/impl/sync.h|src/core/lib/gprpp/work_serializer.cc' | \
diff - /dev/null

#
Expand All @@ -36,7 +36,7 @@ grep -EIrn \
grep -EIrn \
'^#include (<mutex>|<condition_variable>|<thread>|<ratio>|<filesystem>|<future>|<system_error>)' \
include/grpc include/grpcpp src/core src/cpp | \
grep -Ev include/grpcpp/impl/sync.h | \
grep -Ev 'include/grpcpp/impl/sync.h|src/core/lib/gprpp/work_serializer.cc' | \
diff - /dev/null

#
Expand Down

0 comments on commit 1e818c9

Please sign in to comment.