Skip to content

Commit

Permalink
Use zero-copy message initialization for outgoing messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
rolftimmermans authored and rgbkrk committed Sep 19, 2017
1 parent 3ecb07b commit 5d10c38
Showing 1 changed file with 75 additions and 136 deletions.
211 changes: 75 additions & 136 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,9 @@ namespace zmq {
#endif

class IncomingMessage;
class OutgoingMessage;
static NAN_METHOD(Recv);
static NAN_METHOD(Readv);
class OutgoingMessage;
static NAN_METHOD(Send);
static NAN_METHOD(Sendv);
void Close();
static NAN_METHOD(Close);
Expand Down Expand Up @@ -183,6 +182,12 @@ namespace zmq {

static NAN_MODULE_INIT(Initialize);

static void
on_uv_close(uv_handle_t *handle)
{
delete handle;
}

/*
* Helpers for dealing with ØMQ errors.
*/
Expand All @@ -197,7 +202,6 @@ namespace zmq {
return Nan::Error(ErrorMessage());
}


/*
* Context methods.
*/
Expand Down Expand Up @@ -318,7 +322,6 @@ namespace zmq {
Nan::SetPrototypeMethod(t, "unref", DetachFromEventLoop);
Nan::SetPrototypeMethod(t, "recv", Recv);
Nan::SetPrototypeMethod(t, "readv", Readv);
Nan::SetPrototypeMethod(t, "send", Send);
Nan::SetPrototypeMethod(t, "sendv", Sendv);
Nan::SetPrototypeMethod(t, "close", Close);

Expand Down Expand Up @@ -1019,6 +1022,61 @@ namespace zmq {
MessageReference* msgref_;
};

class Socket::OutgoingMessage {
public:
inline OutgoingMessage(Local<Object> buf) {
bufref_ = new BufferReference(buf);
if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf),
BufferReference::FreeCallback, bufref_) < 0) {
delete bufref_;
Nan::ThrowError(ErrorMessage());
}
};

inline ~OutgoingMessage() {
if (zmq_msg_close(&msg_) < 0)
Nan::ThrowError(ErrorMessage());
};

inline operator zmq_msg_t*() {
return &msg_;
}

private:
class BufferReference {
public:
inline BufferReference(Local<Object> buf) {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, Destroy);
async->data = this;
persistent.Reset(buf);
}

inline ~BufferReference() {
persistent.Reset();
}

// Called by zmq when the message has been sent.
// NOTE: May be called from a worker thread. Do not modify V8/Node.
static void FreeCallback(void* data, void* bufref) {
uv_async_send(static_cast<BufferReference*>(bufref)->async);
}

static void Destroy(uv_async_t* async) {
uv_close(reinterpret_cast<uv_handle_t*>(async), on_uv_close);
BufferReference* bufref = static_cast<BufferReference*>(async->data);
delete bufref;
}
private:
Nan::Persistent<Object> persistent;
uv_async_t* async;
};

zmq_msg_t msg_;
BufferReference* bufref_;
};


#if ZMQ_CAN_MONITOR
NAN_METHOD(Socket::Monitor) {
int64_t timer_interval = 10; // default to 10ms interval
Expand Down Expand Up @@ -1193,65 +1251,6 @@ namespace zmq {
info.GetReturnValue().Set(msg.GetBuffer());
}

/*
* An object that creates a ØMQ message from the given Buffer Object,
* and manages the reference to it using RAII. A persistent V8 handle
* for the Buffer object will remain while its data is in use by ØMQ.
*/

class Socket::OutgoingMessage {
public:
inline OutgoingMessage(Local<Object> buf) {
bufref_ = new BufferReference(buf);
if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf),
BufferReference::FreeCallback, bufref_) < 0) {
delete bufref_;
Nan::ThrowError(ErrorMessage());
}
};

inline ~OutgoingMessage() {
if (zmq_msg_close(&msg_) < 0)
Nan::ThrowError(ErrorMessage());
};

inline operator zmq_msg_t*() {
return &msg_;
}

private:
class BufferReference {
public:
inline BufferReference(Local<Object> buf) {
loop = uv_default_loop();
uv_async_init(loop, &async, reinterpret_cast<uv_async_cb>(cleanup));
async.data = this;
persistent.Reset(buf);
}

inline ~BufferReference() {
persistent.Reset();
}

// Called by zmq when the message has been sent.
// NOTE: May be called from a worker thread. Do not modify V8/Node.
static void FreeCallback(void* data, void* message) {
uv_async_send(&static_cast<BufferReference *>(message)->async);
}

static void cleanup(uv_async_t *handle, int status) {
delete static_cast<BufferReference *>(handle->data);
}
private:
Nan::Persistent<Object> persistent;
uv_async_t async;
uv_loop_t *loop;
};

zmq_msg_t msg_;
BufferReference* bufref_;
};

NAN_METHOD(Socket::Sendv) {
Socket* socket = GetSocket(info);
if (socket->state_ != STATE_READY)
Expand All @@ -1262,8 +1261,6 @@ namespace zmq {
bool checkPollOut = true;
bool readsReady = false;

int rc;

Local<Array> batch = info[0].As<Array>();
size_t len = batch->Length();

Expand Down Expand Up @@ -1296,25 +1293,32 @@ namespace zmq {
Local<Number> flagsObj = batch->Get(i + 1).As<Number>();

int flags = Nan::To<int>(flagsObj).FromJust();
size_t len = Buffer::Length(buf);

#if 1
/* Non-copying implementation. */
OutgoingMessage msg_p(buf);
#else
/* Copying implementation. */
zmq_msg_t msg;
size_t len = Buffer::Length(buf);
rc = zmq_msg_init_size(&msg, len);
if (rc != 0)
return Nan::ThrowError(ErrorMessage());
char * cp = static_cast<char *>(zmq_msg_data(&msg));
const char * dat = Buffer::Data(buf);
char* cp = static_cast<char *>(zmq_msg_data(&msg));
const char* dat = Buffer::Data(buf);
std::copy(dat, dat + len, cp);
zmq_msg_t* msg_p = &msg;
#endif

while (true) {
int rc;
#if ZMQ_VERSION_MAJOR == 2
rc = zmq_send(socket->socket_, &msg, flags);
rc = zmq_send(socket->socket_, msg_p, flags);
#elif ZMQ_VERSION_MAJOR == 3
rc = zmq_sendmsg(socket->socket_, &msg, flags);
rc = zmq_sendmsg(socket->socket_, msg_p, flags);
#else
rc = zmq_msg_send(&msg, socket->socket_, flags);
rc = zmq_msg_send(msg_p, socket->socket_, flags);
checkPollOut = false;
#endif
if (rc < 0){
Expand Down Expand Up @@ -1343,71 +1347,6 @@ namespace zmq {
return info.GetReturnValue().Set(true);
}

// WARNING: the buffer passed here will be kept alive
// until zmq_send completes, possibly on another thread.
// Do not modify or reuse any buffer passed to send.
// This is bad, but allows us to send without copying.
NAN_METHOD(Socket::Send) {

int argc = info.Length();
if (argc != 1 && argc != 2)
return Nan::ThrowTypeError("Must pass a Buffer and optionally flags");
if (!Buffer::HasInstance(info[0]))
return Nan::ThrowTypeError("First argument should be a Buffer");
int flags = 0;
if (argc == 2) {
if (!info[1]->IsNumber())
return Nan::ThrowTypeError("Second argument should be an integer");
flags = Nan::To<int>(info[1]).FromJust();
}

GET_SOCKET(info);

#if 0 // zero-copy version, but doesn't properly pin buffer and so has GC issues
OutgoingMessage msg(info[0].As<Object>());
if (zmq_send(socket->socket_, msg, flags) < 0)
return Nan::ThrowError(ErrorMessage());
#else // copying version that has no GC issues
zmq_msg_t msg;
Local<Object> buf = info[0].As<Object>();
size_t len = Buffer::Length(buf);
int res = zmq_msg_init_size(&msg, len);
if (res != 0)
return Nan::ThrowError(ErrorMessage());

char * cp = static_cast<char *>(zmq_msg_data(&msg));
const char * dat = Buffer::Data(buf);
std::copy(dat, dat + len, cp);
while (true) {
int rc;
#if ZMQ_VERSION_MAJOR == 2
rc = zmq_send(socket->socket_, &msg, flags);
#elif ZMQ_VERSION_MAJOR == 3
rc = zmq_sendmsg(socket->socket_, &msg, flags);
#else
rc = zmq_msg_send(&msg, socket->socket_, flags);
#endif
if (rc < 0){
if (zmq_errno()==EINTR) {
continue;
}
return Nan::ThrowError(ErrorMessage());
} else {
break;
}
}
#endif // zero copy / copying version

return;
}


static void
on_uv_close(uv_handle_t *handle)
{
delete handle;
}

void
Socket::Close() {
if (socket_) {
Expand Down

0 comments on commit 5d10c38

Please sign in to comment.