Skip to content

Commit

Permalink
Merge pull request uNetworking#531 from uWebSockets/tinymem
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB authored Mar 20, 2017
2 parents 31c2832 + f7ec6bb commit aad7447
Show file tree
Hide file tree
Showing 22 changed files with 981 additions and 1,061 deletions.
4 changes: 2 additions & 2 deletions benchmarks/uWS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ int main()
{
uWS::Hub h;

h.onMessage([](uWS::WebSocket<uWS::SERVER> ws, char *message, size_t length, uWS::OpCode opCode) {
ws.send(message, length, opCode);
h.onMessage([](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
ws->send(message, length, opCode);
});

h.listen(3000);
Expand Down
4 changes: 2 additions & 2 deletions examples/echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ int main()
{
uWS::Hub h;

h.onMessage([](uWS::WebSocket<uWS::SERVER> ws, char *message, size_t length, uWS::OpCode opCode) {
ws.send(message, length, opCode);
h.onMessage([](uWS::WebSocket<uWS::SERVER> *ws, char *message, size_t length, uWS::OpCode opCode) {
ws->send(message, length, opCode);
});

h.listen(3000);
Expand Down
55 changes: 28 additions & 27 deletions nodejs/addon.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,22 @@ void deleteGroup(const FunctionCallbackInfo<Value> &args) {
}

template <bool isServer>
inline Local<External> wrapSocket(uWS::WebSocket<isServer> webSocket, Isolate *isolate) {
return External::New(isolate, webSocket.getPollHandle());
inline Local<External> wrapSocket(uWS::WebSocket<isServer> *webSocket, Isolate *isolate) {
return External::New(isolate, webSocket);
}

template <bool isServer>
inline uWS::WebSocket<isServer> unwrapSocket(Local<External> number) {
return uWS::WebSocket<isServer>((Poll *) number->Value());
inline uWS::WebSocket<isServer> *unwrapSocket(Local<External> external) {
return (uWS::WebSocket<isServer> *) external->Value();
}

inline Local<Value> wrapMessage(const char *message, size_t length, uWS::OpCode opCode, Isolate *isolate) {
return opCode == uWS::OpCode::BINARY ? (Local<Value>) ArrayBuffer::New(isolate, (char *) message, length) : (Local<Value>) String::NewFromUtf8(isolate, message, String::kNormalString, length);
}

template <bool isServer>
inline Local<Value> getDataV8(uWS::WebSocket<isServer> webSocket, Isolate *isolate) {
return webSocket.getUserData() ? Local<Value>::New(isolate, *(Persistent<Value> *) webSocket.getUserData()) : Local<Value>::Cast(Undefined(isolate));
inline Local<Value> getDataV8(uWS::WebSocket<isServer> *webSocket, Isolate *isolate) {
return webSocket->getUserData() ? Local<Value>::New(isolate, *(Persistent<Value> *) webSocket->getUserData()) : Local<Value>::Cast(Undefined(isolate));
}

template <bool isServer>
Expand All @@ -114,25 +114,25 @@ void getUserData(const FunctionCallbackInfo<Value> &args) {

template <bool isServer>
void clearUserData(const FunctionCallbackInfo<Value> &args) {
uWS::WebSocket<isServer> webSocket = unwrapSocket<isServer>(args[0].As<External>());
((Persistent<Value> *) webSocket.getUserData())->Reset();
delete (Persistent<Value> *) webSocket.getUserData();
uWS::WebSocket<isServer> *webSocket = unwrapSocket<isServer>(args[0].As<External>());
((Persistent<Value> *) webSocket->getUserData())->Reset();
delete (Persistent<Value> *) webSocket->getUserData();
}

template <bool isServer>
void setUserData(const FunctionCallbackInfo<Value> &args) {
uWS::WebSocket<isServer> webSocket = unwrapSocket<isServer>(args[0].As<External>());
if (webSocket.getUserData()) {
((Persistent<Value> *) webSocket.getUserData())->Reset(args.GetIsolate(), args[1]);
uWS::WebSocket<isServer> *webSocket = unwrapSocket<isServer>(args[0].As<External>());
if (webSocket->getUserData()) {
((Persistent<Value> *) webSocket->getUserData())->Reset(args.GetIsolate(), args[1]);
} else {
webSocket.setUserData(new Persistent<Value>(args.GetIsolate(), args[1]));
webSocket->setUserData(new Persistent<Value>(args.GetIsolate(), args[1]));
}
}

template <bool isServer>
void getAddress(const FunctionCallbackInfo<Value> &args)
{
typename uWS::WebSocket<isServer>::Address address = unwrapSocket<isServer>(args[0].As<External>()).getAddress();
typename uWS::WebSocket<isServer>::Address address = unwrapSocket<isServer>(args[0].As<External>())->getAddress();
Local<Array> array = Array::New(args.GetIsolate(), 3);
array->Set(0, Integer::New(args.GetIsolate(), address.port));
array->Set(1, String::NewFromUtf8(args.GetIsolate(), address.address));
Expand All @@ -154,7 +154,8 @@ struct SendCallbackData {
Isolate *isolate;
};

void sendCallback(void *webSocket, void *data, bool cancelled, void *reserved)
template <bool isServer>
void sendCallback(uWS::WebSocket<isServer> *webSocket, void *data, bool cancelled, void *reserved)
{
SendCallbackData *sc = (SendCallbackData *) data;
if (!cancelled) {
Expand All @@ -172,7 +173,7 @@ void send(const FunctionCallbackInfo<Value> &args)
NativeString nativeString(args[1]);

SendCallbackData *sc = nullptr;
void (*callback)(void *, void *, bool, void *) = nullptr;
void (*callback)(uWS::WebSocket<isServer> *, void *, bool, void *) = nullptr;

if (args[3]->IsFunction()) {
callback = sendCallback;
Expand All @@ -181,14 +182,14 @@ void send(const FunctionCallbackInfo<Value> &args)
sc->isolate = args.GetIsolate();
}

unwrapSocket<isServer>(args[0].As<External>()).send(nativeString.getData(),
unwrapSocket<isServer>(args[0].As<External>())->send(nativeString.getData(),
nativeString.getLength(), opCode, callback, sc);
}

void connect(const FunctionCallbackInfo<Value> &args) {
uWS::Group<uWS::CLIENT> *clientGroup = (uWS::Group<uWS::CLIENT> *) args[0].As<External>()->Value();
NativeString uri(args[1]);
hub.connect(std::string(uri.getData(), uri.getLength()), new Persistent<Value>(args.GetIsolate(), args[2]), 5000, clientGroup);
hub.connect(std::string(uri.getData(), uri.getLength()), new Persistent<Value>(args.GetIsolate(), args[2]), {}, 5000, clientGroup);
}

struct Ticket {
Expand Down Expand Up @@ -248,7 +249,7 @@ void onConnection(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
Persistent<Function> *connectionCallback = &groupData->connectionHandler;
connectionCallback->Reset(isolate, Local<Function>::Cast(args[1]));
group->onConnection([isolate, connectionCallback, groupData](uWS::WebSocket<isServer> webSocket, uWS::HttpRequest req) {
group->onConnection([isolate, connectionCallback, groupData](uWS::WebSocket<isServer> *webSocket, uWS::HttpRequest req) {
groupData->size++;
HandleScope hs(isolate);
Local<Value> argv[] = {wrapSocket(webSocket, isolate)};
Expand All @@ -264,7 +265,7 @@ void onMessage(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
Persistent<Function> *messageCallback = &groupData->messageHandler;
messageCallback->Reset(isolate, Local<Function>::Cast(args[1]));
group->onMessage([isolate, messageCallback](uWS::WebSocket<isServer> webSocket, const char *message, size_t length, uWS::OpCode opCode) {
group->onMessage([isolate, messageCallback](uWS::WebSocket<isServer> *webSocket, const char *message, size_t length, uWS::OpCode opCode) {
HandleScope hs(isolate);
Local<Value> argv[] = {wrapMessage(message, length, opCode, isolate),
getDataV8(webSocket, isolate)};
Expand All @@ -280,7 +281,7 @@ void onPing(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
Persistent<Function> *pingCallback = &groupData->pingHandler;
pingCallback->Reset(isolate, Local<Function>::Cast(args[1]));
group->onPing([isolate, pingCallback](uWS::WebSocket<isServer> webSocket, const char *message, size_t length) {
group->onPing([isolate, pingCallback](uWS::WebSocket<isServer> *webSocket, const char *message, size_t length) {
HandleScope hs(isolate);
Local<Value> argv[] = {wrapMessage(message, length, uWS::OpCode::PING, isolate),
getDataV8(webSocket, isolate)};
Expand All @@ -296,7 +297,7 @@ void onPong(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
Persistent<Function> *pongCallback = &groupData->pongHandler;
pongCallback->Reset(isolate, Local<Function>::Cast(args[1]));
group->onPong([isolate, pongCallback](uWS::WebSocket<isServer> webSocket, const char *message, size_t length) {
group->onPong([isolate, pongCallback](uWS::WebSocket<isServer> *webSocket, const char *message, size_t length) {
HandleScope hs(isolate);
Local<Value> argv[] = {wrapMessage(message, length, uWS::OpCode::PONG, isolate),
getDataV8(webSocket, isolate)};
Expand All @@ -313,7 +314,7 @@ void onDisconnection(const FunctionCallbackInfo<Value> &args) {
Persistent<Function> *disconnectionCallback = &groupData->disconnectionHandler;
disconnectionCallback->Reset(isolate, Local<Function>::Cast(args[1]));

group->onDisconnection([isolate, disconnectionCallback, groupData](uWS::WebSocket<isServer> webSocket, int code, char *message, size_t length) {
group->onDisconnection([isolate, disconnectionCallback, groupData](uWS::WebSocket<isServer> *webSocket, int code, char *message, size_t length) {
groupData->size--;
HandleScope hs(isolate);
Local<Value> argv[] = {wrapSocket(webSocket, isolate),
Expand Down Expand Up @@ -345,12 +346,12 @@ void onError(const FunctionCallbackInfo<Value> &args) {
template <bool isServer>
void closeSocket(const FunctionCallbackInfo<Value> &args) {
NativeString nativeString(args[2]);
unwrapSocket<isServer>(args[0].As<External>()).close(args[1]->IntegerValue(), nativeString.getData(), nativeString.getLength());
unwrapSocket<isServer>(args[0].As<External>())->close(args[1]->IntegerValue(), nativeString.getData(), nativeString.getLength());
}

template <bool isServer>
void terminateSocket(const FunctionCallbackInfo<Value> &args) {
unwrapSocket<isServer>(args[0].As<External>()).terminate();
unwrapSocket<isServer>(args[0].As<External>())->terminate();
}

template <bool isServer>
Expand Down Expand Up @@ -383,7 +384,7 @@ void prepareMessage(const FunctionCallbackInfo<Value> &args) {
template <bool isServer>
void sendPrepared(const FunctionCallbackInfo<Value> &args) {
unwrapSocket<isServer>(args[0].As<External>())
.sendPrepared((typename uWS::WebSocket<isServer>::PreparedMessage *) args[1].As<External>()->Value());
->sendPrepared((typename uWS::WebSocket<isServer>::PreparedMessage *) args[1].As<External>()->Value());
}

template <bool isServer>
Expand All @@ -395,7 +396,7 @@ void forEach(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
uWS::Group<uWS::SERVER> *group = (uWS::Group<uWS::SERVER> *) args[0].As<External>()->Value();
Local<Function> cb = Local<Function>::Cast(args[1]);
group->forEach([isolate, &cb](uWS::WebSocket<uWS::SERVER> webSocket) {
group->forEach([isolate, &cb](uWS::WebSocket<uWS::SERVER> *webSocket) {
Local<Value> argv[] = {
getDataV8(webSocket, isolate)
};
Expand Down
4 changes: 2 additions & 2 deletions nodejs/tests/autobahn.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ non_ssl.on('connection', function(ws) {
});

const options = {
key: fs.readFileSync('../../ssl/key.pem'),
cert: fs.readFileSync('../../ssl/cert.pem'),
key: fs.readFileSync('../../misc/ssl/key.pem'),
cert: fs.readFileSync('../../misc/ssl/cert.pem'),
passphrase: '1234'
};

Expand Down
71 changes: 24 additions & 47 deletions src/Asio.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,94 +98,71 @@ struct Async {

struct Poll {
boost::asio::posix::stream_descriptor *socket;
void *data;
void (*cb)(Poll *p, int status, int events);
Loop *loop;
boost::asio::ip::tcp::socket::native_type fd;

Poll(Loop *loop, uv_os_sock_t fd) {
init(loop, fd);
}

void init(Loop *loop, uv_os_sock_t fd) {
this->fd = fd;
this->loop = loop;
socket = new boost::asio::posix::stream_descriptor(*loop, fd);
socket->non_blocking(true);
}

Poll() {

}

~Poll() {
}

void setData(void *data) {
this->data = data;
}

bool isClosing() {
bool isClosed() {
return !socket;
}

boost::asio::ip::tcp::socket::native_type getFd() {
return fd;//socket->native_handle();
}

void *getData() {
return data;
return socket ? socket->native_handle() : -1;
}

void setCb(void (*cb)(Poll *p, int status, int events)) {
this->cb = cb;
}

void start(int events) {
void start(Loop *, Poll *self, int events) {
if (events & UV_READABLE) {
socket->async_read_some(boost::asio::null_buffers(), [this](boost::system::error_code ec, std::size_t) {
socket->async_read_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) {
if (ec != boost::asio::error::operation_aborted) {
start(UV_READABLE);
cb(this, ec ? -1 : 0, UV_READABLE);
self->start(nullptr, self, UV_READABLE);
self->cb(self, ec ? -1 : 0, UV_READABLE);
}
});
}

if (events & UV_WRITABLE) {
socket->async_write_some(boost::asio::null_buffers(), [this](boost::system::error_code ec, std::size_t) {
socket->async_write_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) {
if (ec != boost::asio::error::operation_aborted) {
start(UV_WRITABLE);
cb(this, ec ? -1 : 0, UV_WRITABLE);
self->start(nullptr, self, UV_WRITABLE);
self->cb(self, ec ? -1 : 0, UV_WRITABLE);
}
});
}
}

void change(int events) {
void change(Loop *, Poll *self, int events) {
socket->cancel();
start(events);
start(nullptr, self, events);
}

void stop() {
bool fastTransfer(Loop *loop, Loop *newLoop, int events) {
return false;
}

// todo: asio is thread safe, use it!
bool threadSafeChange(Loop *loop, Poll *self, int events) {
return false;
}

void stop(Loop *) {
socket->cancel();
}

void close() {
void close(Loop *loop, void (*cb)(Poll *)) {
socket->release();
socket->get_io_service().post([this]() {
delete this;
socket->get_io_service().post([cb, this]() {
cb(this);
});
delete socket;
socket = nullptr;
}

void (*getPollCb())(Poll *, int, int) {
return (void (*)(Poll *, int, int)) cb;
}

Loop *getLoop() {
return loop;//(Loop *) &socket->get_io_service();
}
};

#endif // ASIO_H
11 changes: 4 additions & 7 deletions src/Epoll.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
#include "Backend.h"

#ifdef USE_EPOLL
Loop *loops[128];
int loopHead = 0;

void (*callbacks[128])(Poll *, int, int);
void (*callbacks[16])(Poll *, int, int);
int cbHead = 0;

void Loop::run() {
timepoint = std::chrono::system_clock::now();
while (numPolls) {
for (Poll *c : closing) {
for (std::pair<Poll *, void (*)(Poll *)> c : closing) {
numPolls--;

// probably not correct
delete c;
c.second(c.first);

if (!numPolls) {
closing.clear();
Expand All @@ -27,7 +24,7 @@ void Loop::run() {
for (int i = 0; i < numFdReady; i++) {
Poll *poll = (Poll *) readyEvents[i].data.ptr;
int status = -bool(readyEvents[i].events & EPOLLERR);
callbacks[poll->cbIndex](poll, status, readyEvents[i].events);
callbacks[poll->state.cbIndex](poll, status, readyEvents[i].events);
}

timepoint = std::chrono::system_clock::now();
Expand Down
Loading

0 comments on commit aad7447

Please sign in to comment.