Skip to content

Commit

Permalink
Merge pull request grpc#8979 from markdroth/handshaker_cleanup
Browse files Browse the repository at this point in the history
Address comments from grpc#8913.  Also make changes needed for import.
  • Loading branch information
markdroth authored Dec 9, 2016
2 parents d03dfcc + 96ba68d commit be37dff
Show file tree
Hide file tree
Showing 25 changed files with 148 additions and 114 deletions.
18 changes: 9 additions & 9 deletions src/core/ext/transport/chttp2/client/chttp2_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ typedef struct {
bool connecting;

char *server_name;
grpc_chttp2_create_handshakers_func create_handshakers;
void *create_handshakers_user_data;
grpc_chttp2_add_handshakers_func add_handshakers;
void *add_handshakers_user_data;

grpc_closure *notify;
grpc_connect_in_args args;
Expand Down Expand Up @@ -160,9 +160,9 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
grpc_http_connect_handshaker_create(proxy_name, c->server_name));
gpr_free(proxy_name);
}
if (c->create_handshakers != NULL) {
c->create_handshakers(exec_ctx, c->create_handshakers_user_data,
c->handshake_mgr);
if (c->add_handshakers != NULL) {
c->add_handshakers(exec_ctx, c->add_handshakers_user_data,
c->handshake_mgr);
}
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
Expand Down Expand Up @@ -255,15 +255,15 @@ static const grpc_connector_vtable chttp2_connector_vtable = {

grpc_connector *grpc_chttp2_connector_create(
grpc_exec_ctx *exec_ctx, const char *server_name,
grpc_chttp2_create_handshakers_func create_handshakers,
void *create_handshakers_user_data) {
grpc_chttp2_add_handshakers_func add_handshakers,
void *add_handshakers_user_data) {
chttp2_connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
c->base.vtable = &chttp2_connector_vtable;
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
c->server_name = gpr_strdup(server_name);
c->create_handshakers = create_handshakers;
c->create_handshakers_user_data = create_handshakers_user_data;
c->add_handshakers = add_handshakers;
c->add_handshakers_user_data = add_handshakers_user_data;
return &c->base;
}
10 changes: 5 additions & 5 deletions src/core/ext/transport/chttp2/client/chttp2_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/exec_ctx.h"

typedef void (*grpc_chttp2_create_handshakers_func)(
typedef void (*grpc_chttp2_add_handshakers_func)(
grpc_exec_ctx* exec_ctx, void* user_data,
grpc_handshake_manager* handshake_mgr);

/// If \a create_handshakers is non-NULL, it will be called with
/// \a create_handshakers_user_data to add handshakers.
/// If \a add_handshakers is non-NULL, it will be called with
/// \a add_handshakers_user_data to add handshakers.
grpc_connector* grpc_chttp2_connector_create(
grpc_exec_ctx* exec_ctx, const char* server_name,
grpc_chttp2_create_handshakers_func create_handshakers,
void* create_handshakers_user_data);
grpc_chttp2_add_handshakers_func add_handshakers,
void* add_handshakers_user_data);

#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, NULL /* create_handshakers */,
exec_ctx, args->server_name, NULL /* add_handshakers */,
NULL /* user_data */);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,18 @@ static void client_channel_factory_unref(
}
}

static void create_handshakers(grpc_exec_ctx *exec_ctx,
void *security_connector,
grpc_handshake_manager *handshake_mgr) {
grpc_channel_security_connector_create_handshakers(
exec_ctx, security_connector, handshake_mgr);
static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector,
grpc_handshake_manager *handshake_mgr) {
grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector,
handshake_mgr);
}

static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, create_handshakers, f->security_connector);
exec_ctx, args->server_name, add_handshakers, f->security_connector);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
return s;
Expand Down
10 changes: 6 additions & 4 deletions src/core/ext/transport/chttp2/server/chttp2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"

void grpc_chttp2_server_handshaker_factory_create_handshakers(
void grpc_chttp2_server_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr) {
if (handshaker_factory != NULL) {
handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory,
handshake_mgr);
handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory,
handshake_mgr);
}
}

Expand Down Expand Up @@ -172,6 +172,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&connection_state->server_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
gpr_free(connection_state->acceptor);
gpr_free(connection_state);
}

Expand All @@ -183,6 +184,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
if (state->shutdown) {
gpr_mu_unlock(&state->mu);
grpc_endpoint_destroy(exec_ctx, tcp);
gpr_free(acceptor);
return;
}
grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create();
Expand All @@ -195,7 +197,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
grpc_chttp2_server_handshaker_factory_create_handshakers(
grpc_chttp2_server_handshaker_factory_add_handshakers(
exec_ctx, state->handshaker_factory, connection_state->handshake_mgr);
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chttp2/server/chttp2_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ typedef struct grpc_chttp2_server_handshaker_factory
grpc_chttp2_server_handshaker_factory;

typedef struct {
void (*create_handshakers)(
void (*add_handshakers)(
grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr);
Expand All @@ -57,7 +57,7 @@ struct grpc_chttp2_server_handshaker_factory {
const grpc_chttp2_server_handshaker_factory_vtable *vtable;
};

void grpc_chttp2_server_handshaker_factory_create_handshakers(
void grpc_chttp2_server_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ typedef struct {
grpc_server_security_connector *security_connector;
} server_security_handshaker_factory;

static void server_security_handshaker_factory_create_handshakers(
static void server_security_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf,
grpc_handshake_manager *handshake_mgr) {
server_security_handshaker_factory *handshaker_factory =
(server_security_handshaker_factory *)hf;
grpc_server_security_connector_create_handshakers(
grpc_server_security_connector_add_handshakers(
exec_ctx, handshaker_factory->security_connector, handshake_mgr);
}

Expand All @@ -74,7 +74,7 @@ static void server_security_handshaker_factory_destroy(

static const grpc_chttp2_server_handshaker_factory_vtable
server_security_handshaker_factory_vtable = {
server_security_handshaker_factory_create_handshakers,
server_security_handshaker_factory_add_handshakers,
server_security_handshaker_factory_destroy};

int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
Expand Down
18 changes: 9 additions & 9 deletions src/core/lib/channel/handshaker.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
handshaker->vtable = vtable;
}

static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
handshaker->vtable->destroy(exec_ctx, handshaker);
}

static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker);
}

static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args) {
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor,
on_handshake_done, args);
}
Expand Down
10 changes: 10 additions & 0 deletions src/core/lib/channel/handshaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ struct grpc_handshaker {
void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker);

void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args);

///
/// grpc_handshake_manager
///
Expand Down
17 changes: 9 additions & 8 deletions src/core/lib/http/httpcli_security_connector.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
gpr_free(sc);
}

static void httpcli_ssl_create_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) {
static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
tsi_handshaker *handshaker = NULL;
Expand All @@ -74,8 +74,9 @@ static void httpcli_ssl_create_handshakers(
tsi_result_to_string(result));
}
}
grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base,
handshake_mgr);
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
}

static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
Expand Down Expand Up @@ -132,7 +133,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
c->base.create_handshakers = httpcli_ssl_create_handshakers;
c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
}
Expand Down Expand Up @@ -185,8 +186,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
grpc_channel_security_connector_create_handshakers(exec_ctx, sc,
c->handshake_mgr);
grpc_channel_security_connector_add_handshakers(exec_ctx, sc,
c->handshake_mgr);
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline,
NULL /* acceptor */, on_handshake_done, c /* user_data */);
Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/iomgr/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index;
} grpc_tcp_server_acceptor;

/* Called for newly connected TCP connections. */
/* Called for newly connected TCP connections.
Takes ownership of acceptor. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
grpc_pollset *accepting_pollset,
Expand Down
16 changes: 9 additions & 7 deletions src/core/lib/iomgr/tcp_server_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,12 @@ static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr,
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;

if (err != GRPC_ERROR_NONE) {
goto error;
}

read_notifier_pollset =
grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
Expand Down Expand Up @@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}

fdobj = grpc_fd_create(fd, name);
grpc_fd *fdobj = grpc_fd_create(fd, name);

if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
Expand All @@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {

grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);

// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = sp->fd_index;

sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
read_notifier_pollset, &acceptor);
read_notifier_pollset, acceptor);

gpr_free(name);
gpr_free(addr_str);
Expand Down
9 changes: 7 additions & 2 deletions src/core/lib/iomgr/tcp_server_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) {

static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
uv_tcp_t *client;
grpc_endpoint *ep = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Expand All @@ -201,6 +200,7 @@ static void on_connect(uv_stream_t *server, int status) {
uv_strerror(status));
return;
}

client = gpr_malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed
Expand All @@ -220,8 +220,13 @@ static void on_connect(uv_stream_t *server, int status) {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
}
ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor);
acceptor);
grpc_exec_ctx_finish(&exec_ctx);
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/core/lib/iomgr/tcp_server_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
Expand Down Expand Up @@ -396,8 +395,13 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) {
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor);
acceptor);
}
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
Expand Down
Loading

0 comments on commit be37dff

Please sign in to comment.