Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
makdharma committed Aug 10, 2016
1 parent f075064 commit 198f8b0
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions src/core/ext/transport/cronet/transport/cronet_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum OP_ID {
OP_CANCEL_ERROR,
OP_ON_COMPLETE,
OP_FAILED,
OP_SUCCEEDED,
OP_CANCELED,
OP_RECV_MESSAGE_AND_ON_COMPLETE,
OP_READ_REQ_MADE,
Expand All @@ -91,6 +92,7 @@ const char *op_id_string[] = {
"OP_CANCEL_ERROR",
"OP_ON_COMPLETE",
"OP_FAILED",
"OP_SUCCEEDED",
"OP_CANCELED",
"OP_RECV_MESSAGE_AND_ON_COMPLETE",
"OP_READ_REQ_MADE",
Expand Down Expand Up @@ -189,6 +191,8 @@ struct stream_obj {
grpc_stream *curr_gs;
cronet_bidirectional_stream *cbs;

// Used for executing callbacks for ops
grpc_exec_ctx exec_ctx;
// This holds the state that is at stream level (response and req metadata)
struct op_state state;

Expand Down Expand Up @@ -227,7 +231,10 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
static void execute_from_storage(stream_obj *s) {
// Cycle through ops and try to take next action. Break when either
// an action with callback is taken, or no action is possible.
gpr_mu_lock(&s->mu);
// This can be executed from the Cronet network thread via cronet callback
// or on the application supplied thread via the perform_stream_op function.
if (1) {//gpr_mu_lock(&s->mu) == 0) {
gpr_mu_lock(&s->mu);
for (int i = 0; i < s->storage.wrptr; ) {
CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i, s->storage.pending_ops[i].done);
if (s->storage.pending_ops[i].done) {
Expand All @@ -242,7 +249,9 @@ static void execute_from_storage(stream_obj *s) {
break;
}
}
gpr_mu_unlock(&s->mu);
gpr_mu_unlock(&s->mu);
}
grpc_exec_ctx_finish(&s->exec_ctx);
}


Expand Down Expand Up @@ -271,7 +280,9 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
execute_from_storage(s);
}

static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
Expand Down Expand Up @@ -380,13 +391,6 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
memcpy(p, GPR_SLICE_START_PTR(slice), length);
}

static void enqueue_callback(grpc_closure *callback, grpc_error *error) {
GPR_ASSERT(callback);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_exec_ctx_sched(&exec_ctx, callback, error, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}

static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head,
const char *host,
Expand Down Expand Up @@ -498,9 +502,10 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st
// we haven't sent initial metadata yet
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
// we haven't sent message yet
// TODO: Streaming Write case is a problem. What if there is an outstanding write (2nd, 3rd,..) present.
else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
// we haven't got on_write_completed for the send yet
else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
else if (stream_state->state_op_done[OP_SEND_MESSAGE] && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
} else if (op_id == OP_CANCEL_ERROR) {
// already executed
if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
Expand All @@ -510,18 +515,20 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st
// Check if every op that was asked for is done.
else if (curr_op->send_initial_metadata && !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
else if (curr_op->send_trailing_metadata && !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
else if (curr_op->recv_initial_metadata && !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
else if (curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE]) result = false;
else if (curr_op->recv_trailing_metadata) {
//if (!stream_state->state_op_done[OP_SUCCEEDED]) result = false; gpr_log(GPR_DEBUG, "HACK!!");
// We aren't done with trailing metadata yet
if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
// We've asked for actual message in an earlier op, and it hasn't been delivered yet.
// (TODO: What happens when multiple messages are asked for? How do you know when last message arrived?)
else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
// If this op is not the one asking for read, (which means some earlier op has asked), and the
// read hasn't been delivered.
if(!curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE]) result = false;
if(!curr_op->recv_message && !stream_state->state_op_done[OP_SUCCEEDED]) result = false;
}
}
// We should see at least one on_write_completed for the trailers that we sent
Expand Down Expand Up @@ -563,9 +570,9 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_chttp2_incoming_metadata_buffer_publish(&oas->s->state.rs.initial_metadata,
stream_op->recv_initial_metadata);
enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL);
} else {
enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
Expand Down Expand Up @@ -595,7 +602,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
} else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_CANCELLED);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) {
Expand All @@ -620,7 +627,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
result = ACTION_TAKEN_NO_CALLBACK;
Expand All @@ -645,7 +652,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
// Clear read state of the stream, so next read op (if it were to come) will work
Expand Down Expand Up @@ -682,7 +689,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
// All ops are complete. Call the on_complete callback
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
//CRONET_LOG(GPR_DEBUG, "calling on_complete");
enqueue_callback(stream_op->on_complete, GRPC_ERROR_NONE);
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, NULL);
// Instead of setting stream state, use the op state as on_complete is on per op basis
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true; // Mark this op as completed
Expand Down Expand Up @@ -714,6 +721,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
memset(s->state.state_callback_received, 0, sizeof(s->state.state_callback_received));
gpr_mu_init(&s->mu);
s->exec_ctx = *exec_ctx;
return 0;
}

Expand Down

0 comments on commit 198f8b0

Please sign in to comment.