Skip to content

Commit

Permalink
Resolve race between OnReadDone(ok=false) and IsCancelled (grpc#26245)
Browse files Browse the repository at this point in the history
* Resolve and test race between OnReadDone(ok=false) and IsCancelled

* Fix retry case

* Fix health check case

* Address reviewer comments.

* ADD TODO requested by markdroth
  • Loading branch information
vjpai authored May 19, 2021
1 parent 113cd95 commit 0f80378
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 4 deletions.
6 changes: 6 additions & 0 deletions include/grpcpp/impl/codegen/server_callback_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
read_tag_.Set(
call_.call(),
[this, reactor](bool ok) {
if (GPR_UNLIKELY(!ok)) {
ctx_->MaybeMarkCancelledOnRead();
}
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
Expand Down Expand Up @@ -831,6 +834,9 @@ class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
read_tag_.Set(
call_.call(),
[this, reactor](bool ok) {
if (GPR_UNLIKELY(!ok)) {
ctx_->MaybeMarkCancelledOnRead();
}
reactor->OnReadDone(ok);
this->MaybeDone(/*inlineable_ondone=*/true);
},
Expand Down
5 changes: 5 additions & 0 deletions include/grpcpp/impl/codegen/server_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ class ServerContextBase {
message_allocator_state_ = allocator_state;
}

void MaybeMarkCancelledOnRead();

struct CallWrapper {
~CallWrapper();

Expand Down Expand Up @@ -524,6 +526,9 @@ class ServerContextBase {
typename std::aligned_storage<sizeof(Reactor), alignof(Reactor)>::type
default_reactor_;
std::atomic_bool default_reactor_used_{false};

std::atomic_bool marked_cancelled_{false};

std::unique_ptr<TestServerCallbackUnary> test_unary_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ void HealthCheckClient::CallState::StartCall() {
batch_.recv_initial_metadata = true;
// Add recv_message op.
payload_.recv_message.recv_message = &recv_message_;
payload_.recv_message.call_failed_before_recv_message = nullptr;
// recv_message callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
Expand Down Expand Up @@ -478,6 +479,7 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(
// callbacks from the original batch have completed yet.
recv_message_batch_.payload = &payload_;
payload_.recv_message.recv_message = &recv_message_;
payload_.recv_message.call_failed_before_recv_message = nullptr;
payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
&recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
recv_message_batch_.recv_message = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_slice_buffer recv_message_buffer_;
Atomic<bool> seen_response_{false};

// True if the cancel_stream batch has been started.
Atomic<bool> cancelled_{false};

// recv_trailing_metadata
grpc_metadata_batch recv_trailing_metadata_;
grpc_transport_stream_stats collect_stats_;
grpc_closure recv_trailing_metadata_ready_;

// True if the cancel_stream batch has been started.
Atomic<bool> cancelled_{false};

// Closure for call stack destruction.
grpc_closure after_call_stack_destruction_;
};
Expand Down
1 change: 1 addition & 0 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
++call_attempt_->started_recv_message_count_;
batch_.recv_message = true;
batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
batch_.payload->recv_message.call_failed_before_recv_message = nullptr;
GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
grpc_schedule_on_exec_ctx);
batch_.payload->recv_message.recv_message_ready =
Expand Down
6 changes: 6 additions & 0 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,8 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(!s->pending_byte_stream);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
s->call_failed_before_recv_message =
op_payload->recv_message.call_failed_before_recv_message;
if (s->id != 0) {
if (!s->read_closed) {
before = s->frame_storage.length +
Expand Down Expand Up @@ -1947,6 +1949,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
null_then_sched_closure(&s->recv_message_ready);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = nullptr;
if (s->call_failed_before_recv_message != nullptr) {
*s->call_failed_before_recv_message =
(s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
}
null_then_sched_closure(&s->recv_message_ready);
}
GRPC_ERROR_UNREF(error);
Expand Down
1 change: 1 addition & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ struct grpc_chttp2_stream {
grpc_closure* recv_initial_metadata_ready = nullptr;
bool* trailing_metadata_available = nullptr;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
bool* call_failed_before_recv_message = nullptr;
grpc_closure* recv_message_ready = nullptr;
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* recv_trailing_metadata_finished = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions src/core/ext/transport/inproc/inproc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
if (s->recv_message_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
grpc_error_std_string(error).c_str());
if (s->recv_message_op->payload->recv_message
.call_failed_before_recv_message != nullptr) {
*s->recv_message_op->payload->recv_message
.call_failed_before_recv_message = true;
}
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
Expand Down
7 changes: 7 additions & 0 deletions src/core/lib/surface/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct grpc_call {
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
bool call_failed_before_recv_message = false;
grpc_byte_buffer** receiving_buffer = nullptr;
grpc_slice receiving_slice = grpc_empty_slice();
grpc_closure receiving_slice_ready;
Expand Down Expand Up @@ -1845,6 +1846,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
stream_op_payload->recv_message.call_failed_before_recv_message =
&call->call_failed_before_recv_message;
GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
receiving_stream_ready_in_call_combiner, bctl,
grpc_schedule_on_exec_ctx);
Expand Down Expand Up @@ -2014,6 +2017,10 @@ grpc_compression_algorithm grpc_call_compression_for_level(
return algo;
}

bool grpc_call_failed_before_recv_message(grpc_call* c) {
return c->call_failed_before_recv_message;
}

const char* grpc_call_error_to_string(grpc_call_error error) {
switch (error) {
case GRPC_CALL_ERROR:
Expand Down
6 changes: 6 additions & 0 deletions src/core/lib/surface/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ size_t grpc_call_get_initial_size_estimate();
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call* call, grpc_compression_level level);

/* Returns whether or not the call's receive message operation failed because of
* an error (as opposed to a graceful end-of-stream) */
/* TODO(markdroth): This is currently available only to the C++ API.
Move to surface API if requested by other languages. */
bool grpc_call_failed_before_recv_message(grpc_call* c);

extern grpc_core::TraceFlag grpc_call_error_trace;
extern grpc_core::TraceFlag grpc_compression_trace;

Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ struct grpc_transport_stream_op_batch_payload {
// containing a received message.
// Will be NULL if trailing metadata is received instead of a message.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
// Was this recv_message failed for reasons other than a clean end-of-stream
bool* call_failed_before_recv_message = nullptr;
/** Should be enqueued when one message is ready to be processed. */
grpc_closure* recv_message_ready = nullptr;
} recv_message;
Expand Down
10 changes: 9 additions & 1 deletion src/cpp/server/server_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <grpcpp/impl/codegen/server_context.h>

#include <algorithm>
#include <atomic>
#include <utility>

#include <grpc/compression.h>
Expand Down Expand Up @@ -324,10 +325,17 @@ void ServerContextBase::TryCancel() const {
}
}

void ServerContextBase::MaybeMarkCancelledOnRead() {
if (grpc_call_failed_before_recv_message(call_.call)) {
marked_cancelled_.store(true, std::memory_order_release);
}
}

bool ServerContextBase::IsCancelled() const {
if (completion_tag_) {
// When using callback API, this result is always valid.
return completion_op_->CheckCancelledAsync();
return marked_cancelled_.load(std::memory_order_acquire) ||
completion_op_->CheckCancelledAsync();
} else if (has_notify_when_done_tag_) {
// When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
Expand Down
3 changes: 3 additions & 0 deletions test/cpp/end2end/end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,9 @@ TEST_P(End2endTest, ClientCancelsBidi) {
ClientContext context;
std::string msg("hello");

// Send server_try_cancel value in the client metadata
context.AddMetadata(kClientTryCancelRequest, std::to_string(1));

auto stream = stub_->BidiStream(&context);

request.set_message(msg + "0");
Expand Down
5 changes: 5 additions & 0 deletions test/cpp/end2end/test_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ CallbackTestServiceImpl::BidiStream(
kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
server_write_last_ = internal::GetIntValueFromMetadata(
kServerFinishAfterNReads, ctx->client_metadata(), 0);
client_try_cancel_ = static_cast<bool>(internal::GetIntValueFromMetadata(
kClientTryCancelRequest, ctx->client_metadata(), 0));
if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
internal::ServerTryCancelNonblocking(ctx);
} else {
Expand Down Expand Up @@ -589,6 +591,8 @@ CallbackTestServiceImpl::BidiStream(
return;
}
}
} else if (client_try_cancel_) {
EXPECT_TRUE(ctx_->IsCancelled());
}

if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
Expand Down Expand Up @@ -629,6 +633,7 @@ CallbackTestServiceImpl::BidiStream(
bool finished_{false};
bool setup_done_{false};
std::thread finish_thread_;
bool client_try_cancel_ = false;
};

return new Reactor(context);
Expand Down
1 change: 1 addition & 0 deletions test/cpp/end2end/test_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace testing {
const int kServerDefaultResponseStreamsToSend = 3;
const char* const kServerResponseStreamsToSend = "server_responses_to_send";
const char* const kServerTryCancelRequest = "server_try_cancel";
const char* const kClientTryCancelRequest = "client_try_cancel";
const char* const kDebugInfoTrailerKey = "debug-info-bin";
const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
Expand Down
1 change: 1 addition & 0 deletions test/cpp/microbenchmarks/bm_chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
op.on_complete = do_nothing.get();
op.recv_message = true;
op.payload->recv_message.recv_message = &recv_stream;
op.payload->recv_message.call_failed_before_recv_message = nullptr;
op.payload->recv_message.recv_message_ready = drain_start.get();
s->Op(&op);
f.PushInput(grpc_slice_ref(incoming_data));
Expand Down

0 comments on commit 0f80378

Please sign in to comment.