diff --git a/BUILD b/BUILD index 1d32787521579..5fd5ff2905f8d 100644 --- a/BUILD +++ b/BUILD @@ -669,7 +669,10 @@ grpc_cc_library( "nofixdeps", ], visibility = ["@grpc:public"], - deps = ["grpc_public_hdrs"], + deps = [ + "gpr_atm", + "grpc_public_hdrs", + ], ) grpc_cc_library( @@ -701,6 +704,7 @@ grpc_cc_library( "@grpc:public", ], deps = [ + "gpr_atm", "grpc++_base", "slice", ], @@ -7395,6 +7399,7 @@ grpc_cc_library( "env", "error", "gpr", + "gpr_atm", "gpr_manual_constructor", "grpc", "grpc++_codegen_proto", @@ -7451,6 +7456,7 @@ grpc_cc_library( "channel_init", "config", "gpr", + "gpr_atm", "gpr_manual_constructor", "grpc_base", "grpc_health_upb", diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h index 1b5ddfe84047c..2be5d54903b1a 100644 --- a/include/grpcpp/alarm.h +++ b/include/grpcpp/alarm.h @@ -24,7 +24,7 @@ #include #include -#include +#include #include #include #include diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index 9283aa4818b69..90b983c93b224 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -22,9 +22,9 @@ #include #include +#include #include #include -#include #include #include #include diff --git a/include/grpcpp/completion_queue.h b/include/grpcpp/completion_queue.h index 42637b5927268..502e79ce5765b 100644 --- a/include/grpcpp/completion_queue.h +++ b/include/grpcpp/completion_queue.h @@ -16,9 +16,449 @@ * */ +/// A completion queue implements a concurrent producer-consumer queue, with +/// two main API-exposed methods: \a Next and \a AsyncNext. These +/// methods are the essential component of the gRPC C++ asynchronous API. +/// There is also a \a Shutdown method to indicate that a given completion queue +/// will no longer have regular events. This must be called before the +/// completion queue is destroyed. +/// All completion queue APIs are thread-safe and may be used concurrently with +/// any other completion queue API invocation; it is acceptable to have +/// multiple threads calling \a Next or \a AsyncNext on the same or different +/// completion queues, or to call these methods concurrently with a \a Shutdown +/// elsewhere. +/// \remark{All other API calls on completion queue should be completed before +/// a completion queue destructor is called.} #ifndef GRPCPP_COMPLETION_QUEUE_H #define GRPCPP_COMPLETION_QUEUE_H -#include // IWYU pragma: export +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +struct grpc_completion_queue; + +namespace grpc { +template +class ClientReader; +template +class ClientWriter; +template +class ClientReaderWriter; +template +class ServerReader; +template +class ServerWriter; +namespace internal { +template +class ServerReaderWriterBody; + +template +void UnaryRunHandlerHelper( + const grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, + grpc::Status&); +template +class RpcMethodHandler; +template +class ClientStreamingHandler; +template +class ServerStreamingHandler; +template +class TemplatedBidiStreamingHandler; +template +class ErrorMethodHandler; +} // namespace internal + +class Channel; +class ChannelInterface; +class Server; +class ServerBuilder; +class ServerContextBase; +class ServerInterface; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; +template +class BlockingUnaryCallImpl; +template +class CallOpSet; +} // namespace internal + +extern CoreCodegenInterface* g_core_codegen_interface; + +/// A thin wrapper around \ref grpc_completion_queue (see \ref +/// src/core/lib/surface/completion_queue.h). +/// See \ref doc/cpp/perf_notes.md for notes on best practices for high +/// performance servers. +class CompletionQueue : private grpc::GrpcLibraryCodegen { + public: + /// Default constructor. Implicitly creates a \a grpc_completion_queue + /// instance. + CompletionQueue() + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, + nullptr}) {} + + /// Wrap \a take, taking ownership of the instance. + /// + /// \param take The completion queue instance to wrap. Ownership is taken. + explicit CompletionQueue(grpc_completion_queue* take); + + /// Destructor. Destroys the owned wrapped completion queue / instance. + ~CompletionQueue() override { + grpc::g_core_codegen_interface->grpc_completion_queue_destroy(cq_); + } + + /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. + enum NextStatus { + SHUTDOWN, ///< The completion queue has been shutdown and fully-drained + GOT_EVENT, ///< Got a new event; \a tag will be filled in with its + ///< associated value; \a ok indicating its success. + TIMEOUT ///< deadline was reached. + }; + + /// Read from the queue, blocking until an event is available or the queue is + /// shutting down. + /// + /// \param[out] tag Updated to point to the read event's tag. + /// \param[out] ok true if read a successful event, false otherwise. + /// + /// Note that each tag sent to the completion queue (through RPC operations + /// or alarms) will be delivered out of the completion queue by a call to + /// Next (or a related method), regardless of whether the operation succeeded + /// or not. Success here means that this operation completed in the normal + /// valid manner. + /// + /// Server-side RPC request: \a ok indicates that the RPC has indeed + /// been started. If it is false, the server has been Shutdown + /// before this particular call got matched to an incoming RPC. + /// + /// Client-side StartCall/RPC invocation: \a ok indicates that the RPC is + /// going to go to the wire. If it is false, it not going to the wire. This + /// would happen if the channel is either permanently broken or + /// transiently broken but with the fail-fast option. (Note that async unary + /// RPCs don't post a CQ tag at this point, nor do client-streaming + /// or bidi-streaming RPCs that have the initial metadata corked option set.) + /// + /// Client-side Write, Client-side WritesDone, Server-side Write, + /// Server-side Finish, Server-side SendInitialMetadata (which is + /// typically included in Write or Finish when not done explicitly): + /// \a ok means that the data/metadata/status/etc is going to go to the + /// wire. If it is false, it not going to the wire because the call + /// is already dead (i.e., canceled, deadline expired, other side + /// dropped the channel, etc). + /// + /// Client-side Read, Server-side Read, Client-side + /// RecvInitialMetadata (which is typically included in Read if not + /// done explicitly): \a ok indicates whether there is a valid message + /// that got read. If not, you know that there are certainly no more + /// messages that can ever be read from this stream. For the client-side + /// operations, this only happens because the call is dead. For the + /// server-sider operation, though, this could happen because the client + /// has done a WritesDone already. + /// + /// Client-side Finish: \a ok should always be true + /// + /// Server-side AsyncNotifyWhenDone: \a ok should always be true + /// + /// Alarm: \a ok is true if it expired, false if it was canceled + /// + /// \return true if got an event, false if the queue is fully drained and + /// shut down. + bool Next(void** tag, bool* ok) { + // Check return type == GOT_EVENT... cases: + // SHUTDOWN - queue has been shutdown, return false. + // TIMEOUT - we passed infinity time => queue has been shutdown, return + // false. + // GOT_EVENT - we actually got an event, return true. + return (AsyncNextInternal(tag, ok, + grpc::g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) == GOT_EVENT); + } + + /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param[out] tag Upon success, updated to point to the event's tag. + /// \param[out] ok Upon success, true if a successful event, false otherwise + /// See documentation for CompletionQueue::Next for explanation of ok + /// \param[in] deadline How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { + grpc::TimePoint deadline_tp(deadline); + return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); + } + + /// EXPERIMENTAL + /// First executes \a F, then reads from the queue, blocking up to + /// \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param[in] f Function to execute before calling AsyncNext on this queue. + /// \param[out] tag Upon success, updated to point to the event's tag. + /// \param[out] ok Upon success, true if read a regular event, false + /// otherwise. + /// \param[in] deadline How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { + CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); + f(); + if (cache.Flush(tag, ok)) { + return GOT_EVENT; + } else { + return AsyncNext(tag, ok, deadline); + } + } + + /// Request the shutdown of the queue. + /// + /// \warning This method must be called at some point if this completion queue + /// is accessed with Next or AsyncNext. \a Next will not return false + /// until this method has been called and all pending tags have been drained. + /// (Likewise for \a AsyncNext returning \a NextStatus::SHUTDOWN .) + /// Only once either one of these methods does that (that is, once the queue + /// has been \em drained) can an instance of this class be destroyed. + /// Also note that applications must ensure that no work is enqueued on this + /// completion queue after this method is called. + void Shutdown(); + + /// Returns a \em raw pointer to the underlying \a grpc_completion_queue + /// instance. + /// + /// \warning Remember that the returned instance is owned. No transfer of + /// owership is performed. + grpc_completion_queue* cq() { return cq_; } + + protected: + /// Private constructor of CompletionQueue only visible to friend classes + explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) { + cq_ = grpc::g_core_codegen_interface->grpc_completion_queue_create( + grpc::g_core_codegen_interface->grpc_completion_queue_factory_lookup( + &attributes), + &attributes, nullptr); + InitialAvalanching(); // reserve this for the future shutdown + } + + private: + // Friends for access to server registration lists that enable checking and + // logging on shutdown + friend class grpc::ServerBuilder; + friend class grpc::Server; + + // Friend synchronous wrappers so that they can access Pluck(), which is + // a semi-private API geared towards the synchronous implementation. + template + friend class grpc::ClientReader; + template + friend class grpc::ClientWriter; + template + friend class grpc::ClientReaderWriter; + template + friend class grpc::ServerReader; + template + friend class grpc::ServerWriter; + template + friend class grpc::internal::ServerReaderWriterBody; + template + friend void grpc::internal::UnaryRunHandlerHelper( + const grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, + grpc::Status&); + template + friend class grpc::internal::ClientStreamingHandler; + template + friend class grpc::internal::ServerStreamingHandler; + template + friend class grpc::internal::TemplatedBidiStreamingHandler; + template + friend class grpc::internal::ErrorMethodHandler; + friend class grpc::ServerContextBase; + friend class grpc::ServerInterface; + template + friend class grpc::internal::BlockingUnaryCallImpl; + + // Friends that need access to constructor for callback CQ + friend class grpc::Channel; + + // For access to Register/CompleteAvalanching + template + friend class grpc::internal::CallOpSet; + + /// EXPERIMENTAL + /// Creates a Thread Local cache to store the first event + /// On this completion queue queued from this thread. Once + /// initialized, it must be flushed on the same thread. + class CompletionQueueTLSCache { + public: + explicit CompletionQueueTLSCache(CompletionQueue* cq); + ~CompletionQueueTLSCache(); + bool Flush(void** tag, bool* ok); + + private: + CompletionQueue* cq_; + bool flushed_; + }; + + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); + + /// Wraps \a grpc_completion_queue_pluck. + /// \warning Must not be mixed with calls to \a Next. + bool Pluck(grpc::internal::CompletionQueueTag* tag) { + auto deadline = + grpc::g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); + while (true) { + auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + bool ok = ev.success != 0; + void* ignored = tag; + if (tag->FinalizeResult(&ignored, &ok)) { + GPR_CODEGEN_ASSERT(ignored == tag); + return ok; + } + } + } + + /// Performs a single polling pluck on \a tag. + /// \warning Must not be mixed with calls to \a Next. + /// + /// TODO: sreek - This calls tag->FinalizeResult() even if the cq_ is already + /// shutdown. This is most likely a bug and if it is a bug, then change this + /// implementation to simple call the other TryPluck function with a zero + /// timeout. i.e: + /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) + void TryPluck(grpc::internal::CompletionQueueTag* tag) { + auto deadline = + grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); + auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT) return; + bool ok = ev.success != 0; + void* ignored = tag; + // the tag must be swallowed if using TryPluck + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } + + /// Performs a single polling pluck on \a tag. Calls tag->FinalizeResult if + /// the pluck() was successful and returned the tag. + /// + /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects + /// that the tag is internal not something that is returned to the user. + void TryPluck(grpc::internal::CompletionQueueTag* tag, + gpr_timespec deadline) { + auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { + return; + } + + bool ok = ev.success != 0; + void* ignored = tag; + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } + + /// Manage state of avalanching operations : completion queue tags that + /// trigger other completion queue operations. The underlying core completion + /// queue should not really shutdown until all avalanching operations have + /// been finalized. Note that we maintain the requirement that an avalanche + /// registration must take place before CQ shutdown (which must be maintained + /// elsehwere) + void InitialAvalanching() { + gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); + } + void RegisterAvalanching() { + gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(1)); + } + void CompleteAvalanching() { + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(-1)) == 1) { + grpc::g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); + } + } + + void RegisterServer(const grpc::Server* server) { + (void)server; +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + server_list_.push_back(server); +#endif + } + void UnregisterServer(const grpc::Server* server) { + (void)server; +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + server_list_.remove(server); +#endif + } + bool ServerListEmpty() const { +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + return server_list_.empty(); +#endif + return true; + } + + static CompletionQueue* CallbackAlternativeCQ(); + static void ReleaseCallbackAlternativeCQ(CompletionQueue* cq); + + grpc_completion_queue* cq_; // owned + + gpr_atm avalanches_in_flight_; + + // List of servers associated with this CQ. Even though this is only used with + // NDEBUG, instantiate it in all cases since otherwise the size will be + // inconsistent. + mutable grpc::internal::Mutex server_list_mutex_; + std::list + server_list_ /* GUARDED_BY(server_list_mutex_) */; +}; + +/// A specific type of completion queue used by the processing of notifications +/// by servers. Instantiated by \a ServerBuilder or Server (for health checker). +class ServerCompletionQueue : public CompletionQueue { + public: + bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } + + protected: + /// Default constructor + ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} + + private: + /// \param completion_type indicates whether this is a NEXT or CALLBACK + /// completion queue. + /// \param polling_type Informs the GRPC library about the type of polling + /// allowed on this completion queue. See grpc_cq_polling_type's description + /// in grpc_types.h for more details. + /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues + ServerCompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, + grpc_completion_queue_functor* shutdown_cb) + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, + shutdown_cb}), + polling_type_(polling_type) {} + + grpc_cq_polling_type polling_type_; + friend class grpc::ServerBuilder; + friend class grpc::Server; +}; + +} // namespace grpc #endif // GRPCPP_COMPLETION_QUEUE_H diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 33786f6e656a9..67decf636f818 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -16,451 +16,12 @@ * */ -/// A completion queue implements a concurrent producer-consumer queue, with -/// two main API-exposed methods: \a Next and \a AsyncNext. These -/// methods are the essential component of the gRPC C++ asynchronous API. -/// There is also a \a Shutdown method to indicate that a given completion queue -/// will no longer have regular events. This must be called before the -/// completion queue is destroyed. -/// All completion queue APIs are thread-safe and may be used concurrently with -/// any other completion queue API invocation; it is acceptable to have -/// multiple threads calling \a Next or \a AsyncNext on the same or different -/// completion queues, or to call these methods concurrently with a \a Shutdown -/// elsewhere. -/// \remark{All other API calls on completion queue should be completed before -/// a completion queue destructor is called.} #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H -// IWYU pragma: private, include +// IWYU pragma: private -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -struct grpc_completion_queue; - -namespace grpc { -template -class ClientReader; -template -class ClientWriter; -template -class ClientReaderWriter; -template -class ServerReader; -template -class ServerWriter; -namespace internal { -template -class ServerReaderWriterBody; - -template -void UnaryRunHandlerHelper( - const grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, - grpc::Status&); -template -class RpcMethodHandler; -template -class ClientStreamingHandler; -template -class ServerStreamingHandler; -template -class TemplatedBidiStreamingHandler; -template -class ErrorMethodHandler; -} // namespace internal - -class Channel; -class ChannelInterface; -class Server; -class ServerBuilder; -class ServerContextBase; -class ServerInterface; - -namespace internal { -class CompletionQueueTag; -class RpcMethod; -template -class BlockingUnaryCallImpl; -template -class CallOpSet; -} // namespace internal - -extern CoreCodegenInterface* g_core_codegen_interface; - -/// A thin wrapper around \ref grpc_completion_queue (see \ref -/// src/core/lib/surface/completion_queue.h). -/// See \ref doc/cpp/perf_notes.md for notes on best practices for high -/// performance servers. -class CompletionQueue : private grpc::GrpcLibraryCodegen { - public: - /// Default constructor. Implicitly creates a \a grpc_completion_queue - /// instance. - CompletionQueue() - : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, - nullptr}) {} - - /// Wrap \a take, taking ownership of the instance. - /// - /// \param take The completion queue instance to wrap. Ownership is taken. - explicit CompletionQueue(grpc_completion_queue* take); - - /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue() override { - grpc::g_core_codegen_interface->grpc_completion_queue_destroy(cq_); - } - - /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. - enum NextStatus { - SHUTDOWN, ///< The completion queue has been shutdown and fully-drained - GOT_EVENT, ///< Got a new event; \a tag will be filled in with its - ///< associated value; \a ok indicating its success. - TIMEOUT ///< deadline was reached. - }; - - /// Read from the queue, blocking until an event is available or the queue is - /// shutting down. - /// - /// \param[out] tag Updated to point to the read event's tag. - /// \param[out] ok true if read a successful event, false otherwise. - /// - /// Note that each tag sent to the completion queue (through RPC operations - /// or alarms) will be delivered out of the completion queue by a call to - /// Next (or a related method), regardless of whether the operation succeeded - /// or not. Success here means that this operation completed in the normal - /// valid manner. - /// - /// Server-side RPC request: \a ok indicates that the RPC has indeed - /// been started. If it is false, the server has been Shutdown - /// before this particular call got matched to an incoming RPC. - /// - /// Client-side StartCall/RPC invocation: \a ok indicates that the RPC is - /// going to go to the wire. If it is false, it not going to the wire. This - /// would happen if the channel is either permanently broken or - /// transiently broken but with the fail-fast option. (Note that async unary - /// RPCs don't post a CQ tag at this point, nor do client-streaming - /// or bidi-streaming RPCs that have the initial metadata corked option set.) - /// - /// Client-side Write, Client-side WritesDone, Server-side Write, - /// Server-side Finish, Server-side SendInitialMetadata (which is - /// typically included in Write or Finish when not done explicitly): - /// \a ok means that the data/metadata/status/etc is going to go to the - /// wire. If it is false, it not going to the wire because the call - /// is already dead (i.e., canceled, deadline expired, other side - /// dropped the channel, etc). - /// - /// Client-side Read, Server-side Read, Client-side - /// RecvInitialMetadata (which is typically included in Read if not - /// done explicitly): \a ok indicates whether there is a valid message - /// that got read. If not, you know that there are certainly no more - /// messages that can ever be read from this stream. For the client-side - /// operations, this only happens because the call is dead. For the - /// server-sider operation, though, this could happen because the client - /// has done a WritesDone already. - /// - /// Client-side Finish: \a ok should always be true - /// - /// Server-side AsyncNotifyWhenDone: \a ok should always be true - /// - /// Alarm: \a ok is true if it expired, false if it was canceled - /// - /// \return true if got an event, false if the queue is fully drained and - /// shut down. - bool Next(void** tag, bool* ok) { - // Check return type == GOT_EVENT... cases: - // SHUTDOWN - queue has been shutdown, return false. - // TIMEOUT - we passed infinity time => queue has been shutdown, return - // false. - // GOT_EVENT - we actually got an event, return true. - return (AsyncNextInternal(tag, ok, - grpc::g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) == GOT_EVENT); - } - - /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). - /// Both \a tag and \a ok are updated upon success (if an event is available - /// within the \a deadline). A \a tag points to an arbitrary location usually - /// employed to uniquely identify an event. - /// - /// \param[out] tag Upon success, updated to point to the event's tag. - /// \param[out] ok Upon success, true if a successful event, false otherwise - /// See documentation for CompletionQueue::Next for explanation of ok - /// \param[in] deadline How long to block in wait for an event. - /// - /// \return The type of event read. - template - NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { - grpc::TimePoint deadline_tp(deadline); - return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); - } - - /// EXPERIMENTAL - /// First executes \a F, then reads from the queue, blocking up to - /// \a deadline (or the queue's shutdown). - /// Both \a tag and \a ok are updated upon success (if an event is available - /// within the \a deadline). A \a tag points to an arbitrary location usually - /// employed to uniquely identify an event. - /// - /// \param[in] f Function to execute before calling AsyncNext on this queue. - /// \param[out] tag Upon success, updated to point to the event's tag. - /// \param[out] ok Upon success, true if read a regular event, false - /// otherwise. - /// \param[in] deadline How long to block in wait for an event. - /// - /// \return The type of event read. - template - NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { - CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); - f(); - if (cache.Flush(tag, ok)) { - return GOT_EVENT; - } else { - return AsyncNext(tag, ok, deadline); - } - } - - /// Request the shutdown of the queue. - /// - /// \warning This method must be called at some point if this completion queue - /// is accessed with Next or AsyncNext. \a Next will not return false - /// until this method has been called and all pending tags have been drained. - /// (Likewise for \a AsyncNext returning \a NextStatus::SHUTDOWN .) - /// Only once either one of these methods does that (that is, once the queue - /// has been \em drained) can an instance of this class be destroyed. - /// Also note that applications must ensure that no work is enqueued on this - /// completion queue after this method is called. - void Shutdown(); - - /// Returns a \em raw pointer to the underlying \a grpc_completion_queue - /// instance. - /// - /// \warning Remember that the returned instance is owned. No transfer of - /// owership is performed. - grpc_completion_queue* cq() { return cq_; } - - protected: - /// Private constructor of CompletionQueue only visible to friend classes - explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) { - cq_ = grpc::g_core_codegen_interface->grpc_completion_queue_create( - grpc::g_core_codegen_interface->grpc_completion_queue_factory_lookup( - &attributes), - &attributes, nullptr); - InitialAvalanching(); // reserve this for the future shutdown - } - - private: - // Friends for access to server registration lists that enable checking and - // logging on shutdown - friend class grpc::ServerBuilder; - friend class grpc::Server; - - // Friend synchronous wrappers so that they can access Pluck(), which is - // a semi-private API geared towards the synchronous implementation. - template - friend class grpc::ClientReader; - template - friend class grpc::ClientWriter; - template - friend class grpc::ClientReaderWriter; - template - friend class grpc::ServerReader; - template - friend class grpc::ServerWriter; - template - friend class grpc::internal::ServerReaderWriterBody; - template - friend void grpc::internal::UnaryRunHandlerHelper( - const grpc::internal::MethodHandler::HandlerParameter&, ResponseType*, - grpc::Status&); - template - friend class grpc::internal::ClientStreamingHandler; - template - friend class grpc::internal::ServerStreamingHandler; - template - friend class grpc::internal::TemplatedBidiStreamingHandler; - template - friend class grpc::internal::ErrorMethodHandler; - friend class grpc::ServerContextBase; - friend class grpc::ServerInterface; - template - friend class grpc::internal::BlockingUnaryCallImpl; - - // Friends that need access to constructor for callback CQ - friend class grpc::Channel; - - // For access to Register/CompleteAvalanching - template - friend class grpc::internal::CallOpSet; - - /// EXPERIMENTAL - /// Creates a Thread Local cache to store the first event - /// On this completion queue queued from this thread. Once - /// initialized, it must be flushed on the same thread. - class CompletionQueueTLSCache { - public: - explicit CompletionQueueTLSCache(CompletionQueue* cq); - ~CompletionQueueTLSCache(); - bool Flush(void** tag, bool* ok); - - private: - CompletionQueue* cq_; - bool flushed_; - }; - - NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); - - /// Wraps \a grpc_completion_queue_pluck. - /// \warning Must not be mixed with calls to \a Next. - bool Pluck(grpc::internal::CompletionQueueTag* tag) { - auto deadline = - grpc::g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); - while (true) { - auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - if (tag->FinalizeResult(&ignored, &ok)) { - GPR_CODEGEN_ASSERT(ignored == tag); - return ok; - } - } - } - - /// Performs a single polling pluck on \a tag. - /// \warning Must not be mixed with calls to \a Next. - /// - /// TODO: sreek - This calls tag->FinalizeResult() even if the cq_ is already - /// shutdown. This is most likely a bug and if it is a bug, then change this - /// implementation to simple call the other TryPluck function with a zero - /// timeout. i.e: - /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(grpc::internal::CompletionQueueTag* tag) { - auto deadline = - grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); - } - - /// Performs a single polling pluck on \a tag. Calls tag->FinalizeResult if - /// the pluck() was successful and returned the tag. - /// - /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects - /// that the tag is internal not something that is returned to the user. - void TryPluck(grpc::internal::CompletionQueueTag* tag, - gpr_timespec deadline) { - auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { - return; - } - - bool ok = ev.success != 0; - void* ignored = tag; - GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); - } - - /// Manage state of avalanching operations : completion queue tags that - /// trigger other completion queue operations. The underlying core completion - /// queue should not really shutdown until all avalanching operations have - /// been finalized. Note that we maintain the requirement that an avalanche - /// registration must take place before CQ shutdown (which must be maintained - /// elsehwere) - void InitialAvalanching() { - gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); - } - void RegisterAvalanching() { - gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(1)); - } - void CompleteAvalanching() { - if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(-1)) == 1) { - grpc::g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); - } - } - - void RegisterServer(const grpc::Server* server) { - (void)server; -#ifndef NDEBUG - grpc::internal::MutexLock l(&server_list_mutex_); - server_list_.push_back(server); -#endif - } - void UnregisterServer(const grpc::Server* server) { - (void)server; -#ifndef NDEBUG - grpc::internal::MutexLock l(&server_list_mutex_); - server_list_.remove(server); -#endif - } - bool ServerListEmpty() const { -#ifndef NDEBUG - grpc::internal::MutexLock l(&server_list_mutex_); - return server_list_.empty(); -#endif - return true; - } - - static CompletionQueue* CallbackAlternativeCQ(); - static void ReleaseCallbackAlternativeCQ(CompletionQueue* cq); - - grpc_completion_queue* cq_; // owned - - gpr_atm avalanches_in_flight_; - - // List of servers associated with this CQ. Even though this is only used with - // NDEBUG, instantiate it in all cases since otherwise the size will be - // inconsistent. - mutable grpc::internal::Mutex server_list_mutex_; - std::list - server_list_ /* GUARDED_BY(server_list_mutex_) */; -}; - -/// A specific type of completion queue used by the processing of notifications -/// by servers. Instantiated by \a ServerBuilder or Server (for health checker). -class ServerCompletionQueue : public CompletionQueue { - public: - bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } - - protected: - /// Default constructor - ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} - - private: - /// \param completion_type indicates whether this is a NEXT or CALLBACK - /// completion queue. - /// \param polling_type Informs the GRPC library about the type of polling - /// allowed on this completion queue. See grpc_cq_polling_type's description - /// in grpc_types.h for more details. - /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues - ServerCompletionQueue(grpc_cq_completion_type completion_type, - grpc_cq_polling_type polling_type, - grpc_completion_queue_functor* shutdown_cb) - : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, - shutdown_cb}), - polling_type_(polling_type) {} - - grpc_cq_polling_type polling_type_; - friend class grpc::ServerBuilder; - friend class grpc::Server; -}; - -} // namespace grpc +/// TODO(chengyuc): Remove this file after solving compatibility. +#include #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 2264585c51875..fe1a458c5ae27 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index a524bf8124e68..e02c775d77d75 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -142,7 +142,7 @@ std::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/support/async_unary_call.h", "grpcpp/support/client_callback.h", "grpcpp/client_context.h", - "grpcpp/impl/codegen/completion_queue.h", + "grpcpp/completion_queue.h", "grpcpp/support/message_allocator.h", "grpcpp/support/method_handler.h", "grpcpp/impl/codegen/proto_utils.h", diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 85a8c0e7bfb57..1c3ef153e4e44 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -32,7 +32,7 @@ #include #include #include -#include +#include #include #include #include