Skip to content

Commit

Permalink
Remove obsolete stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
wch committed Nov 21, 2017
1 parent e5988b4 commit 32f0e26
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 234 deletions.
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export(interrupt)
export(rawToBase64)
export(runServer)
export(service)
export(startBackgroundServer)
export(startDaemonizedServer)
export(startPipeServer)
export(startServer)
Expand Down
12 changes: 0 additions & 12 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ makeTcpServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen
.Call('_httpuv_makeTcpServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
}

makeBackgroundTcpServer <- function(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) {
.Call('_httpuv_makeBackgroundTcpServer', PACKAGE = 'httpuv', host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
}

makePipeServer <- function(name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose) {
.Call('_httpuv_makePipeServer', PACKAGE = 'httpuv', name, mask, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose)
}
Expand Down Expand Up @@ -44,14 +40,6 @@ base64encode <- function(x) {
.Call('_httpuv_base64encode', PACKAGE = 'httpuv', x)
}

daemonize <- function(handle) {
.Call('_httpuv_daemonize', PACKAGE = 'httpuv', handle)
}

destroyDaemonizedServer <- function(handle) {
invisible(.Call('_httpuv_destroyDaemonizedServer', PACKAGE = 'httpuv', handle))
}

#' URI encoding/decoding
#'
#' Encodes/decodes strings using URI encoding/decoding in the same way that web
Expand Down
37 changes: 6 additions & 31 deletions R/httpuv.R
Original file line number Diff line number Diff line change
Expand Up @@ -386,24 +386,7 @@ WebSocket <- setRefClass(
startServer <- function(host, port, app) {

appWrapper <- AppWrapper$new(app)
server <- makeTcpServer(host, port,
appWrapper$onHeaders,
appWrapper$onBodyData,
appWrapper$call,
appWrapper$onWSOpen,
appWrapper$onWSMessage,
appWrapper$onWSClose)
if (is.null(server)) {
stop("Failed to create server")
}
return(server)
}

#' @export
startBackgroundServer <- function(host, port, app) {

appWrapper <- AppWrapper$new(app)
server <- makeBackgroundTcpServer(
server <- makeTcpServer(
host, port,
appWrapper$onHeaders,
appWrapper$onBodyData,
Expand All @@ -412,6 +395,7 @@ startBackgroundServer <- function(host, port, app) {
appWrapper$onWSMessage,
appWrapper$onWSClose
)

if (is.null(server)) {
stop("Failed to create server")
}
Expand Down Expand Up @@ -503,7 +487,7 @@ service <- function(timeoutMs = ifelse(interactive(), 100, 1000)) {
#' @export
runServer <- function(host, port, app,
interruptIntervalMs = ifelse(interactive(), 100, 1000)) {
server <- startBackgroundServer(host, port, app)
server <- startServer(host, port, app)
on.exit(stopServer(server))

.globals$stopped <- FALSE
Expand Down Expand Up @@ -588,14 +572,7 @@ rawToBase64 <- function(x) {
#' @seealso \code{\link{startServer}}
#' @export
startDaemonizedServer <- function(host, port, app) {
server <- startServer(host, port, app)
tryCatch({
server <- daemonize(server)
}, error=function(e) {
stopServer(server)
stop(e)
})
return(server)
startServer(host, port, app)
}

#' Stop a running daemonized server in Unix environments
Expand All @@ -606,13 +583,11 @@ startDaemonizedServer <- function(host, port, app) {
#' unbinds the port. \strong{Be careful not to call \code{stopDaemonizedServer} more than
#' once on a handle, as this will cause the R process to crash!}
#'
#' @param server A handle that was previously returned from
#' @param handle A handle that was previously returned from
#' \code{\link{startDaemonizedServer}}.
#'
#' @export
stopDaemonizedServer <- function(server) {
destroyDaemonizedServer(server)
}
stopDaemonizedServer <- stopServer

# Needed so that Rcpp registers the 'httpuv_decodeURIComponent' symbol
legacy_dummy <- function(value){
Expand Down
4 changes: 2 additions & 2 deletions man/stopDaemonizedServer.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 1 addition & 43 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,6 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// makeBackgroundTcpServer
Rcpp::RObject makeBackgroundTcpServer(const std::string& host, int port, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose);
RcppExport SEXP _httpuv_makeBackgroundTcpServer(SEXP hostSEXP, SEXP portSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const std::string& >::type host(hostSEXP);
Rcpp::traits::input_parameter< int >::type port(portSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onHeaders(onHeadersSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onBodyData(onBodyDataSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onRequest(onRequestSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onWSOpen(onWSOpenSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onWSMessage(onWSMessageSEXP);
Rcpp::traits::input_parameter< Rcpp::Function >::type onWSClose(onWSCloseSEXP);
rcpp_result_gen = Rcpp::wrap(makeBackgroundTcpServer(host, port, onHeaders, onBodyData, onRequest, onWSOpen, onWSMessage, onWSClose));
return rcpp_result_gen;
END_RCPP
}
// makePipeServer
Rcpp::RObject makePipeServer(const std::string& name, int mask, Rcpp::Function onHeaders, Rcpp::Function onBodyData, Rcpp::Function onRequest, Rcpp::Function onWSOpen, Rcpp::Function onWSMessage, Rcpp::Function onWSClose);
RcppExport SEXP _httpuv_makePipeServer(SEXP nameSEXP, SEXP maskSEXP, SEXP onHeadersSEXP, SEXP onBodyDataSEXP, SEXP onRequestSEXP, SEXP onWSOpenSEXP, SEXP onWSMessageSEXP, SEXP onWSCloseSEXP) {
Expand Down Expand Up @@ -111,27 +93,6 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// daemonize
Rcpp::RObject daemonize(std::string handle);
RcppExport SEXP _httpuv_daemonize(SEXP handleSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< std::string >::type handle(handleSEXP);
rcpp_result_gen = Rcpp::wrap(daemonize(handle));
return rcpp_result_gen;
END_RCPP
}
// destroyDaemonizedServer
void destroyDaemonizedServer(std::string handle);
RcppExport SEXP _httpuv_destroyDaemonizedServer(SEXP handleSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< std::string >::type handle(handleSEXP);
destroyDaemonizedServer(handle);
return R_NilValue;
END_RCPP
}
// encodeURI
std::vector<std::string> encodeURI(std::vector<std::string> value);
RcppExport SEXP _httpuv_encodeURI(SEXP valueSEXP) {
Expand Down Expand Up @@ -203,20 +164,17 @@ static const R_CallMethodDef CallEntries[] = {
{"_httpuv_sendWSMessage", (DL_FUNC) &_httpuv_sendWSMessage, 3},
{"_httpuv_closeWS", (DL_FUNC) &_httpuv_closeWS, 1},
{"_httpuv_makeTcpServer", (DL_FUNC) &_httpuv_makeTcpServer, 8},
{"_httpuv_makeBackgroundTcpServer", (DL_FUNC) &_httpuv_makeBackgroundTcpServer, 8},
{"_httpuv_makePipeServer", (DL_FUNC) &_httpuv_makePipeServer, 8},
{"_httpuv_stopServer", (DL_FUNC) &_httpuv_stopServer, 1},
{"_httpuv_stopAllServers", (DL_FUNC) &_httpuv_stopAllServers, 0},
{"_httpuv_base64encode", (DL_FUNC) &_httpuv_base64encode, 1},
{"_httpuv_daemonize", (DL_FUNC) &_httpuv_daemonize, 1},
{"_httpuv_destroyDaemonizedServer", (DL_FUNC) &_httpuv_destroyDaemonizedServer, 1},
{"_httpuv_encodeURI", (DL_FUNC) &_httpuv_encodeURI, 1},
{"_httpuv_encodeURIComponent", (DL_FUNC) &_httpuv_encodeURIComponent, 1},
{"_httpuv_decodeURI", (DL_FUNC) &_httpuv_decodeURI, 1},
{"_httpuv_decodeURIComponent", (DL_FUNC) &_httpuv_decodeURIComponent, 1},
{"_httpuv_invokeCppCallback", (DL_FUNC) &_httpuv_invokeCppCallback, 2},
{"_httpuv_getRNGState", (DL_FUNC) &_httpuv_getRNGState, 0},
{"httpuv_decodeURIComponent", (DL_FUNC) &httpuv_decodeURIComponent, 1},
{"httpuv_decodeURIComponent", (DL_FUNC) &httpuv_decodeURIComponent, 1},
{NULL, NULL, 0}
};

Expand Down
149 changes: 4 additions & 145 deletions src/httpuv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,32 +172,6 @@ Rcpp::RObject makeTcpServer(const std::string& host, int port,
Rcpp::Function onWSMessage,
Rcpp::Function onWSClose) {

using namespace Rcpp;
return R_NilValue;
// Deleted when owning pServer is deleted. If pServer creation fails,
// it's still createTcpServer's responsibility to delete pHandler.
// RWebApplication* pHandler =
// new RWebApplication(onHeaders, onBodyData, onRequest, onWSOpen,
// onWSMessage, onWSClose);
// uv_stream_t* pServer = createTcpServer(
// get_io_loop(), host.c_str(), port, (WebApplication*)pHandler);

// if (!pServer) {
// return R_NilValue;
// }

// return Rcpp::wrap(externalize<uv_stream_t>(pServer));
}

// [[Rcpp::export]]
Rcpp::RObject makeBackgroundTcpServer(const std::string& host, int port,
Rcpp::Function onHeaders,
Rcpp::Function onBodyData,
Rcpp::Function onRequest,
Rcpp::Function onWSOpen,
Rcpp::Function onWSMessage,
Rcpp::Function onWSClose) {

using namespace Rcpp;
REGISTER_MAIN_THREAD()

Expand Down Expand Up @@ -328,130 +302,15 @@ void stop_loop_timer_cb(uv_timer_t* handle) {
}


// ============================================================================
// Miscellaneous utility functions
// ============================================================================

// [[Rcpp::export]]
std::string base64encode(const Rcpp::RawVector& x) {
return b64encode(x.begin(), x.end());
}

/*
* Daemonizing
*
* On UNIX-like environments: Uses the R event loop to trigger the libuv default loop. This is a similar mechanism as that used by Rhttpd.
* It adds an event listener on the port where the TCP server was created by libuv. This triggers uv_run on the
* default loop any time there is an event on the server port. It also adds an event listener to a file descriptor
* exposed by the get_io_loop to trigger uv_run whenever necessary. It uses the non-blocking version
* of uv_run (UV_RUN_NOWAIT).
*
* On Windows: creates a thread that runs the libuv default loop. It uses the usual "service" mechanism
* on the new thread (it uses the run function defined above). TODO: check synchronization.
*
*/

#ifndef WIN32
#include <R_ext/eventloop.h>

#define UVSERVERACTIVITY 55
#define UVLOOPACTIVITY 57
#endif

void loop_input_handler(void *data) {
#ifndef WIN32
// this fake loop is here to force
// processing events
// deals with strange behavior in some Ubuntu installations
for (int i=0; i < 5; ++i) {
uv_run(get_io_loop(), UV_RUN_NOWAIT);
}
#else
bool res = 1;
while (res) {
// res = run(100);
Sleep(1);
}
#endif
}

#ifdef WIN32
static DWORD WINAPI ServerThreadProc(LPVOID lpParameter) {
loop_input_handler(lpParameter);
return 0;
}
#endif

class DaemonizedServer {
public:
uv_stream_t *_pServer;
#ifndef WIN32
InputHandler *serverHandler;
InputHandler *loopHandler;
#else
HANDLE server_thread;
#endif

DaemonizedServer(uv_stream_t *pServer)
: _pServer(pServer) {}

~DaemonizedServer() {
#ifndef WIN32
if (loopHandler) {
removeInputHandler(&R_InputHandlers, loopHandler);
}

if (serverHandler) {
removeInputHandler(&R_InputHandlers, serverHandler);
}
#else
if (server_thread) {
DWORD ts = 0;
if (GetExitCodeThread(server_thread, &ts) && ts == STILL_ACTIVE)
TerminateThread(server_thread, 0);
server_thread = 0;
}
#endif

if (_pServer) {
freeServer(_pServer);
}
}
void setup(){
};
};

// [[Rcpp::export]]
Rcpp::RObject daemonize(std::string handle) {
uv_stream_t *pServer = internalize<uv_stream_t >(handle);
DaemonizedServer *dServer = new DaemonizedServer(pServer);

#ifndef WIN32
int fd = pServer->io_watcher.fd;
dServer->serverHandler = addInputHandler(R_InputHandlers, fd, &loop_input_handler, UVSERVERACTIVITY);

fd = uv_backend_fd(get_io_loop());
dServer->loopHandler = addInputHandler(R_InputHandlers, fd, &loop_input_handler, UVLOOPACTIVITY);
#else
if (dServer->server_thread) {
DWORD ts = 0;
if (GetExitCodeThread(dServer->server_thread, &ts) && ts == STILL_ACTIVE)
TerminateThread(dServer->server_thread, 0);
dServer->server_thread = 0;
}
dServer->server_thread = CreateThread(NULL, 0, ServerThreadProc, 0, 0, 0);
#endif

return Rcpp::wrap(externalize(dServer));
}

// [[Rcpp::export]]
void destroyDaemonizedServer(std::string handle) {
DaemonizedServer *dServer = internalize<DaemonizedServer >(handle);
delete dServer;
}


// ============================================================================
// Miscellaneous utility functions
// ============================================================================

static std::string allowed = ";,/?:@&=+$abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_.!~*'()";

bool isReservedUrlChar(char c) {
Expand Down
4 changes: 4 additions & 0 deletions src/uvutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class WriteOp {
}

void end() {
ASSERT_BACKGROUND_THREAD()
pParent->_pDataSource->freeData(buffer);
pParent->_activeWrites--;

Expand Down Expand Up @@ -48,16 +49,19 @@ uv_buf_t InMemoryDataSource::getData(size_t bytesDesired) {
void InMemoryDataSource::freeData(uv_buf_t buffer) {
}
void InMemoryDataSource::close() {
ASSERT_BACKGROUND_THREAD()
_buffer.clear();
}

void InMemoryDataSource::add(const std::vector<uint8_t>& moreData) {
ASSERT_BACKGROUND_THREAD()
if (_buffer.capacity() < _buffer.size() + moreData.size())
_buffer.reserve(_buffer.size() + moreData.size());
_buffer.insert(_buffer.end(), moreData.begin(), moreData.end());
}

static void writecb(uv_write_t* handle, int status) {
ASSERT_BACKGROUND_THREAD()
WriteOp* pWriteOp = (WriteOp*)handle->data;
pWriteOp->end();
}
Expand Down

0 comments on commit 32f0e26

Please sign in to comment.