Skip to content

Commit

Permalink
[pick_first] fix happy eyeballs address interleaving bug (grpc#34804)
Browse files Browse the repository at this point in the history
The original logic from grpc#34615 was incorrect in cases where one address
family has a different number of addresses than the other(s).

Fixes b/307937051.
  • Loading branch information
markdroth authored Oct 26, 2023
1 parent 9c15d63 commit 4826efa
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
#include <string.h>

#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
Expand Down Expand Up @@ -446,6 +446,35 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
}
}

// Returns the address family of `address`, expressed as the URI scheme
// reported by grpc_sockaddr_get_uri_scheme().  If no scheme is reported
// (nullptr), the family is "other".
//
// NOTE(review): the returned view points either at the "other" literal or
// at the string returned by grpc_sockaddr_get_uri_scheme(); presumably
// that string has static storage duration -- confirm before storing the
// result long-term.
absl::string_view GetAddressFamily(const grpc_resolved_address& address) {
  const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
  return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
}

// A forward cursor over an endpoint list that yields only the entries
// belonging to a single address family, where the family is identified
// by URI scheme (see GetAddressFamily()).
class AddressFamilyIterator {
 public:
  // `scheme` is the address family to match; `index` is the position in
  // the endpoint list at which the search starts.
  AddressFamilyIterator(absl::string_view scheme, size_t index)
      : scheme_(scheme), index_(index) {}

  // Scans forward from the current position and returns a pointer to the
  // first endpoint that has not already been moved and whose address
  // family matches.  The returned entry is flagged in *endpoints_moved and
  // the cursor is left just past it.  Returns nullptr once the end of
  // `endpoints` is reached without a match.
  EndpointAddresses* Next(EndpointAddressesList& endpoints,
                          std::vector<bool>* endpoints_moved) {
    std::vector<bool>& moved = *endpoints_moved;
    while (index_ < endpoints.size()) {
      // Always advance the cursor, whether or not this entry matches.
      const size_t candidate = index_++;
      if (moved[candidate]) continue;
      if (GetAddressFamily(endpoints[candidate].address()) != scheme_) {
        continue;
      }
      moved[candidate] = true;
      return &endpoints[candidate];
    }
    return nullptr;
  }

 private:
  absl::string_view scheme_;
  size_t index_;
};

absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
Expand Down Expand Up @@ -473,41 +502,35 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
// While we're iterating, also determine the desired address family
// order and the index of the first element of each family, for use in
// the interleaving below.
auto get_address_family = [](const grpc_resolved_address& address) {
const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
};
std::vector<absl::string_view> address_family_order;
std::map<absl::string_view, size_t> address_family_indexes;
std::set<absl::string_view> address_families;
std::vector<AddressFamilyIterator> address_family_order;
EndpointAddressesList endpoints;
for (const auto& endpoint : *args.addresses) {
for (const auto& address : endpoint.addresses()) {
endpoints.emplace_back(address, endpoint.args());
if (IsPickFirstHappyEyeballsEnabled()) {
absl::string_view scheme = get_address_family(address);
bool inserted =
address_family_indexes.emplace(scheme, endpoints.size() - 1)
.second;
if (inserted) address_family_order.push_back(scheme);
absl::string_view scheme = GetAddressFamily(address);
bool inserted = address_families.insert(scheme).second;
if (inserted) {
address_family_order.emplace_back(scheme, endpoints.size() - 1);
}
}
}
}
// Interleave addresses as per RFC-8305 section 4.
if (IsPickFirstHappyEyeballsEnabled()) {
EndpointAddressesList interleaved_endpoints;
interleaved_endpoints.reserve(endpoints.size());
std::vector<bool> endpoints_moved(endpoints.size());
size_t scheme_index = 0;
for (size_t i = 0; i < endpoints.size(); ++i) {
absl::string_view scheme_to_use =
address_family_order[i % address_family_order.size()];
size_t& next_index = address_family_indexes[scheme_to_use];
for (; next_index < endpoints.size(); ++next_index) {
if (get_address_family(endpoints[next_index].address()) ==
scheme_to_use) {
break;
}
}
if (next_index == endpoints.size()) continue;
interleaved_endpoints.emplace_back(std::move(endpoints[next_index]));
next_index++;
EndpointAddresses* endpoint;
do {
auto& iterator = address_family_order[scheme_index++ %
address_family_order.size()];
endpoint = iterator.Next(endpoints, &endpoints_moved);
} while (endpoint == nullptr);
interleaved_endpoints.emplace_back(std::move(*endpoint));
}
args.addresses = std::move(interleaved_endpoints);
} else {
Expand Down
121 changes: 107 additions & 14 deletions test/core/client_channel/lb_policy/pick_first_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,11 +670,11 @@ TEST_F(PickFirstTest,

TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three IPv4 addresses followed by three
// Send an update containing four IPv4 addresses followed by two
// IPv6 addresses.
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
"ipv6:[::1]:443", "ipv6:[::1]:444", "ipv6:[::1]:445"};
"ipv4:127.0.0.1:446", "ipv6:[::1]:444", "ipv6:[::1]:445"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
Expand All @@ -685,12 +685,12 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
ASSERT_NE(subchannel_ipv4_2, nullptr);
auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel_ipv4_3, nullptr);
auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[3]);
auto* subchannel_ipv4_4 = FindSubchannel(kAddresses[3]);
ASSERT_NE(subchannel_ipv4_4, nullptr);
auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[4]);
ASSERT_NE(subchannel_ipv6_1, nullptr);
auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[4]);
auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[5]);
ASSERT_NE(subchannel_ipv6_2, nullptr);
auto* subchannel_ipv6_3 = FindSubchannel(kAddresses[5]);
ASSERT_NE(subchannel_ipv6_3, nullptr);
// When the LB policy receives the subchannels' initial connectivity
// state notifications (all IDLE), it will request a connection on the
// first IPv4 subchannel.
Expand All @@ -701,9 +701,9 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_1->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the first IPv6
Expand All @@ -715,8 +715,8 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv4
Expand All @@ -727,8 +727,8 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv6
Expand All @@ -739,7 +739,7 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the third IPv4
Expand All @@ -749,13 +749,106 @@ TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the third IPv6
// This causes the LB policy to start connecting to the fourth IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv6_3->ConnectionRequested());
subchannel_ipv6_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
EXPECT_TRUE(subchannel_ipv4_4->ConnectionRequested());
subchannel_ipv4_4->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
}

// Verifies address interleaving when the first address family in the
// update has fewer addresses than the second one: two IPv6 addresses
// followed by four IPv4 addresses.  With the timer advanced by 250ms
// between steps and no attempt completing, connection attempts are
// expected in this order:
//   ipv6_1, ipv4_1, ipv6_2, ipv4_2, ipv4_3, ipv4_4
// After each step, the test also checks that no later subchannel has had
// a connection requested yet.  The test is a no-op unless
// IsPickFirstHappyEyeballsEnabled() returns true.
TEST_F(PickFirstTest,
HappyEyeballsAddressInterleavingSecondFamilyHasMoreAddresses) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing two IPv6 addresses followed by four IPv4
// addresses.
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv6:[::1]:444", "ipv6:[::1]:445", "ipv4:127.0.0.1:443",
"ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses.
auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel_ipv6_1, nullptr);
auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel_ipv6_2, nullptr);
auto* subchannel_ipv4_1 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel_ipv4_1, nullptr);
auto* subchannel_ipv4_2 = FindSubchannel(kAddresses[3]);
ASSERT_NE(subchannel_ipv4_2, nullptr);
auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[4]);
ASSERT_NE(subchannel_ipv4_3, nullptr);
auto* subchannel_ipv4_4 = FindSubchannel(kAddresses[5]);
ASSERT_NE(subchannel_ipv4_4, nullptr);
// When the LB policy receives the subchannels' initial connectivity
// state notifications (all IDLE), it will request a connection on the
// first IPv6 subchannel.
EXPECT_TRUE(subchannel_ipv6_1->ConnectionRequested());
subchannel_ipv6_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_1->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the first IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_1->ConnectionRequested());
subchannel_ipv4_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv6
// subchannel.
EXPECT_TRUE(subchannel_ipv6_2->ConnectionRequested());
subchannel_ipv6_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_2->ConnectionRequested());
subchannel_ipv4_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the third IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_3->ConnectionRequested());
subchannel_ipv4_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_4->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the fourth IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_4->ConnectionRequested());
subchannel_ipv4_4->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
}
Expand Down

0 comments on commit 4826efa

Please sign in to comment.