From 50b7e2336c923d2142c2e963dcf35ed80730da64 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 2 Apr 2013 13:15:56 -0700 Subject: [PATCH] Support listening on pipes --- NAMESPACE | 1 + R/RcppExports.R | 8 +++- R/httpuv.R | 47 +++++++++++++++++--- man/startServer.Rd | 37 +++++++++++++--- src/RcppExports.cpp | 25 +++++++++-- src/http.cpp | 102 +++++++++++++++++++++++++++++--------------- src/http.h | 26 ++++++++--- src/httpuv.cpp | 44 +++++++++++++++---- 8 files changed, 222 insertions(+), 68 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index b98765cf..18fd42b8 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,6 @@ export(runServer) export(service) +export(startPipeServer) export(startServer) export(stopServer) export(WebSocket) diff --git a/R/RcppExports.R b/R/RcppExports.R index 21d1f6bb..c749f8a6 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -9,8 +9,12 @@ closeWS <- function(conn) { invisible(.Call('httpuv_closeWS', PACKAGE = 'httpuv', conn)) } -makeServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) { - .Call('httpuv_makeServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) +makeTcpServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) { + .Call('httpuv_makeTcpServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) +} + +makePipeServer <- function(name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) { + .Call('httpuv_makePipeServer', PACKAGE = 'httpuv', name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) } destroyServer <- function(handle) { diff --git a/R/httpuv.R b/R/httpuv.R index e730525c..bd35c058 100644 --- a/R/httpuv.R +++ b/R/httpuv.R @@ -318,18 +318,51 @@ WebSocket <- setRefClass( #' The given object can be used to be notified when a message is received from #' the client, to send messages to the client, etc. See \code{\link{WebSocket}}.} #' } +#' +#' The \code{startPipeServer} variant can be used instead of +#' \code{startServer} to listen on a Unix domain socket or named pipe rather +#' than a TCP socket (this is not common). #' @seealso \code{\link{runServer}} +#' @aliases startPipeServer #' @export startServer <- function(host, port, app) { appWrapper <- AppWrapper$new(app) - server <- makeServer(host, port, - appWrapper$onHeaders, - appWrapper$onBodyData, - appWrapper$call, - appWrapper$onWSOpen, - appWrapper$onWSMessage, - appWrapper$onWSClose) + server <- makeTcpServer(host, port, + appWrapper$onHeaders, + appWrapper$onBodyData, + appWrapper$call, + appWrapper$onWSOpen, + appWrapper$onWSMessage, + appWrapper$onWSClose) + if (is.null(server)) { + stop("Failed to create server") + } + return(server) +} + +#' @param name A string that indicates the path for the domain socket (on +#' Unix-like systems) or the name of the named pipe (on Windows). +#' @param mask If non-\code{NULL} and non-negative, this numeric value is used +#' to temporarily modify the process's umask while the domain socket is being +#' created. To ensure that only root can access the domain socket, use +#' \code{strtoi("777", 8)}; or to allow owner and group read/write access, use +#' \code{strtoi("117", 8)}. If the value is \code{NULL} then the process's +#' umask is left unchanged. (This parameter has no effect on Windows.) +#' @rdname startServer +#' @export +startPipeServer <- function(name, mask, app) { + + appWrapper <- AppWrapper$new(app) + if (is.null(mask)) + mask <- -1 + server <- makePipeServer(name, mask, + appWrapper$onHeaders, + appWrapper$onBodyData, + appWrapper$call, + appWrapper$onWSOpen, + appWrapper$onWSMessage, + appWrapper$onWSClose) if (is.null(server)) { stop("Failed to create server") } diff --git a/man/startServer.Rd b/man/startServer.Rd index 3bc33eb6..13bbe514 100644 --- a/man/startServer.Rd +++ b/man/startServer.Rd @@ -1,8 +1,11 @@ \name{startServer} +\alias{startPipeServer} \alias{startServer} \title{Create an HTTP/WebSocket server} \usage{ startServer(host, port, app) + + startPipeServer(name, mask, app) } \arguments{ \item{host}{A string that is a valid IPv4 address that is @@ -16,6 +19,19 @@ \item{app}{A collection of functions that define your application. See Details.} + + \item{name}{A string that indicates the path for the + domain socket (on Unix-like systems) or the name of the + named pipe (on Windows).} + + \item{mask}{If non-\code{NULL} and non-negative, this + numeric value is used to temporarily modify the process's + umask while the domain socket is being created. To ensure + that only root can access the domain socket, use + \code{strtoi("777", 8)}; or to allow owner and group + read/write access, use \code{strtoi("117", 8)}. If the + value is \code{NULL} then the process's umask is left + unchanged. (This parameter has no effect on Windows.)} } \value{ A handle for this server that can be passed to @@ -42,11 +58,22 @@ request, and return an HTTP response. This method should be implemented in accordance with the \href{https://github.com/jeffreyhorner/Rook/blob/a5e45f751/README.md}{Rook} - specification.} \item{\code{onWSOpen(ws)}}{Called back - when a WebSocket connection is established. The given - object can be used to be notified when a message is - received from the client, to send messages to the client, - etc. See \code{\link{WebSocket}}.} } + specification.} \item{\code{onHeaders(req)}}{Optional. + Similar to \code{call}, but occurs when headers are + received. Return \code{NULL} to continue normal + processing of the request, or a Rook response to send + that response, stop processing the request, and ask the + client to close the connection. (This can be used to + implement upload size limits, for example.)} + \item{\code{onWSOpen(ws)}}{Called back when a WebSocket + connection is established. The given object can be used + to be notified when a message is received from the + client, to send messages to the client, etc. See + \code{\link{WebSocket}}.} } + + The \code{startPipeServer} variant can be used instead of + \code{startServer} to listen on a Unix domain socket or + named pipe rather than a TCP socket (this is not common). } \seealso{ \code{\link{runServer}} diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index aa5ce908..b0d0dfdb 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -27,9 +27,9 @@ BEGIN_RCPP return R_NilValue; END_RCPP } -// makeServer -Rcpp::RObject makeServer(const std::string& host, int port, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose); -RcppExport SEXP httpuv_makeServer(SEXP hostSEXP, SEXP portSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) { +// makeTcpServer +Rcpp::RObject makeTcpServer(const std::string& host, int port, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose); +RcppExport SEXP httpuv_makeTcpServer(SEXP hostSEXP, SEXP portSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) { BEGIN_RCPP Rcpp::RNGScope __rngScope; std::string host = Rcpp::as(hostSEXP); @@ -40,7 +40,24 @@ BEGIN_RCPP Rcpp::Function onWSOpen = Rcpp::as(onWSOpenSEXP); Rcpp::Function onWSMessage = Rcpp::as(onWSMessageSEXP); Rcpp::Function onWSClose = Rcpp::as(onWSCloseSEXP); - Rcpp::RObject __result = makeServer(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose); + Rcpp::RObject __result = makeTcpServer(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose); + return Rcpp::wrap(__result); +END_RCPP +} +// makePipeServer +Rcpp::RObject makePipeServer(const std::string& name, int mask, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose); +RcppExport SEXP httpuv_makePipeServer(SEXP nameSEXP, SEXP maskSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) { +BEGIN_RCPP + Rcpp::RNGScope __rngScope; + std::string name = Rcpp::as(nameSEXP); + int mask = Rcpp::as(maskSEXP); + Rcpp::Function onHeaders = Rcpp::as(onHeadersSEXP); + Rcpp::Function onBodyData = Rcpp::as(onBodyDataSEXP); + Rcpp::Function onRequest = Rcpp::as(onRequestSEXP); + Rcpp::Function onWSOpen = Rcpp::as(onWSOpenSEXP); + Rcpp::Function onWSMessage = Rcpp::as(onWSMessageSEXP); + Rcpp::Function onWSClose = Rcpp::as(onWSCloseSEXP); + Rcpp::RObject __result = makePipeServer(name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose); return Rcpp::wrap(__result); END_RCPP } diff --git a/src/http.cpp b/src/http.cpp index d760263f..931e6309 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -42,34 +42,36 @@ void HttpRequest::trace(const std::string& msg) { //std::cerr << msg << std::endl; } -uv_tcp_t* HttpRequest::handle() { - return &_handle; +uv_stream_t* HttpRequest::handle() { + return &_handle.stream; } Address HttpRequest::serverAddress() { Address address; - struct sockaddr_in addr = {0}; - int len = sizeof(sockaddr_in); - int r = uv_tcp_getsockname(&_handle, (struct sockaddr*)&addr, &len); - if (r) { - // TODO: warn? - return address; - } + if (_handle.isTcp) { + struct sockaddr_in addr = {0}; + int len = sizeof(sockaddr_in); + int r = uv_tcp_getsockname(&_handle.tcp, (struct sockaddr*)&addr, &len); + if (r) { + // TODO: warn? + return address; + } - if (addr.sin_family != AF_INET) { - // TODO: warn - return address; - } + if (addr.sin_family != AF_INET) { + // TODO: warn + return address; + } - // addrstr is a pointer to static buffer, no need to free - char* addrstr = inet_ntoa(addr.sin_addr); - if (addrstr) - address.host = std::string(addrstr); - else { - // TODO: warn? + // addrstr is a pointer to static buffer, no need to free + char* addrstr = inet_ntoa(addr.sin_addr); + if (addrstr) + address.host = std::string(addrstr); + else { + // TODO: warn? + } + address.port = ntohs(addr.sin_port); } - address.port = ntohs(addr.sin_port); return address; } @@ -250,7 +252,7 @@ void HttpRequest::close() { if (_protocol == WebSockets) _pWebApplication->onWSClose(this); _pSocket->removeConnection(this); - uv_close((uv_handle_t*)&_handle, HttpRequest_on_closed); + uv_close(toHandle(&_handle.stream), HttpRequest_on_closed); } void HttpRequest::_on_request_read(uv_stream_t*, ssize_t nread, uv_buf_t buf) { @@ -317,7 +319,7 @@ void HttpRequest::_on_request_read(uv_stream_t*, ssize_t nread, uv_buf_t buf) { } void HttpRequest::handleRequest() { - int r = uv_read_start((uv_stream_t*)&_handle, &on_alloc, &HttpRequest_on_request_read); + int r = uv_read_start(handle(), &on_alloc, &HttpRequest_on_request_read); if (r) { uv_err_t err = uv_last_error(_pLoop); fatal_error("read_start", uv_strerror(err)); @@ -362,7 +364,7 @@ void HttpResponse::writeResponse() { memset(pWriteReq, 0, sizeof(uv_write_t)); pWriteReq->data = this; - int r = uv_write(pWriteReq, toStream(_pRequest->handle()), &headerBuf, 1, + int r = uv_write(pWriteReq, _pRequest->handle(), &headerBuf, 1, &on_response_written); if (r) { _pRequest->fatal_error("uv_write", @@ -385,7 +387,7 @@ void HttpResponse::onResponseWritten(int status) { } else { HttpResponseExtendedWrite* pResponseWrite = new HttpResponseExtendedWrite( - this, toStream(_pRequest->handle()), _pBody); + this, _pRequest->handle(), _pBody); pResponseWrite->begin(); } } @@ -439,7 +441,7 @@ void Socket::destroy() { // std::cerr << "Request close on " << *it << std::endl; (*it)->close(); } - uv_close((uv_handle_t*)&handle, on_Socket_close); + uv_close(toHandle(&handle.stream), on_Socket_close); } void on_Socket_close(uv_handle_t* pHandle) { @@ -460,7 +462,7 @@ void on_request(uv_stream_t* handle, int status) { HttpRequest* req = new HttpRequest( handle->loop, pSocket->pWebApplication, pSocket); - int r = uv_accept(handle, (uv_stream_t*)req->handle()); + int r = uv_accept(handle, req->handle()); if (r) { uv_err_t err = uv_last_error(handle->loop); REprintf("accept: %s\n", uv_strerror(err)); @@ -472,31 +474,63 @@ void on_request(uv_stream_t* handle, int status) { } -uv_tcp_t* createServer(uv_loop_t* pLoop, const std::string& host, int port, - WebApplication* pWebApplication) { +uv_stream_t* createPipeServer(uv_loop_t* pLoop, const std::string& name, + int mask, WebApplication* pWebApplication) { + + // Deletes itself when destroy() is called, which occurs in freeServer() + Socket* pSocket = new Socket(); + // TODO: Handle error + uv_pipe_init(pLoop, &pSocket->handle.pipe, true); + pSocket->handle.isTcp = false; + pSocket->handle.stream.data = pSocket; + pSocket->pWebApplication = pWebApplication; + + mode_t oldMask = 0; + if (mask >= 0) + oldMask = umask(mask); + int r = uv_pipe_bind(&pSocket->handle.pipe, name.c_str()); + if (mask >= 0) + umask(oldMask); + + if (r) { + pSocket->destroy(); + return NULL; + } + r = uv_listen((uv_stream_t*)&pSocket->handle.stream, 128, &on_request); + if (r) { + pSocket->destroy(); + return NULL; + } + + return &pSocket->handle.stream; +} + +uv_stream_t* createTcpServer(uv_loop_t* pLoop, const std::string& host, + int port, WebApplication* pWebApplication) { // Deletes itself when destroy() is called, which occurs in freeServer() Socket* pSocket = new Socket(); // TODO: Handle error - uv_tcp_init(pLoop, &pSocket->handle); - pSocket->handle.data = pSocket; + uv_tcp_init(pLoop, &pSocket->handle.tcp); + pSocket->handle.isTcp = true; + pSocket->handle.stream.data = pSocket; pSocket->pWebApplication = pWebApplication; struct sockaddr_in address = uv_ip4_addr(host.c_str(), port); - int r = uv_tcp_bind(&pSocket->handle, address); + int r = uv_tcp_bind(&pSocket->handle.tcp, address); if (r) { pSocket->destroy(); return NULL; } - r = uv_listen((uv_stream_t*)&pSocket->handle, 128, &on_request); + r = uv_listen((uv_stream_t*)&pSocket->handle.stream, 128, &on_request); if (r) { pSocket->destroy(); return NULL; } - return &pSocket->handle; + return &pSocket->handle.stream; } -void freeServer(uv_tcp_t* pHandle) { +void freeServer(uv_stream_t* pHandle) { uv_loop_t* loop = pHandle->loop; Socket* pSocket = (Socket*)pHandle->data; pSocket->destroy(); diff --git a/src/http.h b/src/http.h index 5baef5fa..f05b572a 100644 --- a/src/http.h +++ b/src/http.h @@ -40,9 +40,18 @@ class WebApplication { virtual void onWSClose(WebSocketConnection* conn) = 0; }; +typedef struct { + union { + uv_stream_t stream; + uv_tcp_t tcp; + uv_pipe_t pipe; + }; + bool isTcp; +} VariantHandle; + class Socket { public: - uv_tcp_t handle; + VariantHandle handle; WebApplication* pWebApplication; std::vector connections; @@ -69,7 +78,7 @@ class HttpRequest : WebSocketConnection { private: uv_loop_t* _pLoop; WebApplication* _pWebApplication; - uv_tcp_t _handle; + VariantHandle _handle; Socket* _pSocket; http_parser _parser; Protocol _protocol; @@ -93,8 +102,9 @@ class HttpRequest : WebSocketConnection { : _pLoop(pLoop), _pWebApplication(pWebApplication), _pSocket(pSocket), _protocol(HTTP), _bytesRead(0), _ignoreNewData(false) { - uv_tcp_init(pLoop, &_handle); - _handle.data = this; + uv_tcp_init(pLoop, &_handle.tcp); + _handle.isTcp = true; + _handle.stream.data = this; http_parser_init(&_parser, HTTP_REQUEST); _parser.data = this; @@ -105,7 +115,7 @@ class HttpRequest : WebSocketConnection { virtual ~HttpRequest() { } - uv_tcp_t* handle(); + uv_stream_t* handle(); Address serverAddress(); void handleRequest(); @@ -183,9 +193,11 @@ DECLARE_CALLBACK_1(HttpRequest, on_closed, void, uv_handle_t*) DECLARE_CALLBACK_3(HttpRequest, on_request_read, void, uv_stream_t*, ssize_t, uv_buf_t) DECLARE_CALLBACK_2(HttpRequest, on_response_write, void, uv_write_t*, int) -uv_tcp_t* createServer(uv_loop_t* loop, const std::string& host, int port, +uv_stream_t* createPipeServer(uv_loop_t* loop, const std::string& name, + int mask, WebApplication* pWebApplication); +uv_stream_t* createTcpServer(uv_loop_t* loop, const std::string& host, int port, WebApplication* pWebApplication); -void freeServer(uv_tcp_t* pServer); +void freeServer(uv_stream_t* pServer); bool runNonBlocking(uv_loop_t* loop); #endif // HTTP_HPP diff --git a/src/httpuv.cpp b/src/httpuv.cpp index 42138f92..16751d79 100644 --- a/src/httpuv.cpp +++ b/src/httpuv.cpp @@ -310,20 +310,20 @@ void closeWS(std::string conn) { void destroyServer(std::string handle); // [[Rcpp::export]] -Rcpp::RObject makeServer(const std::string& host, int port, - Rcpp::Function onHeaders, - Rcpp::Function onBodyData, - Rcpp::Function onRequest, - Rcpp::Function onWSOpen, - Rcpp::Function onWSMessage, - Rcpp::Function onWSClose) { +Rcpp::RObject makeTcpServer(const std::string& host, int port, + Rcpp::Function onHeaders, + Rcpp::Function onBodyData, + Rcpp::Function onRequest, + Rcpp::Function onWSOpen, + Rcpp::Function onWSMessage, + Rcpp::Function onWSClose) { using namespace Rcpp; // Deleted when owning pHandler is deleted RWebApplication* pHandler = new RWebApplication(onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose); - uv_tcp_t* pServer = createServer( + uv_stream_t* pServer = createTcpServer( uv_default_loop(), host.c_str(), port, (WebApplication*)pHandler); if (!pServer) { @@ -334,9 +334,35 @@ Rcpp::RObject makeServer(const std::string& host, int port, return Rcpp::wrap(externalize(pServer)); } +// [[Rcpp::export]] +Rcpp::RObject makePipeServer(const std::string& name, + int mask, + Rcpp::Function onHeaders, + Rcpp::Function onBodyData, + Rcpp::Function onRequest, + Rcpp::Function onWSOpen, + Rcpp::Function onWSMessage, + Rcpp::Function onWSClose) { + + using namespace Rcpp; + // Deleted when owning pHandler is deleted + RWebApplication* pHandler = + new RWebApplication(onHeaders, onBodyData, onRequest, onWSOpen, + onWSMessage, onWSClose); + uv_stream_t* pServer = createPipeServer( + uv_default_loop(), name.c_str(), mask, (WebApplication*)pHandler); + + if (!pServer) { + delete pHandler; + return R_NilValue; + } + + return Rcpp::wrap(externalize(pServer)); +} + // [[Rcpp::export]] void destroyServer(std::string handle) { - uv_tcp_t* pServer = internalize(handle); + uv_stream_t* pServer = internalize(handle); freeServer(pServer); }