Skip to content

Commit

Permalink
Merge pull request grpc#16050 from ncteisen/channelz-round-robin
Browse files Browse the repository at this point in the history
Channelz Part 3.33: Round Robin Subchannel Refs Support
  • Loading branch information
ncteisen authored Jul 19, 2018
2 parents b29339f + 90f0631 commit 93579e9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}

void PickFirst::ShutdownLocked() {
AutoChildRefsUpdater gaurd(this);
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
Expand Down Expand Up @@ -327,30 +327,10 @@ void PickFirst::FillChildRefsForChannelz(
void PickFirst::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(
subchannel_list_->subchannel(i)->subchannel());
if (subchannel_node != nullptr) {
cs.push_back(subchannel_node->subchannel_uuid());
}
}
}
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
++i) {
if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(
latest_pending_subchannel_list_->subchannel(i)->subchannel());
if (subchannel_node != nullptr) {
cs.push_back(subchannel_node->subchannel_uuid());
}
}
}
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ class RoundRobin : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
// TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override {}
ChildRefsList* ignored) override;

private:
~RoundRobin();
Expand Down Expand Up @@ -183,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};

// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }

private:
RoundRobin* rr_;
};

void ShutdownLocked() override;

void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
void UpdateChildRefsLocked();

/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
Expand All @@ -205,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
ChildRefsList child_subchannels_;
ChildRefsList child_channels_;
};

RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
Expand All @@ -223,6 +241,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
Expand All @@ -242,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}

void RoundRobin::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
Expand Down Expand Up @@ -365,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}

void RoundRobin::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
mu_guard guard(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}

void RoundRobin::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}

void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
Expand Down Expand Up @@ -455,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
Expand Down Expand Up @@ -611,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,

void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
Expand Down
13 changes: 13 additions & 0 deletions src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,19 @@ class SubchannelList
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }

// Populates refs_list with the uuids of this SubchannelLists's subchannels.
void PopulateChildRefsList(ChildRefsList* refs_list) {
for (size_t i = 0; i < subchannels_.size(); ++i) {
if (subchannels_[i].subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
if (subchannel_node != nullptr) {
refs_list->push_back(subchannel_node->subchannel_uuid());
}
}
}
}

// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
Expand Down

0 comments on commit 93579e9

Please sign in to comment.