Skip to content

Commit

Permalink
Refactor delayed promise resolving.
Browse files Browse the repository at this point in the history
  • Loading branch information
rolftimmermans committed Nov 13, 2019
1 parent e808574 commit 2f0894b
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 106 deletions.
13 changes: 12 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ dist: bionic

jobs:
include:

## TEST STAGE

# Test main OSes on Node 10.x branch.
- os: linux
node_js: "10.16"
env: ZMQ_DRAFT=true INCLUDE_COMPAT_TESTS=true
Expand All @@ -16,6 +16,7 @@ jobs:
sudo: required

- os: osx
osx_image: xcode10
env: ZMQ_DRAFT=true
node_js: "10.16"

Expand All @@ -36,6 +37,7 @@ jobs:
addons: {apt: {packages: libzmq3-dev}}

- os: osx
osx_image: xcode10
node_js: "10.16"
env: ZMQ_SHARED=true
addons: {homebrew: {packages: zeromq, update: true}}
Expand All @@ -56,6 +58,14 @@ jobs:
# Skip GC tests due to https://github.com/node-ffi-napi/weak-napi/issues/16
env: ZMQ_DRAFT=true SKIP_GC_TESTS=true

## ADDITIONAL TESTS

# This test ensures the delayed resolution of read/write promises is correct
# by disabling immediate resolution (which happens 99% of the time) entirely.
- os: linux
node_js: "10.16"
env: ZMQ_NO_SYNC_RESOLVE=true ZMQ_DRAFT=true INCLUDE_COMPAT_TESTS=true NODE_NO_WARNINGS=1

## PREBUILD STAGE

- stage: prebuild
Expand Down Expand Up @@ -86,6 +96,7 @@ jobs:

- stage: prebuild
os: osx
osx_image: xcode10
node_js: "10.16"
script: script/ci/prebuild.sh

Expand Down
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
### Unreleased
### v6.0.0-beta.4

* Break out of busy loops automatically when the number of synchronous I/O operations moves beyond a built-in threshold. This avoids the ZeroMQ background I/O process(es) starving the Node.js event loop when it can process messages faster than the application, potentially causing decreased responsiveness and/or high memory usage.
* Break out of busy loops automatically when the number of synchronous I/O operations moves beyond a built-in threshold. This avoids the ZeroMQ background I/O process(es) starving the Node.js event loop when it can process messages faster than the application. This could have caused decreased responsiveness and/or high memory usage. This only happens when sending/receiving messages as quickly as possible, such as in a benchmark or in test code.

* Fixed a memory leak in socket construction that would manifest itself when repeatedly creating many sockets.

### v6.0.0-beta.3

Expand Down
21 changes: 17 additions & 4 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'variables': {
'zmq_shared%': 'false',
'zmq_draft%': 'false',
'zmq_no_sync_resolve%': 'false',
},

'targets': [
Expand Down Expand Up @@ -52,6 +53,12 @@
],
}],

["zmq_no_sync_resolve == 'true'", {
'defines': [
'ZMQ_NO_SYNC_RESOLVE',
],
}],

["zmq_shared == 'true'", {
'link_settings': {
'libraries': ['-lzmq'],
Expand Down Expand Up @@ -85,7 +92,7 @@
'-std=gnu++1y'
],
'cflags_cc+': [
'-std=c++14',
'-std=c++17',
'-Wno-missing-field-initializers',
],
}],
Expand All @@ -94,7 +101,7 @@
'xcode_settings': {
# https://pewpewthespells.com/blog/buildsettings.html
'CLANG_CXX_LIBRARY': 'libc++',
'CLANG_CXX_LANGUAGE_STANDARD': 'c++14',
'CLANG_CXX_LANGUAGE_STANDARD': 'c++17',
'MACOSX_DEPLOYMENT_TARGET': '10.9',
'WARNING_CFLAGS': [
'-Wextra',
Expand All @@ -112,6 +119,9 @@
# 2 - MultiThreadedDLL (/MD)
# 3 - MultiThreadedDebugDLL (/MDd)
'RuntimeLibrary': 3,
'AdditionalOptions': [
'-std:c++17',
],
},
},
}],
Expand All @@ -126,7 +136,7 @@
'-std=gnu++1y'
],
'cflags_cc+': [
'-std=c++14',
'-std=c++17',
'-flto',
'-Wno-missing-field-initializers',
],
Expand All @@ -136,7 +146,7 @@
# https://pewpewthespells.com/blog/buildsettings.html
'xcode_settings': {
'CLANG_CXX_LIBRARY': 'libc++',
'CLANG_CXX_LANGUAGE_STANDARD': 'c++14',
'CLANG_CXX_LANGUAGE_STANDARD': 'c++17',
'MACOSX_DEPLOYMENT_TARGET': '10.9',
'LLVM_LTO': 'YES',
'GCC_OPTIMIZATION_LEVEL': '3',
Expand All @@ -154,6 +164,9 @@
# 2 - MultiThreadedDLL (/MD)
# 3 - MultiThreadedDebugDLL (/MDd)
'RuntimeLibrary': 2,
'AdditionalOptions': [
'-std:c++17',
],
},
'VCLinkerTool': {
'AdditionalOptions': ['/ignore:4099'],
Expand Down
2 changes: 1 addition & 1 deletion script/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ else
echo > "${SRC_DIR}/builds/cmake/Modules/ClangFormat.cmake"
fi

cmake -G "${CMAKE_GENERATOR}" "${BUILD_OPTIONS}" -DCMAKE_INSTALL_PREFIX="${PATH_PREFIX}" -DCMAKE_INSTALL_LIBDIR=lib -DBUILD_STATIC=ON -DBUILD_TESTS=OFF -DBUILD_SHARED=OFF "${SRC_DIR}"
cmake -G "${CMAKE_GENERATOR}" "${BUILD_OPTIONS}" -DCMAKE_INSTALL_PREFIX="${PATH_PREFIX}" -DCMAKE_INSTALL_LIBDIR=lib -DBUILD_STATIC=ON -DBUILD_TESTS=OFF -DBUILD_SHARED=OFF -DWITH_DOCS=OFF "${SRC_DIR}"

if [ -n "${WINDIR}" ]; then
cmake --build . --config Release --target install -- -verbosity:Minimal -maxcpucount
Expand Down
4 changes: 4 additions & 0 deletions script/ci/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ if [ -n "${ZMQ_DRAFT}" ]; then
export npm_config_zmq_draft=true
fi

if [ -n "${ZMQ_NO_SYNC_RESOLVE}" ]; then
export npm_config_zmq_no_sync_resolve=true
fi

export npm_config_build_from_source=true

# Installing node-gyp globally facilitates calling it in various ways, not just
Expand Down
10 changes: 6 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1662,25 +1662,27 @@ function defineOpt<T extends {prototype: any}, K extends ReadableKeys<PrototypeO
const desc: PropertyDescriptor = {}

if (acc & Acc.Read) {
const getter = `get${type}Option`
if (values) {
desc.get = function get(this: any) {
return values[this[`get${type}Option`](id)]
return values[this[getter](id)]
}
} else {
desc.get = function get(this: any) {
return this[`get${type}Option`](id)
return this[getter](id)
}
}
}

if (acc & Acc.Write) {
const setter = `set${type}Option`
if (values) {
desc.set = function set(this: any, val: any) {
this[`set${type}Option`](id, values.indexOf(val))
this[setter](id, values.indexOf(val))
}
} else {
desc.set = function set(this: any, val: any) {
this[`set${type}Option`](id, val)
this[setter](id, val)
}
}
}
Expand Down
31 changes: 16 additions & 15 deletions src/observer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "incoming_msg.h"
#include "util/async_scope.h"
#include "util/take.h"

#include <array>

Expand Down Expand Up @@ -315,21 +316,18 @@ Napi::Value Observer::Receive(const Napi::CallbackInfo& info) {
if (!ValidateArguments(info, {})) return Env().Undefined();
if (!ValidateOpen()) return Env().Undefined();

if (poller.Reading()) {
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
return Env().Undefined();
}

if (HasEvents()) {
/* We can read from the socket immediately. This is a separate code
path so we can avoid creating a lambda. */
/* We can read from the socket immediately. This is a fast path. */
auto res = Napi::Promise::Deferred::New(Env());
Receive(res);
return res.Promise();
} else {
/* Check if we are already polling for reads. Only one promise may
receive the next message, so we must ensure that receive
operations are in sequence. */
if (poller.PollingReadable()) {
ErrnoException(Env(), EAGAIN).ThrowAsJavaScriptException();
return Env().Undefined();
}

poller.PollReadable(0);
return poller.ReadPromise();
}
}
Expand All @@ -351,13 +349,16 @@ void Observer::Initialize(Module& module, Napi::Object& exports) {
}

void Observer::Poller::ReadableCallback() {
AsyncScope scope(read_deferred.Env(), socket.async_context);
socket.Receive(read_deferred);
assert(read_deferred);

AsyncScope scope(socket.Env(), socket.async_context);
socket.Receive(take(read_deferred));
}

Napi::Value Observer::Poller::ReadPromise() {
read_deferred = Napi::Promise::Deferred(read_deferred.Env());
zmq::Poller<Poller>::PollReadable(0);
return read_deferred.Promise();
assert(!read_deferred);

read_deferred = Napi::Promise::Deferred(socket.Env());
return read_deferred->Promise();
}
}
11 changes: 8 additions & 3 deletions src/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "poller.h"

#include <optional>

namespace zmq {
class Module;

Expand All @@ -31,14 +33,17 @@ class Observer : public Napi::ObjectWrap<Observer>, public Closable {

class Poller : public zmq::Poller<Poller> {
Observer& socket;
Napi::Promise::Deferred read_deferred;
std::optional<Napi::Promise::Deferred> read_deferred;

public:
explicit Poller(Observer& observer)
: socket(observer), read_deferred(socket.Env()) {}
explicit Poller(Observer& observer) : socket(observer) {}

Napi::Value ReadPromise();

inline bool Reading() const {
return read_deferred.has_value();
}

inline bool ValidateReadable() const {
return socket.HasEvents();
}
Expand Down
37 changes: 18 additions & 19 deletions src/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,21 @@ class Poller {
/* Safely close and release all handles. This can be called before
destruction to release resources early. */
inline void Close() {
/* Trigger all watched events manually, which causes any pending
operation to succeed or fail immediately. */
if (events) Trigger(events);
/* Trigger watched events manually, which causes any pending operation
to succeed or fail immediately. */
Trigger(events);

/* Pollers and timers are stopped automatically by uv_close() which is
wrapped in UvHandle. */

/* Release references to all UV handles. */
poll.reset(nullptr);
readable_timer.reset(nullptr);
writable_timer.reset(nullptr);
poll.reset();
readable_timer.reset();
writable_timer.reset();

if (finalize) finalize();
}

inline bool PollingReadable() const {
return events & UV_READABLE;
}

inline bool PollingWritable() const {
return events & UV_WRITABLE;
}

/* Start polling for readable state, with the given timeout. */
inline void PollReadable(int64_t timeout) {
assert((events & UV_READABLE) == 0);
Expand Down Expand Up @@ -119,13 +111,19 @@ class Poller {

/* Trigger any events that are ready. Use validation callbacks to see
which events are actually available. */
inline void Trigger() {
inline void TriggerReadable() {
if (events & UV_READABLE) {
if (static_cast<T*>(this)->ValidateReadable()) Trigger(UV_READABLE);
if (static_cast<T*>(this)->ValidateReadable()) {
Trigger(UV_READABLE);
}
}
}

inline void TriggerWritable() {
if (events & UV_WRITABLE) {
if (static_cast<T*>(this)->ValidateWritable()) Trigger(UV_WRITABLE);
if (static_cast<T*>(this)->ValidateWritable()) {
Trigger(UV_WRITABLE);
}
}
}

Expand Down Expand Up @@ -159,8 +157,9 @@ class Poller {
static void Callback(uv_poll_t* poll, int32_t status, int32_t events) {
if (status == 0) {
auto& poller = *reinterpret_cast<Poller*>(poll->data);
poller.Trigger();
poller.TriggerReadable();
poller.TriggerWritable();
}
};
}
};
}
Loading

0 comments on commit 2f0894b

Please sign in to comment.