Skip to content

Commit

Permalink
fix: Refactor executor (dfinity#357)
Browse files Browse the repository at this point in the history
* Overhaul executor

* Update changelog

* Remove all the implementation details from the documentation

* Update the docs for the new lints

* Update the Rust version elsewhere

* Add PR number to changelog

* lint

* Fix a comment

* Additions

* that doesn't work
  • Loading branch information
adamspofford-dfinity authored Feb 27, 2023
1 parent b730f82 commit 68fb05b
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 229 deletions.
1 change: 1 addition & 0 deletions e2e-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ic-cdk-macros = { path = "../src/ic-cdk-macros" }
ic-cdk-timers = { path = "../src/ic-cdk-timers" }
lazy_static = "1.4.0"
serde_bytes = "0.11"
futures = "0.3"

[[bin]]
name = "simple-kv-store"
Expand Down
17 changes: 17 additions & 0 deletions e2e-tests/canisters/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fn invocation_count() -> u64 {
}

#[update]
#[allow(clippy::await_holding_lock)]
async fn panic_after_async() {
let value = {
let mut lock = RESOURCE
Expand All @@ -36,6 +37,22 @@ async fn panic_after_async() {
ic_cdk::api::trap("Goodbye, cruel world.")
}

#[update]
#[allow(clippy::await_holding_lock)]
async fn panic_twice() {
let _lock = RESOURCE.write().unwrap();
let fut1 = async_then_panic();
let fut2 = async_then_panic();
futures::future::join_all([fut1, fut2]).await;
}

async fn async_then_panic() {
let _: (u64,) = ic_cdk::call(ic_cdk::api::id(), "on_notify", ())
.await
.expect("Failed to call self");
panic!();
}

#[query]
fn notifications_received() -> u64 {
*NOTIFICATIONS_RECEIVED.read().unwrap()
Expand Down
10 changes: 10 additions & 0 deletions e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ fn test_panic_after_async_frees_resources() {
call_candid(&env, canister_id, "invalid_reply_payload_does_not_trap", ())
.expect("call failed");
assert_eq!(&message, "handled decoding error gracefully with code 5");

let err =
call_candid::<_, ()>(&env, canister_id, "panic_twice", ()).expect_err("failed to panic");
assert!(
matches!(err, CallError::UserError(u) if u.description().contains("Call already trapped"))
);
let _: (u64,) = call_candid(&env, canister_id, "notifications_received", ())
.expect("failed to call unrelated function afterwards");
let _: (u64,) =
call_candid(&env, canister_id, "invocation_count", ()).expect("failed to recover lock");
}

#[test]
Expand Down
3 changes: 1 addition & 2 deletions library/ic-certified-map/src/hashtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use std::borrow::Cow;
/// SHA-256 hash bytes.
pub type Hash = [u8; 32];

/// HashTree as defined in the interfaces spec.
/// <https://internetcomputer.org/docs/current/references/ic-interface-spec#certificate>
/// HashTree as defined in the [interfaces spec](https://internetcomputer.org/docs/current/references/ic-interface-spec#certificate).
#[derive(Debug)]
pub enum HashTree<'a> {
Empty,
Expand Down
2 changes: 1 addition & 1 deletion library/ic-certified-map/src/rbtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl<K: 'static + AsRef<[u8]>, V: AsHashTree + 'static> RbTree<K, V> {
) -> HashTree<'a> {
match n {
None => Empty,
Some(n) => match (n).key.as_ref().cmp(lo.as_ref()) {
Some(n) => match n.key.as_ref().cmp(lo.as_ref()) {
Equal => three_way_fork(
n.left_hash_tree(),
match lo {
Expand Down
2 changes: 1 addition & 1 deletion library/ic-ledger-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["internet-computer", "ledger"]
categories = ["cryptography::cryptocurrencies", "data-structures"]
include = ["src", "Cargo.toml", "CHANGELOG.md", "LICENSE", "README.md"]
repository = "https://github.com/dfinity/cdk-rs"
rust-version = "1.60.0"
rust-version = "1.65.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
2 changes: 1 addition & 1 deletion library/ic-ledger-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ pub struct GetBlocksArgs {
#[derive(CandidType, Deserialize, Clone, Debug)]
pub struct QueryBlocksResponse {
pub chain_length: u64,
/// The replica certificate for the last block hash (see <https://internetcomputer.org/docs/current/references/ic-interface-spec#certification-encoding>).
/// The replica certificate for the last block hash (see [Encoding of Certificates](https://internetcomputer.org/docs/current/references/ic-interface-spec#certification-encoding)).
/// Not available when querying blocks from a canister.
pub certificate: Option<ByteBuf>,
pub blocks: Vec<Block>,
Expand Down
2 changes: 1 addition & 1 deletion src/ic-cdk-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ categories = ["api-bindings", "data-structures", "no-std", "development-tools::f
keywords = ["internet-computer", "types", "dfinity", "canister", "cdk"]
include = ["src", "Cargo.toml", "LICENSE", "README.md"]
repository = "https://github.com/dfinity/cdk-rs"
rust-version = "1.60.0"
rust-version = "1.65.0"

[lib]
proc-macro = true
Expand Down
1 change: 1 addition & 0 deletions src/ic-cdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Refactored

- Change from pleco to tanton for the chess library in the chess example. (#345)
- Refactor the executor to prevent a double-free on `join_all`. (#357)

## [0.6.8] - 2022-11-28

Expand Down
4 changes: 2 additions & 2 deletions src/ic-cdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ categories = ["api-bindings", "data-structures", "no-std", "development-tools::f
keywords = ["internet-computer", "types", "dfinity", "canister", "cdk"]
include = ["src", "Cargo.toml", "LICENSE", "README.md"]
repository = "https://github.com/dfinity/cdk-rs"
rust-version = "1.60.0"
rust-version = "1.65.0"

[dependencies]
candid = "0.8"
Expand All @@ -26,4 +26,4 @@ rstest = "0.12.0"

[package.metadata.docs.rs]
default-target = "wasm32-unknown-unknown"
rustc-args = ["--cfg=docsrs"]
rustdoc-args = ["--cfg=docsrs"]
169 changes: 23 additions & 146 deletions src/ic-cdk/src/api/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,134 +3,14 @@ use crate::api::trap;
use candid::utils::{ArgumentDecoder, ArgumentEncoder};
use candid::{decode_args, encode_args, write_args, CandidType, Deserialize, Principal};
use serde::ser::Error;
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll, Waker};

#[cfg(all(
target_arch = "wasm32-unknown-unknown",
not(target_feature = "atomics")
))]
#[allow(dead_code)]
mod rc {
use std::cell::{RefCell, RefMut};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

pub(crate) type InnerCell<T> = RefCell<T>;

/// A reference counted cell. This is a specific implementation that is
/// both Send and Sync, but does not rely on Mutex and Arc in WASM as
/// the actual implementation of Mutex can break in async flows.
pub(crate) struct WasmCell<T>(Rc<InnerCell<T>>);

/// In order to be able to have an async method that returns the
/// result of a call to another canister, we need that result to
/// be Send + Sync, but Rc and RefCell are not.
///
/// Since inside a canister there isn't actual concurrent access to
/// the referenced cell or the reference counted container, it is
/// safe to force these to be Send/Sync.
unsafe impl<T> Send for WasmCell<T> {}
unsafe impl<T> Sync for WasmCell<T> {}

impl<T> WasmCell<T> {
pub fn new(val: T) -> Self {
WasmCell(Rc::new(InnerCell::new(val)))
}
pub fn into_raw(self) -> *const InnerCell<T> {
Rc::into_raw(self.0)
}
/// # Safety
/// The pointer must have been created with [`into_raw`].
pub unsafe fn from_raw(ptr: *const InnerCell<T>) -> Self {
Self(Rc::from_raw(ptr))
}
pub fn borrow_mut(&self) -> RefMut<'_, T> {
self.0.borrow_mut()
}
pub fn as_ptr(&self) -> *const InnerCell<T> {
self.0.as_ptr() as *const _
}
}

impl<O, T: Future<Output = O>> Future for WasmCell<T> {
type Output = O;

#[allow(unused_mut)]
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { Pin::new_unchecked(&mut *self.0.borrow_mut()) }.poll(ctx)
}
}

impl<T> Clone for WasmCell<T> {
fn clone(&self) -> Self {
WasmCell(Rc::clone(&self.0))
}
}
}

#[cfg(not(target_arch = "wasm32-unknown-unknown"))]
#[allow(dead_code)]
mod rc {
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll};

pub(crate) type InnerCell<T> = Mutex<T>;

/// A reference counted cell. This is a specific implementation that is
/// both Send and Sync, but does not rely on Mutex and Arc in WASM as
/// the actual implementation of Mutex can break in async flows.
///
/// The RefCell is for
pub(crate) struct WasmCell<T>(Arc<InnerCell<T>>);

impl<T> WasmCell<T> {
pub fn new(val: T) -> Self {
WasmCell(Arc::new(InnerCell::new(val)))
}
pub fn into_raw(self) -> *const InnerCell<T> {
Arc::into_raw(self.0)
}
/// # Safety
/// The pointer must have been created with [`into_raw`].
pub unsafe fn from_raw(ptr: *const InnerCell<T>) -> Self {
// SAFETY: If the pointer was created from into_raw, it internally was created from Arc::into_raw.
Self(unsafe { Arc::from_raw(ptr) })
}
pub fn borrow_mut(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}
pub fn as_ptr(&self) -> *const InnerCell<T> {
Arc::<_>::as_ptr(&self.0)
}
}

impl<O, T: Future<Output = O>> Future for WasmCell<T> {
type Output = O;

#[allow(unused_mut)]
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: this is a projection of self, which is pinned
unsafe { Pin::new_unchecked(&mut *self.0.lock().unwrap()) }.poll(ctx)
}
}

impl<T> Clone for WasmCell<T> {
fn clone(&self) -> Self {
WasmCell(Arc::clone(&self.0))
}
}
}

use rc::{InnerCell, WasmCell};

/// Rejection code from calling another canister.
///
/// These can be obtained either using `reject_code()` or `reject_result()`.
Expand Down Expand Up @@ -175,23 +55,20 @@ impl From<u32> for RejectionCode {
pub type CallResult<R> = Result<R, (RejectionCode, String)>;

// Internal state for the Future when sending a call.
struct CallFutureState<R: serde::de::DeserializeOwned> {
result: Option<CallResult<R>>,
struct CallFutureState {
result: Option<CallResult<Vec<u8>>>,
waker: Option<Waker>,
}

struct CallFuture<R: serde::de::DeserializeOwned> {
// We basically use Rc instead of Arc (since we're single threaded), and use
// RefCell instead of Mutex (because we cannot lock in WASM).
state: rc::WasmCell<CallFutureState<R>>,
struct CallFuture {
state: Rc<RefCell<CallFutureState>>,
}

impl<R: serde::de::DeserializeOwned> Future for CallFuture<R> {
type Output = Result<R, (RejectionCode, String)>;
impl Future for CallFuture {
type Output = CallResult<Vec<u8>>;

fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let self_ref = Pin::into_ref(self);

let self_ref = Pin::into_inner(self);
let mut state = self_ref.state.borrow_mut();

if let Some(result) = state.result.take() {
Expand All @@ -209,10 +86,10 @@ impl<R: serde::de::DeserializeOwned> Future for CallFuture<R> {
///
/// # Safety
///
/// This function must only be passed to the IC with a pointer from WasmCell::into_raw as userdata.
unsafe fn callback(state_ptr: *const InnerCell<CallFutureState<Vec<u8>>>) {
// SAFETY: This function is only ever called by the IC, and we only ever pass a WasmCell as userdata.
let state = unsafe { WasmCell::from_raw(state_ptr) };
/// This function must only be passed to the IC with a pointer from Rc::into_raw as userdata.
unsafe fn callback(state_ptr: *const RefCell<CallFutureState>) {
// SAFETY: This function is only ever called by the IC, and we only ever pass a Rc as userdata.
let state = unsafe { Rc::from_raw(state_ptr) };
// Make sure to un-borrow_mut the state.
{
state.borrow_mut().result = Some(match reject_code() {
Expand All @@ -234,18 +111,18 @@ unsafe fn callback(state_ptr: *const InnerCell<CallFutureState<Vec<u8>>>) {
///
/// # Safety
///
/// This function must only be passed to the IC with a pointer from WasmCell::into_raw as userdata.
unsafe fn cleanup(state_ptr: *const InnerCell<CallFutureState<Vec<u8>>>) {
// SAFETY: This function is only ever called by the IC, and we only ever pass a WasmCell as userdata.
let state = unsafe { WasmCell::from_raw(state_ptr) };
/// This function must only be passed to the IC with a pointer from Rc::into_raw as userdata.
unsafe fn cleanup(state_ptr: *const RefCell<CallFutureState>) {
// SAFETY: This function is only ever called by the IC, and we only ever pass a Rc as userdata.
let state = unsafe { Rc::from_raw(state_ptr) };
// We set the call result, even though it won't be read on the
// default executor, because we can't guarantee it was called on
// our executor. However, we are not allowed to inspect
// reject_code() inside of a cleanup callback, so always set the
// result to a reject.
//
// Borrowing does not trap - the rollback from the
// previous trap ensures that the WasmCell can be borrowed again.
// previous trap ensures that the RefCell can be borrowed again.
state.borrow_mut().result = Some(Err((RejectionCode::NoError, "cleanup".to_string())));
let w = state.borrow_mut().waker.take();
if let Some(waker) = w {
Expand Down Expand Up @@ -387,19 +264,19 @@ fn call_raw_internal(
payment_func: impl FnOnce(),
) -> impl Future<Output = CallResult<Vec<u8>>> {
let callee = id.as_slice();
let state = WasmCell::new(CallFutureState {
let state = Rc::new(RefCell::new(CallFutureState {
result: None,
waker: None,
});
let state_ptr = WasmCell::into_raw(state.clone());
}));
let state_ptr = Rc::into_raw(state.clone());
// SAFETY:
// `callee`, being &[u8], is a readable sequence of bytes and therefore can be passed to ic0.call_new.
// `method`, being &str, is a readable sequence of bytes and therefore can be passed to ic0.call_new.
// `callback` is a function with signature (env : i32) -> () and therefore can be called as both reply and reject fn for ic0.call_new.
// `state_ptr` is a pointer created via WasmCell::into_raw, and can therefore be passed as the userdata for `callback`.
// `state_ptr` is a pointer created via Rc::into_raw, and can therefore be passed as the userdata for `callback`.
// `args`, being a &[u8], is a readable sequence of bytes and therefore can be passed to ic0.call_data_append.
// `cleanup` is a function with signature (env : i32) -> () and therefore can be called as a cleanup fn for ic0.call_on_cleanup.
// `state_ptr` is a pointer created via WasmCell::into_raw, and can therefore be passed as the userdata for `cleanup`.
// `state_ptr` is a pointer created via Rc::into_raw, and can therefore be passed as the userdata for `cleanup`.
// ic0.call_perform is always safe to call.
let err_code = unsafe {
ic0::call_new(
Expand Down
6 changes: 3 additions & 3 deletions src/ic-cdk/src/api/management_canister/http_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use core::hash::Hash;
use serde::{Deserialize, Serialize};

/// "transform" function of type: `func (http_request) -> (http_response) query`
#[derive(Deserialize, Debug, PartialEq, Clone)]
#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct TransformFunc(pub candid::Func);

impl CandidType for TransformFunc {
Expand Down Expand Up @@ -47,7 +47,7 @@ pub struct TransformArgs {
/// function : func (record {response : http_response; context : blob}) -> (http_response) query;
/// context : blob;
/// }`
#[derive(CandidType, Clone, Debug, Deserialize, PartialEq)]
#[derive(CandidType, Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct TransformContext {
/// Reference function with signature: `func (record {response : http_response; context : blob}) -> (http_response) query;`.
pub function: TransformFunc,
Expand Down Expand Up @@ -126,7 +126,7 @@ pub enum HttpMethod {
}

/// Argument type of [http_request].
#[derive(CandidType, Deserialize, Debug, PartialEq, Clone)]
#[derive(CandidType, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct CanisterHttpRequestArgument {
/// The requested URL.
pub url: String,
Expand Down
2 changes: 1 addition & 1 deletion src/ic-cdk/src/api/management_canister/main/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use types::*;

/// Cycles cost to create a canister.
///
/// <https://internetcomputer.org/docs/current/developer-docs/deploy/computation-and-storage-costs>
/// See [Computation and Storage Costs](https://internetcomputer.org/docs/current/developer-docs/deploy/computation-and-storage-costs)
pub const CREATE_CANISTER_CYCLES: u128 = 100_000_000_000u128;

/// Register a new canister and get its canister id.
Expand Down
Loading

0 comments on commit 68fb05b

Please sign in to comment.