Skip to content

Commit

Permalink
Merge pull request grpc#22749 from donnadionne/new
Browse files Browse the repository at this point in the history
Integrating weighted target policy into xds routing policy
  • Loading branch information
donnadionne authored May 12, 2020
2 parents 576f478 + 9ad561c commit b1425e2
Show file tree
Hide file tree
Showing 7 changed files with 908 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
targets_[name]->UpdateLocked(config, std::move(address_map[name]),
args.args);
}
UpdateStateLocked();
}

void WeightedTargetLb::UpdateStateLocked() {
Expand Down
101 changes: 74 additions & 27 deletions src/core/ext/filters/client_channel/xds/xds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ inline absl::string_view UpbStringToAbsl(const upb_strview& str) {
return absl::string_view(str.data, str.size);
}

inline std::string UpbStringToStdString(const upb_strview& str) {
return std::string(str.data, str.size);
}

inline void AddStringField(const char* name, const upb_strview& value,
std::vector<std::string>* fields,
bool add_if_empty = false) {
Expand Down Expand Up @@ -1023,7 +1027,7 @@ grpc_error* RouteConfigParse(
const envoy_api_v2_route_Route* route = routes[i];
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
XdsApi::RdsRoute rds_route;
XdsApi::RdsUpdate::RdsRoute rds_route;
if (envoy_api_v2_route_RouteMatch_has_prefix(match)) {
upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match);
// Empty prefix "" is accepted.
Expand Down Expand Up @@ -1085,18 +1089,65 @@ grpc_error* RouteConfigParse(
}
const envoy_api_v2_route_RouteAction* route_action =
envoy_api_v2_route_Route_route(route);
// Get the cluster in the RouteAction.
if (!envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No cluster found in RouteAction.");
}
const upb_strview action =
envoy_api_v2_route_RouteAction_cluster(route_action);
if (action.size == 0) {
// Get the cluster or weighted_clusters in the RouteAction.
if (envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
const upb_strview cluster_name =
envoy_api_v2_route_RouteAction_cluster(route_action);
if (cluster_name.size == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction cluster contains empty cluster name.");
}
rds_route.cluster_name = UpbStringToStdString(cluster_name);
} else if (envoy_api_v2_route_RouteAction_has_weighted_clusters(
route_action)) {
const envoy_api_v2_route_WeightedCluster* weighted_cluster =
envoy_api_v2_route_RouteAction_weighted_clusters(route_action);
uint32_t total_weight = 100;
const google_protobuf_UInt32Value* weight =
envoy_api_v2_route_WeightedCluster_total_weight(weighted_cluster);
if (weight != nullptr) {
total_weight = google_protobuf_UInt32Value_value(weight);
}
size_t clusters_size;
const envoy_api_v2_route_WeightedCluster_ClusterWeight* const* clusters =
envoy_api_v2_route_WeightedCluster_clusters(weighted_cluster,
&clusters_size);
uint32_t sum_of_weights = 0;
for (size_t j = 0; j < clusters_size; ++j) {
const envoy_api_v2_route_WeightedCluster_ClusterWeight* cluster_weight =
clusters[j];
XdsApi::RdsUpdate::RdsRoute::ClusterWeight cluster;
cluster.name = UpbStringToStdString(
envoy_api_v2_route_WeightedCluster_ClusterWeight_name(
cluster_weight));
if (cluster.name.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster cluster contains empty cluster "
"name.");
}
const google_protobuf_UInt32Value* weight =
envoy_api_v2_route_WeightedCluster_ClusterWeight_weight(
cluster_weight);
if (weight == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster cluster missing weight");
}
cluster.weight = google_protobuf_UInt32Value_value(weight);
sum_of_weights += cluster.weight;
rds_route.weighted_clusters.emplace_back(std::move(cluster));
}
if (total_weight != sum_of_weights) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster has incorrect total weight");
}
if (rds_route.weighted_clusters.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction weighted_cluster has no valid clusters specified.");
}
} else {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction contains empty cluster.");
"No cluster or weighted_clusters found in RouteAction.");
}
rds_route.cluster_name = std::string(action.data, action.size);
rds_update->routes.emplace_back(std::move(rds_route));
}
if (rds_update->routes.empty()) {
Expand Down Expand Up @@ -1190,12 +1241,10 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
"HttpConnectionManager ConfigSource for RDS does not specify ADS.");
}
// Get the route_config_name.
const upb_strview route_config_name =
envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
rds);
lds_update->emplace();
(*lds_update)->route_config_name =
std::string(route_config_name.data, route_config_name.size);
(*lds_update)->route_config_name = UpbStringToStdString(
envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name(
rds));
return GRPC_ERROR_NONE;
}
return GRPC_ERROR_NONE;
Expand Down Expand Up @@ -1297,8 +1346,7 @@ grpc_error* CdsResponseParse(
upb_strview service_name =
envoy_api_v2_Cluster_EdsClusterConfig_service_name(eds_cluster_config);
if (service_name.size != 0) {
cds_update.eds_service_name =
std::string(service_name.data, service_name.size);
cds_update.eds_service_name = UpbStringToStdString(service_name);
}
// Check the LB policy.
if (envoy_api_v2_Cluster_lb_policy(cluster) !=
Expand All @@ -1316,7 +1364,7 @@ grpc_error* CdsResponseParse(
}
cds_update.lrs_load_reporting_server_name.emplace("");
}
cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
cds_update_map->emplace(UpbStringToStdString(cluster_name),
std::move(cds_update));
}
return GRPC_ERROR_NONE;
Expand Down Expand Up @@ -1377,8 +1425,8 @@ grpc_error* LocalityParse(
upb_strview zone = envoy_api_v2_core_Locality_region(locality);
upb_strview sub_zone = envoy_api_v2_core_Locality_sub_zone(locality);
output_locality->name = MakeRefCounted<XdsLocalityName>(
std::string(region.data, region.size), std::string(zone.data, zone.size),
std::string(sub_zone.data, sub_zone.size));
UpbStringToStdString(region), UpbStringToStdString(zone),
UpbStringToStdString(sub_zone));
// Parse the addresses.
size_t size;
const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
Expand Down Expand Up @@ -1428,8 +1476,7 @@ grpc_error* DropParseAndAppend(
}
// Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000);
drop_config->AddCategory(std::string(category.data, category.size),
numerator);
drop_config->AddCategory(UpbStringToStdString(category), numerator);
return GRPC_ERROR_NONE;
}

Expand Down Expand Up @@ -1508,7 +1555,7 @@ grpc_error* EdsResponseParse(
if (error != GRPC_ERROR_NONE) return error;
}
}
eds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
eds_update_map->emplace(UpbStringToStdString(cluster_name),
std::move(eds_update));
}
return GRPC_ERROR_NONE;
Expand Down Expand Up @@ -1542,12 +1589,12 @@ grpc_error* XdsApi::ParseAdsResponse(
// Record the type_url, the version_info, and the nonce of the response.
upb_strview type_url_strview =
envoy_api_v2_DiscoveryResponse_type_url(response);
*type_url = std::string(type_url_strview.data, type_url_strview.size);
*type_url = UpbStringToStdString(type_url_strview);
upb_strview version_info =
envoy_api_v2_DiscoveryResponse_version_info(response);
*version = std::string(version_info.data, version_info.size);
*version = UpbStringToStdString(version_info);
upb_strview nonce_strview = envoy_api_v2_DiscoveryResponse_nonce(response);
*nonce = std::string(nonce_strview.data, nonce_strview.size);
*nonce = UpbStringToStdString(nonce_strview);
// Parse the response according to the resource type.
if (*type_url == kLdsTypeUrl) {
return LdsResponseParse(client_, tracer_, response, expected_server_name,
Expand Down
35 changes: 24 additions & 11 deletions src/core/ext/filters/client_channel/xds/xds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,31 @@ class XdsApi {
static const char* kCdsTypeUrl;
static const char* kEdsTypeUrl;

struct RdsRoute {
std::string service;
std::string method;
std::string cluster_name;

bool operator==(const RdsRoute& other) const {
return (service == other.service && method == other.method &&
cluster_name == other.cluster_name);
}
};

struct RdsUpdate {
struct RdsRoute {
std::string service;
std::string method;
// TODO(donnadionne): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of cluster_name and
// weighted_clusters can be set.
std::string cluster_name;
struct ClusterWeight {
std::string name;
uint32_t weight;

bool operator==(const ClusterWeight& other) const {
return (name == other.name && weight == other.weight);
}
};
std::vector<ClusterWeight> weighted_clusters;

bool operator==(const RdsRoute& other) const {
return (service == other.service && method == other.method &&
cluster_name == other.cluster_name &&
weighted_clusters == other.weighted_clusters);
}
};

std::vector<RdsRoute> routes;

bool operator==(const RdsUpdate& other) const {
Expand Down
Loading

0 comments on commit b1425e2

Please sign in to comment.