Skip to content

Commit

Permalink
Merge pull request grpc#8795 from markdroth/client_channel_init_cleanup
Browse files Browse the repository at this point in the history
Client channel init cleanup
  • Loading branch information
markdroth authored Dec 15, 2016
2 parents 2249e92 + f843914 commit 5904a95
Show file tree
Hide file tree
Showing 47 changed files with 405 additions and 338 deletions.
10 changes: 2 additions & 8 deletions include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,12 @@ typedef struct {
/** If non-zero, allow the use of SO_REUSEPORT if it's available (default 1) */
#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
/** If non-zero, a pointer to a buffer pool (use grpc_resource_quota_arg_vtable
to fetch an appropriate pointer arg vtable */
to fetch an appropriate pointer arg vtable) */
#define GRPC_ARG_RESOURCE_QUOTA "grpc.resource_quota"
/** Service config data, to be passed to subchannels.
Not intended for external use. */
/** Service config data in JSON form. Not intended for use outside of tests. */
#define GRPC_ARG_SERVICE_CONFIG "grpc.service_config"
/** LB policy name. */
#define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name"
/** Server name. Not intended for external use. */
#define GRPC_ARG_SERVER_NAME "grpc.server_name"
/** Resolved addresses in a form used by the LB policy.
Not intended for external use. */
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
/** The grpc_socket_mutator instance that set the socket options. A pointer. */
#define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator"
/** \} */
Expand Down
7 changes: 4 additions & 3 deletions src/core/ext/census/grpc_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,12 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
}

static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(chand != NULL);
return GRPC_ERROR_NONE;
}

static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
Expand Down
56 changes: 23 additions & 33 deletions src/core/ext/client_channel/client_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <grpc/support/useful.h>

#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
Expand Down Expand Up @@ -499,24 +500,37 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
}

/* Constructor for channel_data */
static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;

memset(chand, 0, sizeof(*chand));

GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);

// Initialize data members.
gpr_mu_init(&chand->mu);
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed, chand);
chand->owning_stack = args->channel_stack;

chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
chand->interested_parties = grpc_pollset_set_create();
// Record client channel factory.
const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
grpc_client_channel_factory_ref(arg->value.pointer.p);
chand->client_channel_factory = arg->value.pointer.p;
// Instantiate resolver.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
chand->resolver = grpc_resolver_create(arg->value.string, args->channel_args);
if (chand->resolver == NULL) {
return GRPC_ERROR_CREATE("resolver creation failed");
}
return GRPC_ERROR_NONE;
}

/* Destructor for channel_data */
Expand Down Expand Up @@ -1135,30 +1149,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};

void grpc_client_channel_finish_initialization(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
grpc_resolver *resolver,
grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
chand->client_channel_factory = client_channel_factory;
grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
Expand Down
10 changes: 3 additions & 7 deletions src/core/ext/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
#include "src/core/ext/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"

// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"

/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
connected to again later.
Expand All @@ -47,13 +50,6 @@

extern const grpc_channel_filter grpc_client_channel_filter;

/* Post-construction initializer to give the client channel its resolver
and factory. */
void grpc_client_channel_finish_initialization(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
grpc_resolver *resolver,
grpc_client_channel_factory *client_channel_factory);

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);

Expand Down
32 changes: 32 additions & 0 deletions src/core/ext/client_channel/client_channel_factory.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,35 @@ grpc_channel* grpc_client_channel_factory_create_channel(
return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
args);
}

static void* factory_arg_copy(void* factory) {
grpc_client_channel_factory_ref(factory);
return factory;
}

static void factory_arg_destroy(void* factory) {
// TODO(roth): Remove local exec_ctx when
// https://github.com/grpc/grpc/pull/8705 is merged.
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_client_channel_factory_unref(&exec_ctx, factory);
grpc_exec_ctx_finish(&exec_ctx);
}

static int factory_arg_cmp(void* factory1, void* factory2) {
if (factory1 < factory2) return -1;
if (factory1 > factory2) return 1;
return 0;
}

static const grpc_arg_pointer_vtable factory_arg_vtable = {
factory_arg_copy, factory_arg_destroy, factory_arg_cmp};

grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory* factory) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_CLIENT_CHANNEL_FACTORY;
arg.value.pointer.p = factory;
arg.value.pointer.vtable = &factory_arg_vtable;
return arg;
}
6 changes: 6 additions & 0 deletions src/core/ext/client_channel/client_channel_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"

// Channel arg key for client channel factory.
#define GRPC_ARG_CLIENT_CHANNEL_FACTORY "grpc.client_channel_factory"

typedef struct grpc_client_channel_factory grpc_client_channel_factory;
typedef struct grpc_client_channel_factory_vtable
grpc_client_channel_factory_vtable;
Expand Down Expand Up @@ -83,4 +86,7 @@ grpc_channel *grpc_client_channel_factory_create_channel(
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args);

grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory *factory);

#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */
31 changes: 20 additions & 11 deletions src/core/ext/client_channel/http_connect_handshaker.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
Expand All @@ -51,7 +53,6 @@ typedef struct http_connect_handshaker {
grpc_handshaker base;

char* proxy_server;
char* server_name;

gpr_refcount refcount;
gpr_mu mu;
Expand Down Expand Up @@ -86,7 +87,6 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
gpr_free(handshaker->read_buffer_to_destroy);
}
gpr_free(handshaker->proxy_server);
gpr_free(handshaker->server_name);
grpc_slice_buffer_destroy(&handshaker->write_buffer);
grpc_http_parser_destroy(&handshaker->http_parser);
grpc_http_response_destroy(&handshaker->http_response);
Expand Down Expand Up @@ -265,18 +265,27 @@ static void http_connect_handshaker_do_handshake(
grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done,
grpc_handshaker_args* args) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
// Get server name from channel args.
const grpc_arg* arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
char* canonical_uri =
grpc_resolver_factory_add_default_prefix_if_needed(arg->value.string);
grpc_uri* uri = grpc_uri_parse(canonical_uri, 1);
char* server_name = uri->path;
if (server_name[0] == '/') ++server_name;
// Save state in the handshaker object.
gpr_mu_lock(&handshaker->mu);
handshaker->args = args;
handshaker->on_handshake_done = on_handshake_done;
// Send HTTP CONNECT request.
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
handshaker->server_name, handshaker->proxy_server);
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", server_name,
handshaker->proxy_server);
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
request.host = handshaker->proxy_server;
request.host = server_name;
request.http.method = "CONNECT";
request.http.path = handshaker->server_name;
request.http.path = server_name;
request.handshaker = &grpc_httpcli_plaintext;
grpc_slice request_slice = grpc_httpcli_format_connect_request(&request);
grpc_slice_buffer_add(&handshaker->write_buffer, request_slice);
Expand All @@ -285,23 +294,23 @@ static void http_connect_handshaker_do_handshake(
grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer,
&handshaker->request_done_closure);
gpr_mu_unlock(&handshaker->mu);
// Clean up.
gpr_free(canonical_uri);
grpc_uri_destroy(uri);
}

static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
http_connect_handshaker_do_handshake};

grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
const char* server_name) {
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
GPR_ASSERT(proxy_server != NULL);
GPR_ASSERT(server_name != NULL);
http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
gpr_mu_init(&handshaker->mu);
gpr_ref_init(&handshaker->refcount, 1);
handshaker->proxy_server = gpr_strdup(proxy_server);
handshaker->server_name = gpr_strdup(server_name);
grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done,
handshaker);
Expand Down
5 changes: 2 additions & 3 deletions src/core/ext/client_channel/http_connect_handshaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@

#include "src/core/lib/channel/handshaker.h"

/// Does NOT take ownership of \a proxy_server or \a server_name.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
const char* server_name);
/// Does NOT take ownership of \a proxy_server.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server);

/// Returns the name of the proxy to use, or NULL if no proxy is configured.
/// Caller takes ownership of result.
Expand Down
3 changes: 3 additions & 0 deletions src/core/ext/client_channel/lb_policy_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"

// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"

typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;

Expand Down
32 changes: 23 additions & 9 deletions src/core/ext/client_channel/resolver_registry.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,46 +109,60 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) {
}

static grpc_resolver_factory *resolve_factory(const char *target,
grpc_uri **uri) {
char *tmp;
grpc_uri **uri,
char **canonical_target) {
grpc_resolver_factory *factory = NULL;

GPR_ASSERT(uri != NULL);
*uri = grpc_uri_parse(target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(*uri);
gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
*uri = grpc_uri_parse(tmp, 1);
gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target);
*uri = grpc_uri_parse(*canonical_target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(grpc_uri_parse(target, 0));
grpc_uri_destroy(grpc_uri_parse(tmp, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp);
grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
*canonical_target);
}
gpr_free(tmp);
}
return factory;
}

grpc_resolver *grpc_resolver_create(const char *target,
const grpc_channel_args *args) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
char *canonical_target = NULL;
grpc_resolver_factory *factory =
resolve_factory(target, &uri, &canonical_target);
grpc_resolver *resolver;
grpc_resolver_args resolver_args;
memset(&resolver_args, 0, sizeof(resolver_args));
resolver_args.uri = uri;
resolver_args.args = args;
resolver = grpc_resolver_factory_create_resolver(factory, &resolver_args);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return resolver;
}

char *grpc_get_default_authority(const char *target) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
char *canonical_target = NULL;
grpc_resolver_factory *factory =
resolve_factory(target, &uri, &canonical_target);
char *authority = grpc_resolver_factory_get_default_authority(factory, uri);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return authority;
}

char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
resolve_factory(target, &uri, &canonical_target);
grpc_uri_destroy(uri);
return canonical_target == NULL ? gpr_strdup(target) : canonical_target;
}
4 changes: 4 additions & 0 deletions src/core/ext/client_channel/resolver_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
representing the default authority to pass from a client. */
char *grpc_get_default_authority(const char *target);

/** Returns a newly allocated string containing \a target, adding the
default prefix if needed. */
char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target);

#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
Loading

0 comments on commit 5904a95

Please sign in to comment.