Skip to content

Commit

Permalink
Merge pull request grpc#9318 from markdroth/subchannel_addr_channel_arg
Browse files Browse the repository at this point in the history
Store subchannel address in a channel arg.
  • Loading branch information
markdroth authored Jan 17, 2017
2 parents c16080c + 31f2dd4 commit c0d7d67
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
3 changes: 0 additions & 3 deletions src/core/ext/client_channel/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ struct grpc_connector {
typedef struct {
/** set of pollsets interested in this connection */
grpc_pollset_set *interested_parties;
/** address to connect to */
const grpc_resolved_address *addr;
size_t addr_len;
/** initial connect string to send */
grpc_slice initial_connect_string;
/** deadline for connection */
Expand Down
57 changes: 48 additions & 9 deletions src/core/ext/client_channel/subchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@

#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/string_util.h>

#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/initial_connect_string.h"
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/subchannel_index.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
Expand Down Expand Up @@ -95,8 +99,6 @@ struct grpc_subchannel {
size_t num_filters;
/** channel arguments */
grpc_channel_args *args;
/** address to connect to */
grpc_resolved_address *addr;

grpc_subchannel_key *key;

Expand Down Expand Up @@ -211,7 +213,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(exec_ctx, c->args);
gpr_free(c->addr);
grpc_slice_unref_internal(exec_ctx, c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
Expand Down Expand Up @@ -327,12 +328,17 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
} else {
c->filters = NULL;
}
c->addr = gpr_malloc(sizeof(grpc_resolved_address));
if (args->addr->len)
memcpy(c->addr, args->addr, sizeof(grpc_resolved_address));
c->pollset_set = grpc_pollset_set_create();
grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args);
grpc_resolved_address *addr = gpr_malloc(sizeof(*addr));
grpc_get_subchannel_address_arg(args->args, addr);
grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
gpr_free(addr);
c->args = grpc_channel_args_copy_and_add_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
gpr_free(new_arg.value.string);

c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c,
Expand Down Expand Up @@ -385,7 +391,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
grpc_connect_in_args args;

args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.deadline = c->next_attempt;
args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string;
Expand Down Expand Up @@ -771,3 +776,37 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}

static void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) {
grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (strcmp(uri->scheme, "ipv4") == 0) {
GPR_ASSERT(parse_ipv4(uri, addr));
} else if (strcmp(uri->scheme, "ipv6") == 0) {
GPR_ASSERT(parse_ipv6(uri, addr));
} else {
GPR_ASSERT(parse_unix(uri, addr));
}
grpc_uri_destroy(uri);
}

void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
grpc_resolved_address *addr) {
const grpc_arg *addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
memset(addr, 0, sizeof(*addr));
if (*addr_arg->value.string != '\0') {
grpc_uri_to_sockaddr(addr_arg->value.string, addr);
}
}

grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) {
grpc_arg new_arg;
new_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS;
new_arg.type = GRPC_ARG_STRING;
new_arg.value.string =
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("");
return new_arg;
}
13 changes: 11 additions & 2 deletions src/core/ext/client_channel/subchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"

// Channel arg containing a grpc_resolved_address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"

/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
Expand Down Expand Up @@ -164,13 +167,19 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
/** Address to connect to */
grpc_resolved_address *addr;
};

/** create a subchannel given a connector */
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
const grpc_subchannel_args *args);

/// Sets \a addr from \a args.
void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
grpc_resolved_address *addr);

/// Returns a new channel arg encoding the subchannel address as a string.
/// Caller is responsible for freeing the string.
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr);

#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */
12 changes: 0 additions & 12 deletions src/core/ext/client_channel/subchannel_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
k->args.addr->len = args->addr->len;
if (k->args.addr->len > 0) {
memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address));
}
k->args.args = copy_channel_args(args->args);
return k;
}
Expand All @@ -108,14 +103,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
grpc_subchannel_key *b) {
int c = GPR_ICMP(a->connector, b->connector);
if (c != 0) return c;
c = GPR_ICMP(a->args.addr->len, b->args.addr->len);
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
if (a->args.addr->len) {
c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
if (c != 0) return c;
}
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
a->args.filter_count * sizeof(*a->args.filters));
Expand All @@ -129,7 +118,6 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free(k->args.addr);
gpr_free(k);
}

Expand Down
12 changes: 9 additions & 3 deletions src/core/ext/lb_policy/pick_first/pick_first.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
#include <grpc/support/alloc.h>

#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"

typedef struct pending_pick {
Expand Down Expand Up @@ -466,11 +468,15 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}

memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;

grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);

if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
Expand Down
12 changes: 9 additions & 3 deletions src/core/ext/lb_policy/round_robin/round_robin.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
#include <grpc/support/alloc.h>

#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"

Expand Down Expand Up @@ -729,11 +731,15 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (addresses->addresses[i].is_balancer) continue;

memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;

grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
grpc_channel_args_destroy(exec_ctx, new_args);

if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
Expand Down
7 changes: 5 additions & 2 deletions src/core/ext/transport/chttp2/client/chttp2_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

#include "src/core/ext/client_channel/connector.h"
#include "src/core/ext/client_channel/http_connect_handshaker.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
Expand Down Expand Up @@ -220,6 +221,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
grpc_connect_out_args *result,
grpc_closure *notify) {
chttp2_connector *c = (chttp2_connector *)con;
grpc_resolved_address addr;
grpc_get_subchannel_address_arg(args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
Expand All @@ -231,8 +234,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
args->interested_parties, args->channel_args,
args->addr, args->deadline);
args->interested_parties, args->channel_args, &addr,
args->deadline);
gpr_mu_unlock(&c->mu);
}

Expand Down

0 comments on commit c0d7d67

Please sign in to comment.