Skip to content

Commit

Permalink
move easy handles tracking in @requests from C to Ruby
Browse files Browse the repository at this point in the history
This attempts to address segfaults when curb is used in multithreaded
environment.

The solution is to move `@requests` handling code to remove unsafe API
calls from C (`st.c` is not thread safe).

More details can be found here: https://bugs.ruby-lang.org/issues/14357

clean up unused function and unused variables
  • Loading branch information
robuye authored and taf2 committed Nov 26, 2018
1 parent d8a462c commit 01b715a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 164 deletions.
4 changes: 2 additions & 2 deletions ext/curb_easy.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ static VALUE ruby_curl_easy_initialize(int argc, VALUE *argv, VALUE self) {

rb_easy_set("url", url);

/* set the new_curl pointer to the curl handle */

/* set the pointer to the curl handle */
ecode = curl_easy_setopt(rbce->curl, CURLOPT_PRIVATE, (void*)self);
if (ecode != CURLE_OK) {
raise_curl_easy_error_exception(ecode);
Expand Down Expand Up @@ -2091,7 +2092,6 @@ VALUE ruby_curl_easy_setup(ruby_curl_easy *rbce) {
}

url = rb_check_string_type(_url);

curl_easy_setopt(curl, CURLOPT_URL, StringValuePtr(url));

// network stuff and auth
Expand Down
222 changes: 67 additions & 155 deletions ext/curb_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,9 @@ static void rb_curl_multi_run(VALUE self, CURLM *multi_handle, int *still_runnin

static VALUE callback_exception(VALUE unused) {
return Qfalse;
}

static void curl_multi_mark(ruby_curl_multi *rbcm) {
if (!NIL_P(rbcm->requests)) rb_gc_mark(rbcm->requests);
}

/* Hash#foreach callback for curl_multi_free */
static int curl_multi_flush_easy(VALUE key, VALUE easy, ruby_curl_multi *rbcm) {
CURLMcode result;
ruby_curl_easy *rbce;

// sometimes the type is T_ZOMBIE, e.g. after Ruby has received the SIGTERM signal
if (rb_type(easy) == T_DATA) {
Data_Get_Struct(easy, ruby_curl_easy, rbce);

result = curl_multi_remove_handle(rbcm->handle, rbce->curl);
if (result != 0) {
raise_curl_multi_error_exception(result);
}
}

return ST_DELETE;
}

void curl_multi_free(ruby_curl_multi *rbcm) {
VALUE hash = rbcm->requests;

if (!NIL_P(hash) && rb_type(hash) == T_HASH && RHASH_SIZE(hash) > 0) {

rb_hash_foreach(hash, curl_multi_flush_easy, (VALUE)rbcm);
/* rb_hash_clear(rbcm->requests); */

rbcm->requests = Qnil;
}

curl_multi_cleanup(rbcm->handle);
free(rbcm);
}
Expand All @@ -90,23 +58,23 @@ void curl_multi_free(ruby_curl_multi *rbcm) {
* Create a new Curl::Multi instance
*/
VALUE ruby_curl_multi_new(VALUE klass) {
VALUE new_curlm;

ruby_curl_multi *rbcm = ALLOC(ruby_curl_multi);

rbcm->handle = curl_multi_init();
if (!rbcm->handle) {
rb_raise(mCurlErrFailedInit, "Failed to initialize multi handle");
}

rbcm->requests = rb_hash_new();

rbcm->active = 0;
rbcm->running = 0;

new_curlm = Data_Wrap_Struct(klass, curl_multi_mark, curl_multi_free, rbcm);

return new_curlm;
/*
* The mark routine will be called by the garbage collector during its ``mark'' phase.
* If your structure references other Ruby objects, then your mark function needs to
* identify these objects using rb_gc_mark(value). If the structure doesn't reference
* other Ruby objects, you can simply pass 0 as a function pointer.
*/
return Data_Wrap_Struct(klass, 0, curl_multi_free, rbcm);
}

/*
Expand All @@ -133,52 +101,12 @@ VALUE ruby_curl_multi_get_default_timeout(VALUE klass) {
return LONG2NUM(cCurlMutiDefaulttimeout);
}

/* Hash#foreach callback for ruby_curl_multi_requests */
static int ruby_curl_multi_requests_callback(VALUE key, VALUE value, VALUE result_array) {
rb_ary_push(result_array, value);

return ST_CONTINUE;
}

/*
* call-seq:
* multi.requests => [#<Curl::Easy...>, ...]
*
* Returns an array containing all the active requests on this Curl::Multi object.
*/
static VALUE ruby_curl_multi_requests(VALUE self) {
ruby_curl_multi *rbcm;
VALUE result_array;

Data_Get_Struct(self, ruby_curl_multi, rbcm);

result_array = rb_ary_new();

/* iterate over the requests hash, and stuff references into the array. */
rb_hash_foreach(rbcm->requests, ruby_curl_multi_requests_callback, result_array);

return result_array;
}

/*
* call-seq:
* multi.idle? => true or false
*
* Returns whether or not this Curl::Multi handle is processing any requests. E.g. this returns
* true when multi.requests.length == 0.
*/
static VALUE ruby_curl_multi_idle(VALUE self) {
ruby_curl_multi *rbcm;

Data_Get_Struct(self, ruby_curl_multi, rbcm);

if (RHASH_SIZE(rbcm->requests) == 0) {
return Qtrue;
} else {
return Qfalse;
}
}

/*
* call-seq:
* multi = Curl::Multi.new
Expand Down Expand Up @@ -241,19 +169,12 @@ static VALUE ruby_curl_multi_pipeline(VALUE self, VALUE method) {
*/
VALUE ruby_curl_multi_add(VALUE self, VALUE easy) {
CURLMcode mcode;
VALUE r;
ruby_curl_easy *rbce;
ruby_curl_multi *rbcm;

Data_Get_Struct(self, ruby_curl_multi, rbcm);
Data_Get_Struct(easy, ruby_curl_easy, rbce);

// check if this curl handle has been added before adding again
r = rb_hash_aref(rbcm->requests, LONG2NUM((long)rbce->curl));
if ( r != Qnil ) {
return Qnil;
}

/* setup the easy handle */
ruby_curl_easy_setup( rbce );

Expand All @@ -271,8 +192,6 @@ VALUE ruby_curl_multi_add(VALUE self, VALUE easy) {
/* track a reference to associated multi handle */
rbce->multi = self;

rb_hash_aset( rbcm->requests, LONG2NUM((long)rbce->curl), easy );

return self;
}

Expand All @@ -290,28 +209,20 @@ VALUE ruby_curl_multi_add(VALUE self, VALUE easy) {
*
* Will raise an exception if the easy handle is not found
*/
VALUE ruby_curl_multi_remove(VALUE self, VALUE easy) {
VALUE ruby_curl_multi_remove(VALUE self, VALUE rb_easy_handle) {
ruby_curl_multi *rbcm;

Data_Get_Struct(self, ruby_curl_multi, rbcm);

rb_curl_multi_remove(rbcm,easy);
rb_curl_multi_remove(rbcm, rb_easy_handle);

return self;
}
static void rb_curl_multi_remove(ruby_curl_multi *rbcm, VALUE easy) {
CURLMcode result;
ruby_curl_easy *rbce;
VALUE r;

Data_Get_Struct(easy, ruby_curl_easy, rbce);

// check if this curl handle has been added before removing
r = rb_hash_aref(rbcm->requests, LONG2NUM((long)rbce->curl));
if ( r == Qnil ) {
return;
}

result = curl_multi_remove_handle(rbcm->handle, rbce->curl);
if (result != 0) {
raise_curl_multi_error_exception(result);
Expand All @@ -320,36 +231,6 @@ static void rb_curl_multi_remove(ruby_curl_multi *rbcm, VALUE easy) {
rbcm->active--;

ruby_curl_easy_cleanup( easy, rbce );

// active should equal LONG2NUM(RHASH(rbcm->requests)->tbl->num_entries)
r = rb_hash_delete( rbcm->requests, LONG2NUM((long)rbce->curl) );
if( r != easy || r == Qnil ) {
rb_warn("Possibly lost track of Curl::Easy VALUE, it may not be reclaimed by GC");
}
}

/* Hash#foreach callback for ruby_curl_multi_cancel */
static int ruby_curl_multi_cancel_callback(VALUE key, VALUE value, ruby_curl_multi *rbcm) {
rb_curl_multi_remove(rbcm, value);

return ST_CONTINUE;
}

/*
* call-seq:
* multi.cancel!
*
* Cancels all requests currently being made on this Curl::Multi handle.
*/
static VALUE ruby_curl_multi_cancel(VALUE self) {
ruby_curl_multi *rbcm;

Data_Get_Struct(self, ruby_curl_multi, rbcm);

rb_hash_foreach( rbcm->requests, ruby_curl_multi_cancel_callback, (VALUE)rbcm );

/* for chaining */
return self;
}

// on_success, on_failure, on_complete
Expand All @@ -361,7 +242,6 @@ static VALUE call_status_handler2(VALUE ary) {
}

static void rb_curl_mutli_handle_complete(VALUE self, CURL *easy_handle, int result) {

long response_code = -1;
VALUE easy;
ruby_curl_easy *rbce = NULL;
Expand All @@ -373,7 +253,8 @@ static void rb_curl_mutli_handle_complete(VALUE self, CURL *easy_handle, int res

rbce->last_result = result; /* save the last easy result code */

ruby_curl_multi_remove( self, easy );
// remove the easy handle from multi on completion so it can be reused again
rb_funcall(self, rb_intern("remove"), 1, easy);

/* after running a request cleanup the headers, these are set before each request */
if (rbce->curl_headers) {
Expand Down Expand Up @@ -439,34 +320,60 @@ static void rb_curl_mutli_handle_complete(VALUE self, CURL *easy_handle, int res
}

static void rb_curl_multi_read_info(VALUE self, CURLM *multi_handle) {
int msgs_left, result;
CURLMsg *msg;
CURL *easy_handle;

/* check for finished easy handles and remove from the multi handle */
while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
easy_handle = msg->easy_handle;
result = msg->data.result;
if (easy_handle) {
rb_curl_mutli_handle_complete(self, easy_handle, result);
}
}
int msgs_left;

CURLcode c_easy_result;
CURLMsg *c_multi_result; // for picking up messages with the transfer status
CURL *c_easy_handle;

/* Check for finished easy handles and remove from the multi handle.
* curl_multi_info_read will query for messages from individual handles.
*
* The messages fetched with this function are removed from the curl internal
* queue and when there are no messages left it will return NULL (and break
* the loop effectively).
*/
while ((c_multi_result = curl_multi_info_read(multi_handle, &msgs_left))) {
// A message is there, but we really care only about transfer completetion.
if (c_multi_result->msg != CURLMSG_DONE) continue;

c_easy_handle = c_multi_result->easy_handle;
c_easy_result = c_multi_result->data.result; /* return code for transfer */

rb_curl_mutli_handle_complete(self, c_easy_handle, c_easy_result);
}
}

/* called within ruby_curl_multi_perform */
static void rb_curl_multi_run(VALUE self, CURLM *multi_handle, int *still_running) {
CURLMcode mcode;

/*
* curl_multi_perform will return CURLM_CALL_MULTI_PERFORM only when it wants to be called again immediately.
* When things are fine and there is nothing immediate it wants done, it'll return CURLM_OK.
*
* It will perform all pending actions on all added easy handles attached to this multi handle. We will loop
* here as long as mcode is CURLM_CALL_MULTIPERFORM.
*/
do {
mcode = curl_multi_perform(multi_handle, still_running);
} while (mcode == CURLM_CALL_MULTI_PERFORM);

/*
* Nothing more to do, check if an error occured in the loop above and raise an exception if necessary.
*/

if (mcode != CURLM_OK) {
raise_curl_multi_error_exception(mcode);
}


/*
* Everything is ok, but this does not mean all the transfers are completed.
* There is no data to read or write available for Curl at the moment.
*
* At this point we can return control to the caller to do something else while
* curl is waiting for more actions to queue.
*/
}

#ifdef _WIN32
Expand Down Expand Up @@ -549,9 +456,22 @@ VALUE ruby_curl_multi_perform(int argc, VALUE *argv, VALUE self) {

timeout_milliseconds = cCurlMutiDefaulttimeout;

// Run curl_multi_perform for the first time to get the ball rolling
rb_curl_multi_run( self, rbcm->handle, &(rbcm->running) );

// Check the easy handles for new messages one more time before yielding
// control to passed ruby block.
//
// This call will block until all queued messages are processed and if any
// handle completed the transfer we will run the on_complete callback here too.
rb_curl_multi_read_info( self, rbcm->handle );
if (block != Qnil) { rb_funcall(block, rb_intern("call"), 1, self); }

// There are no more messages to handle by curl and we can run the ruby block
// passed to perform method.
// When the block completes curl will resume.
if (block != Qnil) {
rb_funcall(block, rb_intern("call"), 1, self);
}

do {
while (rbcm->running) {
Expand Down Expand Up @@ -651,30 +571,22 @@ VALUE ruby_curl_multi_perform(int argc, VALUE *argv, VALUE self) {

rb_curl_multi_read_info( self, rbcm->handle );
if (block != Qnil) { rb_funcall(block, rb_intern("call"), 1, self); }

return Qtrue;
}

/* =================== INIT LIB =====================*/
void init_curb_multi() {
idCall = rb_intern("call");

cCurlMulti = rb_define_class_under(mCurl, "Multi", rb_cObject);

/* Class methods */
rb_define_singleton_method(cCurlMulti, "new", ruby_curl_multi_new, 0);
rb_define_singleton_method(cCurlMulti, "default_timeout=", ruby_curl_multi_set_default_timeout, 1);
rb_define_singleton_method(cCurlMulti, "default_timeout", ruby_curl_multi_get_default_timeout, 0);

/* "Attributes" */
rb_define_method(cCurlMulti, "requests", ruby_curl_multi_requests, 0);
rb_define_method(cCurlMulti, "idle?", ruby_curl_multi_idle, 0);

/* Instance methods */
rb_define_method(cCurlMulti, "max_connects=", ruby_curl_multi_max_connects, 1);
rb_define_method(cCurlMulti, "pipeline=", ruby_curl_multi_pipeline, 1);
rb_define_method(cCurlMulti, "add", ruby_curl_multi_add, 1);
rb_define_method(cCurlMulti, "remove", ruby_curl_multi_remove, 1);
rb_define_method(cCurlMulti, "cancel!", ruby_curl_multi_cancel, 0);
rb_define_method(cCurlMulti, "_add", ruby_curl_multi_add, 1);
rb_define_method(cCurlMulti, "_remove", ruby_curl_multi_remove, 1);
rb_define_method(cCurlMulti, "perform", ruby_curl_multi_perform, -1);
}
1 change: 0 additions & 1 deletion ext/curb_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
typedef struct {
int active;
int running;
VALUE requests; /* hash of handles currently added */
CURLM *handle;
} ruby_curl_multi;

Expand Down
Loading

0 comments on commit 01b715a

Please sign in to comment.