Skip to content

Commit

Permalink
Modify op dispatcher to include &mut Isolate argument (denoland#4821)
Browse files Browse the repository at this point in the history
- Removes unnecessary RwLock and Rc around the op registry table
- Preparation to move resource_table to deno_core::Isolate.
- Towards denoland#3453, denoland#4222
  • Loading branch information
ry authored Apr 20, 2020
1 parent 4e3532f commit c1ec042
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 205 deletions.
16 changes: 12 additions & 4 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ struct AsyncArgs {
promise_id: Option<u64>,
}

pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
pub fn json_op<D>(
d: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
D: Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| {
move |isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
let async_args: AsyncArgs = match serde_json::from_slice(control) {
Ok(args) => args,
Err(e) => {
Expand All @@ -58,7 +66,7 @@ where

let result = serde_json::from_slice(control)
.map_err(OpError::from)
.and_then(|args| d(args, zero_copy));
.and_then(|args| d(isolate, args, zero_copy));

// Convert to Op
match result {
Expand Down
8 changes: 6 additions & 2 deletions cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,15 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}

pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
pub fn minimal_op<D>(
d: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| {
move |_isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,
None => {
Expand Down
21 changes: 11 additions & 10 deletions cli/ops/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ use crate::fs as deno_fs;
use crate::op_error::OpError;
use crate::ops::json_op;
use crate::state::State;
use deno_core::*;
use deno_core::Isolate;
use deno_core::OpDispatcher;
use deno_core::OpId;
use deno_core::PluginInitContext;
use deno_core::PluginInitFn;
use deno_core::ZeroCopyBuf;
use dlopen::symbor::Library;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::Path;
use std::rc::Rc;

pub fn init(i: &mut Isolate, s: &State, r: Rc<deno_core::OpRegistry>) {
let r_ = r;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
"op_open_plugin",
s.core_op(json_op(s.stateful_op(move |state, args, zero_copy| {
op_open_plugin(&r_, state, args, zero_copy)
}))),
s.core_op(json_op(s.stateful_op2(op_open_plugin))),
);
}

Expand Down Expand Up @@ -52,7 +53,7 @@ struct OpenPluginArgs {
}

pub fn op_open_plugin(
registry: &Rc<deno_core::OpRegistry>,
isolate: &mut deno_core::Isolate,
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
Expand Down Expand Up @@ -91,8 +92,8 @@ pub fn op_open_plugin(
// The inclusion of prefix and rid is designed to avoid any
// op name collision beyond the bound of a single loaded
// plugin instance.
let op_id = registry
.register(&format!("plugin_{}_{}", rid, op.0), state.core_op(op.1));
let op_id = isolate
.register_op(&format!("plugin_{}_{}", rid, op.0), state.core_op(op.1));
plugin_resource.ops.insert(op.0, op_id);
}

Expand Down
18 changes: 14 additions & 4 deletions cli/ops/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ use std::convert::From;
pub fn web_worker_op<D>(
sender: mpsc::Sender<WorkerEvent>,
dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>
) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where
D: Fn(
&mpsc::Sender<WorkerEvent>,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
move |args: Value,
move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) }
}
Expand All @@ -29,7 +34,11 @@ pub fn web_worker_op2<D>(
handle: WebWorkerHandle,
sender: mpsc::Sender<WorkerEvent>,
dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>
) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where
D: Fn(
WebWorkerHandle,
Expand All @@ -38,7 +47,8 @@ where
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
move |args: Value,
move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> {
dispatcher(handle.clone(), &sender, args, zero_copy)
Expand Down
47 changes: 40 additions & 7 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl State {
pub fn stateful_json_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{
Expand All @@ -85,18 +85,21 @@ impl State {
pub fn core_op<D>(
&self,
dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where
D: Fn(&[u8], Option<ZeroCopyBuf>) -> Op,
D: Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op,
{
let state = self.clone();

move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| -> Op {
move |isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>|
-> Op {
let bytes_sent_control = control.len() as u64;
let bytes_sent_zero_copy =
zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64;

let op = dispatcher(control, zero_copy);
let op = dispatcher(isolate, control, zero_copy);

match op {
Op::Sync(buf) => {
Expand Down Expand Up @@ -162,15 +165,45 @@ impl State {
pub fn stateful_op<D>(
&self,
dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>
) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{
let state = self.clone();
move |args: Value,
move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) }
}

pub fn stateful_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where
D: Fn(
&mut deno_core::Isolate,
&State,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
let state = self.clone();
move |isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> {
dispatcher(isolate, &state, args, zero_copy)
}
}
}

impl ModuleLoader for State {
Expand Down
3 changes: 1 addition & 2 deletions cli/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,10 @@ impl WebWorker {
ops::fetch::init(isolate, &state);

if has_deno_namespace {
let op_registry = isolate.op_registry.clone();
ops::runtime_compiler::init(isolate, &state);
ops::fs::init(isolate, &state);
ops::fs_events::init(isolate, &state);
ops::plugins::init(isolate, &state, op_registry);
ops::plugins::init(isolate, &state);
ops::net::init(isolate, &state);
ops::tls::init(isolate, &state);
ops::os::init(isolate, &state);
Expand Down
3 changes: 1 addition & 2 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ impl MainWorker {
let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_);
{
let op_registry = worker.isolate.op_registry.clone();
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::runtime_compiler::init(isolate, &state);
Expand All @@ -231,7 +230,7 @@ impl MainWorker {
ops::fs::init(isolate, &state);
ops::fs_events::init(isolate, &state);
ops::io::init(isolate, &state);
ops::plugins::init(isolate, &state, op_registry);
ops::plugins::init(isolate, &state);
ops::net::init(isolate, &state);
ops::tls::init(isolate, &state);
ops::os::init(isolate, &state);
Expand Down
18 changes: 10 additions & 8 deletions core/es_isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,14 +580,16 @@ pub mod tests {

let mut isolate = EsIsolate::new(loader, StartupData::None, false);

let dispatcher =
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
};
let dispatcher = move |_isolate: &mut Isolate,
control: &[u8],
_zero_copy: Option<ZeroCopyBuf>|
-> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
};

isolate.register_op("test", dispatcher);

Expand Down
64 changes: 34 additions & 30 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,22 @@ impl Isolate {
F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
{
let state = self.state.clone();
let core_handler =
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);

let result: i32 = match handler(state, record.rid, zero_copy_buf) {
Ok(r) => r as i32,
Err(_) => -1,
};
let buf = RecordBuf::from(Record { result, ..record })[..].into();
Op::Sync(buf)
let core_handler = move |_isolate: &mut deno_core::Isolate,
control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);

let result: i32 = match handler(state, record.rid, zero_copy_buf) {
Ok(r) => r as i32,
Err(_) => -1,
};
let buf = RecordBuf::from(Record { result, ..record })[..].into();
Op::Sync(buf)
};

self.core_isolate.register_op(name, core_handler);
}
Expand All @@ -139,25 +141,27 @@ impl Isolate {
<F::Ok as TryInto<i32>>::Error: Debug,
{
let state = self.state.clone();
let core_handler =
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);

let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
.await;
RecordBuf::from(Record { result, ..record })[..].into()
};

Op::Async(fut.boxed_local())
let core_handler = move |_isolate: &mut deno_core::Isolate,
control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
-> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);

let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
.await;
RecordBuf::from(Record { result, ..record })[..].into()
};

Op::Async(fut.boxed_local())
};

self.core_isolate.register_op(name, core_handler);
}
}
Expand Down
Loading

0 comments on commit c1ec042

Please sign in to comment.