Skip to content

Commit

Permalink
Refactor uv/non-uv code in Node extension
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Nov 1, 2016
1 parent fdbf733 commit 013d203
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 19 deletions.
4 changes: 2 additions & 2 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,8 @@
"src/node/ext/call_credentials.cc",
"src/node/ext/channel.cc",
"src/node/ext/channel_credentials.cc",
"src/node/ext/completion_queue.cc",
"src/node/ext/completion_queue_async_worker.cc",
"src/node/ext/completion_queue_threadpool.cc",
"src/node/ext/completion_queue_uv.cc",
"src/node/ext/node_grpc.cc",
"src/node/ext/server.cc",
"src/node/ext/server_credentials.cc",
Expand Down
5 changes: 2 additions & 3 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3710,7 +3710,6 @@ node_modules:
- src/node/ext/channel.h
- src/node/ext/channel_credentials.h
- src/node/ext/completion_queue.h
- src/node/ext/completion_queue_async_worker.h
- src/node/ext/server.h
- src/node/ext/server_credentials.h
- src/node/ext/timeval.h
Expand All @@ -3729,8 +3728,8 @@ node_modules:
- src/node/ext/call_credentials.cc
- src/node/ext/channel.cc
- src/node/ext/channel_credentials.cc
- src/node/ext/completion_queue.cc
- src/node/ext/completion_queue_async_worker.cc
- src/node/ext/completion_queue_threadpool.cc
- src/node/ext/completion_queue_uv.cc
- src/node/ext/node_grpc.cc
- src/node/ext/server.cc
- src/node/ext/server_credentials.cc
Expand Down
1 change: 1 addition & 0 deletions src/node/ext/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/

#include <v8.h>
#include <grpc/grpc.h>

namespace grpc {
namespace node {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,63 @@
*
*/

/* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV

#include <node.h>
#include <nan.h>

#include "grpc/grpc.h"
#include "grpc/support/log.h"
#include "grpc/support/time.h"
#include "completion_queue_async_worker.h"
#include "completion_queue.h"
#include "call.h"

namespace grpc {
namespace node {

namespace {

/* A worker that asynchronously calls completion_queue_next, and queues onto the
node event loop a call to the function stored in the event's tag. */
class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
public:
CompletionQueueAsyncWorker();

~CompletionQueueAsyncWorker();
/* Calls completion_queue_next with the provided deadline, and stores the
event if there was one or sets an error message if there was not */
void Execute();

/* Returns the completion queue attached to this class */
static grpc_completion_queue *GetQueue();

/* Convenience function to create a worker with the given arguments and queue
it to run asynchronously */
static void Next();

/* Initialize the CompletionQueueAsyncWorker class */
static void Init(v8::Local<v8::Object> exports);

protected:
/* Called when Execute has succeeded (completed without setting an error
message). Calls the saved callback with the event that came from
completion_queue_next */
void HandleOKCallback();

void HandleErrorCallback();

private:
grpc_event result;

static grpc_completion_queue *queue;

// Number of grpc_completion_queue_next calls in the thread pool
static int current_threads;
// Number of grpc_completion_queue_next calls waiting to enter the thread pool
static int waiting_next_calls;
};

const int max_queue_threads = 2;

using v8::Function;
Expand Down Expand Up @@ -137,5 +182,21 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
DestroyTag(result.tag);
}

} // namespace

grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue();
}

void CompletionQueueNext() {
CompletionQueueAsyncWorker::Next();
}

void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports);
}

} // namespace node
} // namespace grpc

#endif /* GRPC_UV */
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
*
*/

#ifdef GRPC_UV

#include <uv.h>
#include <node.h>
#include <v8.h>
#include <grpc/grpc.h>

#include "call.h"
#include "completion_queue.h"
#include "completion_queue_async_worker.h"

namespace grpc {
namespace node {
Expand Down Expand Up @@ -81,34 +82,24 @@ void drain_completion_queue(uv_prepare_t *handle) {
}

grpc_completion_queue *GetCompletionQueue() {
#ifdef GRPC_UV
return queue;
#else
return CompletionQueueAsyncWorker::GetQueue();
#endif
}

void CompletionQueueNext() {
#ifdef GRPC_UV
if (pending_batches == 0) {
GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
uv_prepare_start(&prepare, drain_completion_queue);
}
pending_batches++;
#else
CompletionQueueAsyncWorker::Next();
#endif
}

void CompletionQueueInit(Local<Object> exports) {
#ifdef GRPC_UV
queue = grpc_completion_queue_create(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
#else
CompletionQueueAsyncWorker::Init(exports);
#endif
}

} // namespace node
} // namespace grpc

#endif /* GRPC_UV */

0 comments on commit 013d203

Please sign in to comment.