Skip to content

Commit

Permalink
Support listening on pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
jcheng5 committed Apr 2, 2013
1 parent 7418866 commit 50b7e23
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 68 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export(runServer)
export(service)
export(startPipeServer)
export(startServer)
export(stopServer)
export(WebSocket)
Expand Down
8 changes: 6 additions & 2 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ closeWS <- function(conn) {
invisible(.Call('httpuv_closeWS', PACKAGE = 'httpuv', conn))
}

makeServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) {
.Call('httpuv_makeServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
makeTcpServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) {
.Call('httpuv_makeTcpServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
}

makePipeServer <- function(name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) {
.Call('httpuv_makePipeServer', PACKAGE = 'httpuv', name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
}

destroyServer <- function(handle) {
Expand Down
47 changes: 40 additions & 7 deletions R/httpuv.R
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,51 @@ WebSocket <- setRefClass(
#' The given object can be used to be notified when a message is received from
#' the client, to send messages to the client, etc. See \code{\link{WebSocket}}.}
#' }
#'
#' The \code{startPipeServer} variant can be used instead of
#' \code{startServer} to listen on a Unix domain socket or named pipe rather
#' than a TCP socket (this is not common).
#' @seealso \code{\link{runServer}}
#' @aliases startPipeServer
#' @export
startServer <- function(host, port, app) {

appWrapper <- AppWrapper$new(app)
server <- makeServer(host, port,
appWrapper$onHeaders,
appWrapper$onBodyData,
appWrapper$call,
appWrapper$onWSOpen,
appWrapper$onWSMessage,
appWrapper$onWSClose)
server <- makeTcpServer(host, port,
appWrapper$onHeaders,
appWrapper$onBodyData,
appWrapper$call,
appWrapper$onWSOpen,
appWrapper$onWSMessage,
appWrapper$onWSClose)
if (is.null(server)) {
stop("Failed to create server")
}
return(server)
}

#' @param name A string that indicates the path for the domain socket (on
#' Unix-like systems) or the name of the named pipe (on Windows).
#' @param mask If non-\code{NULL} and non-negative, this numeric value is used
#' to temporarily modify the process's umask while the domain socket is being
#' created. To ensure that only root can access the domain socket, use
#' \code{strtoi("777", 8)}; or to allow owner and group read/write access, use
#' \code{strtoi("117", 8)}. If the value is \code{NULL} then the process's
#' umask is left unchanged. (This parameter has no effect on Windows.)
#' @rdname startServer
#' @export
startPipeServer <- function(name, mask, app) {

appWrapper <- AppWrapper$new(app)
if (is.null(mask))
mask <- -1
server <- makePipeServer(name, mask,
appWrapper$onHeaders,
appWrapper$onBodyData,
appWrapper$call,
appWrapper$onWSOpen,
appWrapper$onWSMessage,
appWrapper$onWSClose)
if (is.null(server)) {
stop("Failed to create server")
}
Expand Down
37 changes: 32 additions & 5 deletions man/startServer.Rd
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
\name{startServer}
\alias{startPipeServer}
\alias{startServer}
\title{Create an HTTP/WebSocket server}
\usage{
startServer(host, port, app)

startPipeServer(name, mask, app)
}
\arguments{
\item{host}{A string that is a valid IPv4 address that is
Expand All @@ -16,6 +19,19 @@

\item{app}{A collection of functions that define your
application. See Details.}

\item{name}{A string that indicates the path for the
domain socket (on Unix-like systems) or the name of the
named pipe (on Windows).}

\item{mask}{If non-\code{NULL} and non-negative, this
numeric value is used to temporarily modify the process's
umask while the domain socket is being created. To ensure
that only root can access the domain socket, use
\code{strtoi("777", 8)}; or to allow owner and group
read/write access, use \code{strtoi("117", 8)}. If the
value is \code{NULL} then the process's umask is left
unchanged. (This parameter has no effect on Windows.)}
}
\value{
A handle for this server that can be passed to
Expand All @@ -42,11 +58,22 @@
request, and return an HTTP response. This method should
be implemented in accordance with the
\href{https://github.com/jeffreyhorner/Rook/blob/a5e45f751/README.md}{Rook}
specification.} \item{\code{onWSOpen(ws)}}{Called back
when a WebSocket connection is established. The given
object can be used to be notified when a message is
received from the client, to send messages to the client,
etc. See \code{\link{WebSocket}}.} }
specification.} \item{\code{onHeaders(req)}}{Optional.
Similar to \code{call}, but occurs when headers are
received. Return \code{NULL} to continue normal
processing of the request, or a Rook response to send
that response, stop processing the request, and ask the
client to close the connection. (This can be used to
implement upload size limits, for example.)}
\item{\code{onWSOpen(ws)}}{Called back when a WebSocket
connection is established. The given object can be used
to be notified when a message is received from the
client, to send messages to the client, etc. See
\code{\link{WebSocket}}.} }

The \code{startPipeServer} variant can be used instead of
\code{startServer} to listen on a Unix domain socket or
named pipe rather than a TCP socket (this is not common).
}
\seealso{
\code{\link{runServer}}
Expand Down
25 changes: 21 additions & 4 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ BEGIN_RCPP
return R_NilValue;
END_RCPP
}
// makeServer
Rcpp::RObject makeServer(const std::string& host, int port, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose);
RcppExport SEXP httpuv_makeServer(SEXP hostSEXP, SEXP portSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) {
// makeTcpServer
Rcpp::RObject makeTcpServer(const std::string& host, int port, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose);
RcppExport SEXP httpuv_makeTcpServer(SEXP hostSEXP, SEXP portSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) {
BEGIN_RCPP
Rcpp::RNGScope __rngScope;
std::string host = Rcpp::as<std::string >(hostSEXP);
Expand All @@ -40,7 +40,24 @@ BEGIN_RCPP
Rcpp::Function onWSOpen = Rcpp::as<Rcpp::Function >(onWSOpenSEXP);
Rcpp::Function onWSMessage = Rcpp::as<Rcpp::Function >(onWSMessageSEXP);
Rcpp::Function onWSClose = Rcpp::as<Rcpp::Function >(onWSCloseSEXP);
Rcpp::RObject __result = makeServer(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose);
Rcpp::RObject __result = makeTcpServer(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose);
return Rcpp::wrap(__result);
END_RCPP
}
// makePipeServer
Rcpp::RObject makePipeServer(const std::string& name, int mask, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose);
RcppExport SEXP httpuv_makePipeServer(SEXP nameSEXP, SEXP maskSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) {
BEGIN_RCPP
Rcpp::RNGScope __rngScope;
std::string name = Rcpp::as<std::string >(nameSEXP);
int mask = Rcpp::as<int >(maskSEXP);
Rcpp::Function onHeaders = Rcpp::as<Rcpp::Function >(onHeadersSEXP);
Rcpp::Function onBodyData = Rcpp::as<Rcpp::Function >(onBodyDataSEXP);
Rcpp::Function onRequest = Rcpp::as<Rcpp::Function >(onRequestSEXP);
Rcpp::Function onWSOpen = Rcpp::as<Rcpp::Function >(onWSOpenSEXP);
Rcpp::Function onWSMessage = Rcpp::as<Rcpp::Function >(onWSMessageSEXP);
Rcpp::Function onWSClose = Rcpp::as<Rcpp::Function >(onWSCloseSEXP);
Rcpp::RObject __result = makePipeServer(name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose);
return Rcpp::wrap(__result);
END_RCPP
}
Expand Down
102 changes: 68 additions & 34 deletions src/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,36 @@ void HttpRequest::trace(const std::string& msg) {
//std::cerr << msg << std::endl;
}

uv_tcp_t* HttpRequest::handle() {
return &_handle;
uv_stream_t* HttpRequest::handle() {
return &_handle.stream;
}

Address HttpRequest::serverAddress() {
Address address;

struct sockaddr_in addr = {0};
int len = sizeof(sockaddr_in);
int r = uv_tcp_getsockname(&_handle, (struct sockaddr*)&addr, &len);
if (r) {
// TODO: warn?
return address;
}
if (_handle.isTcp) {
struct sockaddr_in addr = {0};
int len = sizeof(sockaddr_in);
int r = uv_tcp_getsockname(&_handle.tcp, (struct sockaddr*)&addr, &len);
if (r) {
// TODO: warn?
return address;
}

if (addr.sin_family != AF_INET) {
// TODO: warn
return address;
}
if (addr.sin_family != AF_INET) {
// TODO: warn
return address;
}

// addrstr is a pointer to static buffer, no need to free
char* addrstr = inet_ntoa(addr.sin_addr);
if (addrstr)
address.host = std::string(addrstr);
else {
// TODO: warn?
// addrstr is a pointer to static buffer, no need to free
char* addrstr = inet_ntoa(addr.sin_addr);
if (addrstr)
address.host = std::string(addrstr);
else {
// TODO: warn?
}
address.port = ntohs(addr.sin_port);
}
address.port = ntohs(addr.sin_port);

return address;
}
Expand Down Expand Up @@ -250,7 +252,7 @@ void HttpRequest::close() {
if (_protocol == WebSockets)
_pWebApplication->onWSClose(this);
_pSocket->removeConnection(this);
uv_close((uv_handle_t*)&_handle, HttpRequest_on_closed);
uv_close(toHandle(&_handle.stream), HttpRequest_on_closed);
}

void HttpRequest::_on_request_read(uv_stream_t*, ssize_t nread, uv_buf_t buf) {
Expand Down Expand Up @@ -317,7 +319,7 @@ void HttpRequest::_on_request_read(uv_stream_t*, ssize_t nread, uv_buf_t buf) {
}

void HttpRequest::handleRequest() {
int r = uv_read_start((uv_stream_t*)&_handle, &on_alloc, &HttpRequest_on_request_read);
int r = uv_read_start(handle(), &on_alloc, &HttpRequest_on_request_read);
if (r) {
uv_err_t err = uv_last_error(_pLoop);
fatal_error("read_start", uv_strerror(err));
Expand Down Expand Up @@ -362,7 +364,7 @@ void HttpResponse::writeResponse() {
memset(pWriteReq, 0, sizeof(uv_write_t));
pWriteReq->data = this;

int r = uv_write(pWriteReq, toStream(_pRequest->handle()), &headerBuf, 1,
int r = uv_write(pWriteReq, _pRequest->handle(), &headerBuf, 1,
&on_response_written);
if (r) {
_pRequest->fatal_error("uv_write",
Expand All @@ -385,7 +387,7 @@ void HttpResponse::onResponseWritten(int status) {
}
else {
HttpResponseExtendedWrite* pResponseWrite = new HttpResponseExtendedWrite(
this, toStream(_pRequest->handle()), _pBody);
this, _pRequest->handle(), _pBody);
pResponseWrite->begin();
}
}
Expand Down Expand Up @@ -439,7 +441,7 @@ void Socket::destroy() {
// std::cerr << "Request close on " << *it << std::endl;
(*it)->close();
}
uv_close((uv_handle_t*)&handle, on_Socket_close);
uv_close(toHandle(&handle.stream), on_Socket_close);
}

void on_Socket_close(uv_handle_t* pHandle) {
Expand All @@ -460,7 +462,7 @@ void on_request(uv_stream_t* handle, int status) {
HttpRequest* req = new HttpRequest(
handle->loop, pSocket->pWebApplication, pSocket);

int r = uv_accept(handle, (uv_stream_t*)req->handle());
int r = uv_accept(handle, req->handle());
if (r) {
uv_err_t err = uv_last_error(handle->loop);
REprintf("accept: %s\n", uv_strerror(err));
Expand All @@ -472,31 +474,63 @@ void on_request(uv_stream_t* handle, int status) {

}

uv_tcp_t* createServer(uv_loop_t* pLoop, const std::string& host, int port,
WebApplication* pWebApplication) {
uv_stream_t* createPipeServer(uv_loop_t* pLoop, const std::string& name,
int mask, WebApplication* pWebApplication) {

// Deletes itself when destroy() is called, which occurs in freeServer()
Socket* pSocket = new Socket();
// TODO: Handle error
uv_pipe_init(pLoop, &pSocket->handle.pipe, true);
pSocket->handle.isTcp = false;
pSocket->handle.stream.data = pSocket;
pSocket->pWebApplication = pWebApplication;

mode_t oldMask = 0;
if (mask >= 0)
oldMask = umask(mask);
int r = uv_pipe_bind(&pSocket->handle.pipe, name.c_str());
if (mask >= 0)
umask(oldMask);

if (r) {
pSocket->destroy();
return NULL;
}
r = uv_listen((uv_stream_t*)&pSocket->handle.stream, 128, &on_request);
if (r) {
pSocket->destroy();
return NULL;
}

return &pSocket->handle.stream;
}

uv_stream_t* createTcpServer(uv_loop_t* pLoop, const std::string& host,
int port, WebApplication* pWebApplication) {

// Deletes itself when destroy() is called, which occurs in freeServer()
Socket* pSocket = new Socket();
// TODO: Handle error
uv_tcp_init(pLoop, &pSocket->handle);
pSocket->handle.data = pSocket;
uv_tcp_init(pLoop, &pSocket->handle.tcp);
pSocket->handle.isTcp = true;
pSocket->handle.stream.data = pSocket;
pSocket->pWebApplication = pWebApplication;

struct sockaddr_in address = uv_ip4_addr(host.c_str(), port);
int r = uv_tcp_bind(&pSocket->handle, address);
int r = uv_tcp_bind(&pSocket->handle.tcp, address);
if (r) {
pSocket->destroy();
return NULL;
}
r = uv_listen((uv_stream_t*)&pSocket->handle, 128, &on_request);
r = uv_listen((uv_stream_t*)&pSocket->handle.stream, 128, &on_request);
if (r) {
pSocket->destroy();
return NULL;
}

return &pSocket->handle;
return &pSocket->handle.stream;
}
void freeServer(uv_tcp_t* pHandle) {
void freeServer(uv_stream_t* pHandle) {
uv_loop_t* loop = pHandle->loop;
Socket* pSocket = (Socket*)pHandle->data;
pSocket->destroy();
Expand Down
Loading

0 comments on commit 50b7e23

Please sign in to comment.