Skip to content

Commit

Permalink
Schedule request fibers in Iodine
Browse files Browse the repository at this point in the history
  • Loading branch information
rsamoilov committed Dec 9, 2023
1 parent 4ea4b33 commit acff6e6
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 95 deletions.
4 changes: 4 additions & 0 deletions ext/iodine/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ static void http_resume_wrapper(intptr_t uuid, fio_protocol_s *p_, void *arg) {
http_pause_handle_s *http = arg;
http_s *h = http->h;
h->udata = http->udata;
h->fiber = http->fiber;
h->subscription = http->subscription;
http_vtable_s *vtbl = (http_vtable_s *)h->private_data.vtbl;
if (http->task)
http->task(h);
Expand Down Expand Up @@ -769,6 +771,8 @@ void http_pause(http_s *h, void (*task)(http_pause_handle_s *http)) {
.uuid = p->uuid,
.h = h,
.udata = h->udata,
.fiber = h->fiber,
.subscription = h->subscription,
};
vtbl->http_on_pause(h, p);
fio_defer(http_pause_wrapper, http, (void *)((uintptr_t)task));
Expand Down
9 changes: 4 additions & 5 deletions ext/iodine/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,8 @@ typedef struct {
/** in case the request was paused, this will hold a Ruby fiber, that was scheduled during the request. */
void *fiber;

/**
* in case the request needs to be paused, Iodine will subscribe to a channel with this name;
* once Ruby finishes processing, it will publish to this channel telling Iodine the request can be resumed.
*/
FIOBJ request_id;
/** in case the request needs to be paused, Iodine will subscribe to a channel identified by this object. */
void *subscription;
} http_s;

/**
Expand Down Expand Up @@ -283,6 +280,8 @@ struct http_pause_handle_s {
uintptr_t uuid;
http_s *h;
void *udata;
void *fiber;
void *subscription;
void (*task)(http_s *);
void (*fallback)(void *);
};
Expand Down
11 changes: 3 additions & 8 deletions ext/iodine/http_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ HTTP request/response object management

static inline void http_s_new(http_s *h, http_fio_protocol_s *owner,
http_vtable_s *vtbl) {
FIOBJ request_id = fiobj_str_buf(14);
fiobj_str_write(request_id, "req:", 4);
fiobj_str_write_i(request_id, (uint32_t)fio_rand64());

*h = (http_s){
.private_data =
{
Expand All @@ -110,8 +106,7 @@ static inline void http_s_new(http_s *h, http_fio_protocol_s *owner,
},
.headers = fiobj_hash_new(),
.received_at = fio_last_tick(),
.status = 200,
.request_id = request_id,
.status = 200
};
}

Expand All @@ -129,9 +124,9 @@ static inline void http_s_destroy(http_s *h, uint8_t log) {
fiobj_free(h->cookies);
fiobj_free(h->body);
fiobj_free(h->params);
fiobj_free(h->request_id);

IodineStore.remove((VALUE)h->fiber);
h->fiber = NULL;
h->subscription = NULL;

*h = (http_s){
.private_data.vtbl = h->private_data.vtbl,
Expand Down
143 changes: 62 additions & 81 deletions ext/iodine/iodine_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ static ID each_method_id;
static ID attach_method_id;
static ID iodine_call_proc_id;
static ID fiber_result_var_id;
static VALUE http_wait_directive;
static ID fiber_id_method_id;
static VALUE rb_cFiber;
static ID schedule_method_id;

static VALUE env_template_no_upgrade;
static VALUE env_template_websockets;
Expand Down Expand Up @@ -118,7 +120,7 @@ typedef struct {
IODINE_HTTP_XSENDFILE,
IODINE_HTTP_EMPTY,
IODINE_HTTP_ERROR,
IODINE_HTTP_WAIT,
IODINE_HTTP_DEFERRED,
} type;
enum iodine_upgrade_type_enum {
IODINE_UPGRADE_NONE = 0,
Expand Down Expand Up @@ -470,9 +472,6 @@ static inline VALUE copy2env(iodine_http_request_handle_s *handle) {
/* no TLS, no forwarding, assume `http`, which is the default */
}
}
{
rb_hash_aset(env, IODINE_REQUEST_ID, rb_str_new_cstr(fiobj_obj2cstr(handle->h->request_id).data));
}

/* add all remaining headers */
fiobj_each1(h->headers, 0, iodine_copy2env_task, (void *)env);
Expand Down Expand Up @@ -674,33 +673,36 @@ Handling HTTP requests
static inline void *iodine_handle_request_in_GVL(void *handle_) {
iodine_http_request_handle_s *handle = handle_;
VALUE rbresponse = 0;
VALUE env = 0;
VALUE env = 0, tmp;
http_s *h = handle->h;
if (!h->udata)
goto err_not_found;

// create / register env variable
env = copy2env(handle);
// create rack.io
VALUE tmp = IodineRackIO.create(h, env);
// pass env variable to handler
rbresponse =
IodineCaller.call2((VALUE)h->udata, iodine_call_proc_id, 1, &env);
// close rack.io
IodineRackIO.close(tmp);
if (handle->type == IODINE_HTTP_DEFERRED) {
// the deferred response is now ready so fetch it from the fiber
rbresponse = rb_ivar_get((VALUE)h->fiber, fiber_result_var_id);
} else {
// create / register env variable
env = copy2env(handle);
// create rack.io
tmp = IodineRackIO.create(h, env);
// pass env variable to handler
rbresponse = IodineCaller.call2((VALUE)h->udata, iodine_call_proc_id, 1, &env);
IodineStore.add(rbresponse);
// close rack.io
IodineRackIO.close(tmp);

// if there's a fiber in the http_s object means the request has to be deferred
if (h->fiber)
goto defer;
}

// test handler's return value
if (rbresponse == 0 || rbresponse == Qnil || TYPE(rbresponse) != T_ARRAY) {
goto internal_error;
}

tmp = rb_ary_entry(rbresponse, 0);
// rack will return `[:__http_defer__, fiber_to_wait_on]` in case the request needs to be paused
if (TYPE(tmp) == T_SYMBOL && tmp == http_wait_directive) {
h->fiber = (void *)IodineStore.add(rb_ary_entry(rbresponse, 1));
goto defer;
}

IodineStore.add(rbresponse);
// set response status
if (TYPE(tmp) == T_STRING) {
char *data = RSTRING_PTR(tmp);
Expand Down Expand Up @@ -739,8 +741,7 @@ static inline void *iodine_handle_request_in_GVL(void *handle_) {
// review each header and write it to the response.
rb_hash_foreach(response_headers, for_each_header_data, (VALUE)(h));
// review for upgrade.
if ((intptr_t)h->status < 300 &&
ruby2c_review_upgrade(handle, rbresponse, env))
if (handle->type != IODINE_HTTP_DEFERRED && (intptr_t)h->status < 300 && ruby2c_review_upgrade(handle, rbresponse, env))
goto external_done;
// send the request body.
if (ruby2c_response_send(handle, rbresponse, env))
Expand All @@ -749,68 +750,31 @@ static inline void *iodine_handle_request_in_GVL(void *handle_) {
finish:
IodineStore.remove(rbresponse);
IodineStore.remove(env);
return NULL;
return (void *)rbresponse;

external_done:
IodineStore.remove(rbresponse);
IodineStore.remove(env);
handle->type = IODINE_HTTP_NONE;
return NULL;
return (void *)rbresponse;

err_not_found:
IodineStore.remove(rbresponse);
IodineStore.remove(env);
h->status = 404;
handle->type = IODINE_HTTP_ERROR;
return NULL;
return (void *)rbresponse;

internal_error:
IodineStore.remove(rbresponse);
IodineStore.remove(env);
h->status = 500;
handle->type = IODINE_HTTP_ERROR;
return NULL;
return (void *)rbresponse;

defer:
IodineStore.remove(env);
handle->type = IODINE_HTTP_WAIT;
return NULL;
}

// called once a request that was paused previously needs to be resumed
static inline void *iodine_handle_deferred_request_in_GVL(void *handle_) {
iodine_http_request_handle_s *handle = handle_;
http_s *h = handle->h;

VALUE rbresponse = rb_ivar_get((VALUE)h->fiber, fiber_result_var_id);
VALUE tmp = rb_ary_entry(rbresponse, 0);

// set response status
if (TYPE(tmp) == T_STRING) {
char *data = RSTRING_PTR(tmp);
h->status = fio_atol(&data);
} else if (TYPE(tmp) == T_FIXNUM) {
h->status = FIX2ULONG(tmp);
} else {
goto internal_error;
}

// handle header copy from ruby land to C land.
VALUE response_headers = rb_ary_entry(rbresponse, 1);

// review each header and write it to the response.
rb_hash_foreach(response_headers, for_each_header_data, (VALUE)(h));

// send the request body.
if (ruby2c_response_send(handle, rbresponse, 0))
goto internal_error;

return NULL;

internal_error:
h->status = 500;
handle->type = IODINE_HTTP_ERROR;
return NULL;
return (void *)rbresponse;
}

static inline void
Expand Down Expand Up @@ -856,10 +820,13 @@ static inline void http_resume_deferred_request_handler(http_s *h) {
iodine_http_request_handle_s handle = (iodine_http_request_handle_s){
.h = h,
.upgrade = IODINE_UPGRADE_NONE,
.type = IODINE_HTTP_DEFERRED,
};

IodineCaller.enterGVL((void *(*)(void *))iodine_handle_deferred_request_in_GVL,
&handle);
IodineCaller.enterGVL((void *(*)(void *))iodine_handle_request_in_GVL, &handle);

IodineStore.remove((VALUE)h->fiber);
fio_unsubscribe((subscription_s *)h->subscription);

iodine_perform_handle_action(handle);
}
Expand All @@ -868,29 +835,41 @@ static inline void on_iodine_request_id_message(fio_msg_s *msg) {
http_resume((http_pause_handle_s *)msg->udata1, http_resume_deferred_request_handler, NULL);
}

static inline void http_close_deferred_request_handler(void *sub) {
fio_unsubscribe((subscription_s *)sub);
}

// when Ruby sends a message into the `request_id` channel means the fiber attached to
// to the `http_s h` var can be resumed
static inline void http_pause_request_handler(http_pause_handle_s *s) {
subscription_s *sub = fio_subscribe(.channel = fiobj_obj2cstr(s->h->request_id),
VALUE fiber_id = rb_funcall((VALUE)s->fiber, fiber_id_method_id, 0);

subscription_s *sub = fio_subscribe(.channel = {0, RSTRING_LEN(fiber_id), RSTRING_PTR(fiber_id)},
.on_message = on_iodine_request_id_message,
.udata1 = (void *)s);
fio_uuid_link(s->uuid, (void *)sub, http_close_deferred_request_handler);

s->subscription = (void *)sub;
}

static VALUE fiber_request_block(RB_BLOCK_CALL_FUNC_ARGLIST(_, handle_)) {
VALUE fiber_id = rb_funcall(rb_fiber_current(), fiber_id_method_id, 0);

iodine_http_request_handle_s *handle = (iodine_http_request_handle_s *)handle_;
VALUE rbresponse = (VALUE)IodineCaller.enterGVL((void *(*)(void *))iodine_handle_request_in_GVL, handle);

fio_publish(.channel = {0, RSTRING_LEN(fiber_id), RSTRING_PTR(fiber_id)});
rb_ivar_set(rb_fiber_current(), fiber_result_var_id, rbresponse);

return rbresponse;
}

static void on_rack_request(http_s *h) {
iodine_http_request_handle_s handle = (iodine_http_request_handle_s){
.h = h,
.upgrade = IODINE_UPGRADE_NONE,
};
IodineCaller.enterGVL((void *(*)(void *))iodine_handle_request_in_GVL,
&handle);

if (handle.type == IODINE_HTTP_WAIT) {
http_pause(handle.h, http_pause_request_handler);
VALUE fiber = rb_block_call(rb_cFiber, schedule_method_id, 0, NULL, fiber_request_block, (VALUE)&handle);

// the fiber encountered blocking IO and yielded - pause the request
if (rb_fiber_alive_p(fiber)) {
IodineStore.add(fiber);
h->fiber = (void *)fiber;
http_pause(h, http_pause_request_handler);
} else {
iodine_perform_handle_action(handle);
}
Expand Down Expand Up @@ -1270,7 +1249,9 @@ void iodine_init_http(void) {
attach_method_id = rb_intern("attach_fd");
iodine_call_proc_id = rb_intern("call");
fiber_result_var_id = rb_intern("@__result");
http_wait_directive = ID2SYM(rb_intern("__http_defer__"));
fiber_id_method_id = rb_intern("__get_id");
rb_cFiber = rb_const_get(rb_cObject, rb_intern("Fiber"));
schedule_method_id = rb_intern("schedule");

IodineUTF8Encoding = rb_enc_find("UTF-8");
IodineBinaryEncoding = rb_enc_find("binary");
Expand Down
2 changes: 1 addition & 1 deletion lib/iodine/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Iodine
VERSION = '2.3.0'.freeze
VERSION = '3.0.0'.freeze
end

0 comments on commit acff6e6

Please sign in to comment.