diff --git a/Cargo.toml b/Cargo.toml index 4e2e6db..3f93db6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,27 +1,33 @@ [package] name = "stop-token" -version = "0.2.0" -authors = ["Aleksey Kladov "] +version = "0.7.0" +authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" repository = "https://github.com/async-rs/stop-token" +homepage = "https://docs.rs/stop-token" +documentation = "https://docs.rs/stop-token" -description = "Experimental cooperative cancellation for async-std" +description = "Experimental cooperative cancellation for async Rust" [package.metadata.docs.rs] features = ["docs"] rustdoc-args = ["--cfg", "feature=\"docs\""] [features] +all = ["tokio", "async-io", "async-std"] docs = ["async-io"] [dependencies] -pin-project-lite = "0.2.0" async-channel = "1.6.1" -futures-core = "0.3.17" -tokio = { version = "1.12.0", features = ["time"], optional = true } +async-global-executor = { version = "2.0.2", optional = true } async-io = { version = "1.6.0", optional = true } +async-std = { version = "1.10.0", optional = true } +cfg-if = "1.0.0" +futures-core = "0.3.17" +pin-project-lite = "0.2.0" +tokio = { version = "1.9.0", features = ["time"], optional = true } [dev-dependencies] -async-std = "1.10.0" -tokio = { version = "1.12.0", features = ["rt", "macros"] } +async-std = { version = "1.10.0", features = ["attributes"] } +tokio = { version = "1.9.0", features = ["rt", "macros"] } diff --git a/README.md b/README.md index 840e71e..6610fea 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,102 @@ -# Cooperative cancellation for [async-std](https://async.rs/). +

stop-token

+
+ + Cooperative cancellation for async Rust + +
-Status: experimental. +
+ +
+ + + Crates.io version + + + + Download + + + + docs.rs docs + +
+ +
+

+ + API Docs + + | + + Releases + + | + + Contributing + +

+
See crate docs for details +You can use this crate to create a deadline received through a `StopToken`: + +```rust +use async_std::prelude::*; +use async_std::{stream, task}; + +use stop_token::prelude::*; +use stop_token::StopSource; + +use std::time::Duration; + +#[async_std::main] +async fn main() { + // Create a stop source and generate a token. + let src = StopSource::new(); + let deadline = src.token(); + + // When stop source is dropped, the loop will stop. + // Move the source to a task, and drop it after 100 millis. + task::spawn(async move { + task::sleep(Duration::from_millis(100)).await; + drop(src); + }); + + // Create a stream that generates numbers until + // it receives a signal it needs to stop. + let mut work = stream::repeat(12u8).timeout_at(deadline); + + // Loop over each item in the stream. + while let Some(Ok(ev)) = work.next().await { + println!("{}", ev); + } +} +``` + +Or `Instant` to create a `time`-based deadline: + ```rust -use stop_token::StopToken; +use async_std::prelude::*; +use async_std::stream; + +use stop_token::prelude::*; + +use std::time::{Instant, Duration}; + +#[async_std::main] +async fn main() { + // Create a stream that generates numbers for 100 millis. + let deadline = Instant::now() + Duration::from_millis(100); + let mut work = stream::repeat(12u8).timeout_at(deadline); -async fn do_work(work: impl Stream, stop_token: StopToken) { - // The `work` stream will end early: as soon as `stop_token` is cancelled. - let mut work = stop_token.stop_stream(work); - while let Some(event) = work.next().await { - process_event(event).await + // Loop over each item in the stream. + while let Some(Ok(ev)) = work.next().await { + println!("{}", ev); } } ``` diff --git a/src/async_io.rs b/src/async_io.rs new file mode 100644 index 0000000..911dc59 --- /dev/null +++ b/src/async_io.rs @@ -0,0 +1,82 @@ +//! Create deadlines from `Instant`. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, deadline: Instant) { +//! let mut work = work.timeout_at(deadline); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` + +use async_io::Timer; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use pin_project_lite::pin_project; + +pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub(crate) struct Deadline { + instant: Instant, + #[pin] + delay: Timer, + } +} + +impl Clone for Deadline { + fn clone(&self) -> Self { + Self { + instant: self.instant, + delay: Timer::at(self.instant), + } + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.delay.poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} + +impl Into for std::time::Instant { + fn into(self) -> crate::Deadline { + let deadline = Deadline { + instant: self, + delay: Timer::at(self), + }; + crate::Deadline { + kind: crate::deadline::DeadlineKind::AsyncIo { t: deadline }, + } + } +} diff --git a/src/async_std.rs b/src/async_std.rs new file mode 100644 index 0000000..143f4e2 --- /dev/null +++ b/src/async_std.rs @@ -0,0 +1,51 @@ +use async_std::task::JoinHandle; + +/// Extend the `Task` type `until` method. +pub trait TaskExt { + /// Run a future until it resolves, or until a deadline is hit. + fn timeout_at(self, target: T) -> TimeoutAt + where + Self: Sized, + T: Into, + { + TimeoutAt { + deadline: target.into(), + join_handle: self, + } + } +} + +impl JoinHandleExt for JoinHandle {} + +pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct TimeoutAt { + #[pin] + futur_handlee: F, + #[pin] + deadline: Deadline, + } +} + +impl Future for TimeoutAt +where + F: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(()) = this.deadline.poll(cx) { + let _fut = this.join_handle.cancel(); + return Poll::Ready(Err(TimedOutError::new())); + } + match this.join_handle.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(it) => Poll::Ready(Ok(it)), + } + } +} diff --git a/src/deadline.rs b/src/deadline.rs index df80381..033abdc 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -1,5 +1,13 @@ use core::fmt; -use std::{error::Error, future::Future, io}; +use std::{ + error::Error, + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::StopToken; /// An error returned when a future times out. #[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] @@ -33,13 +41,74 @@ impl fmt::Display for TimedOutError { } } -/// Conversion into a deadline. -/// -/// A deadline is a future which resolves after a certain period or event. -pub trait IntoDeadline { - /// Which kind of future are we turning this into? - type Deadline: Future; +pin_project_lite::pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + #[pin] + pub(crate) kind: DeadlineKind, + } +} + +cfg_if::cfg_if! { + if #[cfg(all(feature = "tokio", feature = "async-io"))] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug, Clone)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + Tokio{#[pin]t: crate::tokio::Deadline}, + AsyncIo{#[pin]t: crate::async_io::Deadline}, + } + } + } else if #[cfg(feature = "tokio")] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug, Clone)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + Tokio{#[pin]t: crate::tokio::Deadline}, + } + } + } else if #[cfg(feature = "async-io")] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug, Clone)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + AsyncIo{#[pin]t: crate::async_io::Deadline}, + } + } + } else { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug, Clone)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + } + } + } +} - /// Creates a deadline from a value. - fn into_deadline(self) -> Self::Deadline; +impl Clone for Deadline { + fn clone(&self) -> Self { + Self { + kind: self.kind.clone(), + } + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project().kind.project() { + DeadlineKindProj::StopToken { t } => t.poll(cx), + #[cfg(feature = "tokio")] + DeadlineKindProj::Tokio { t } => t.poll(cx), + #[cfg(feature = "async-io")] + DeadlineKindProj::AsyncIo { t } => t.poll(cx), + } + } } diff --git a/src/future.rs b/src/future.rs index ef4729e..c03a396 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Future` trait. -use crate::{deadline::TimedOutError, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline}; use core::future::Future; use core::pin::Pin; @@ -10,36 +10,37 @@ use std::task::{Context, Poll}; /// Extend the `Future` trait with the `until` method. pub trait FutureExt: Future { /// Run a future until it resolves, or until a deadline is hit. - fn until(self, target: T) -> Stop + fn timeout_at(self, target: T) -> TimeoutAt where Self: Sized, - T: IntoDeadline, + T: Into, { - Stop { - deadline: target.into_deadline(), + TimeoutAt { + deadline: target.into(), future: self, } } } +impl FutureExt for F {} + pin_project! { /// Run a future until it resolves, or until a deadline is hit. /// /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Stop { + pub struct TimeoutAt { #[pin] future: F, #[pin] - deadline: D, + deadline: Deadline, } } -impl Future for Stop +impl Future for TimeoutAt where F: Future, - D: Future, { type Output = Result; diff --git a/src/lib.rs b/src/lib.rs index b125fdc..b51c483 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -//! Cooperative cancellation for [async-std](https://async.rs/). +//! Cooperative cancellation for async Rust. //! //! # Status //! @@ -34,30 +34,75 @@ //! //! # Usage //! -//! You can use `stop_token` for this: +//! You can use this crate to create a deadline received through a +//! [`StopToken`]. You can think of a `StopSource` + `StopToken` as a +//! single-producer, multi-consumer channel that receives a single message to +//! "stop" when the producer is dropped: //! //! ``` //! use async_std::prelude::*; -//! use stop_token::prelude::*; -//! use stop_token::StopToken; -//! -//! struct Event; +//! use async_std::{stream, task}; //! -//! async fn do_work(work: impl Stream + Unpin, stop: StopToken) { -//! let mut work = work.until(stop); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await +//! use stop_token::prelude::*; +//! use stop_token::StopSource; +//! +//! use std::time::Duration; +//! +//! #[async_std::main] +//! async fn main() { +//! // Create a stop source and generate a token. +//! let src = StopSource::new(); +//! let stop = src.token(); +//! +//! // When stop source is dropped, the loop will stop. +//! // Move the source to a task, and drop it after 100 millis. +//! task::spawn(async move { +//! task::sleep(Duration::from_millis(100)).await; +//! drop(src); +//! }); +//! +//! // Create a stream that generates numbers until +//! // it receives a signal it needs to stop. +//! let mut work = stream::repeat(12u8).timeout_at(stop); +//! +//! // Loop over each item in the stream. +//! while let Some(Ok(ev)) = work.next().await { +//! println!("{}", ev); //! } //! } +//! ``` +//! +//! Or `Instant` to create a `time`-based deadline: +//! +//! ``` +//! # #![allow(dead_code)] +//! use async_std::prelude::*; +//! use async_std::stream; //! -//! async fn process_event(_event: Event) { +//! use stop_token::prelude::*; +//! +//! use std::time::{Duration, Instant}; +//! +//! # #[cfg(feature = "tokio")] +//! # fn main() {} +//! # #[cfg(not(feature = "tokio"))] +//! #[async_std::main] +//! async fn main() { +//! // Create a stream that generates numbers for 100 millis. +//! let stop = Instant::now() + Duration::from_millis(100); +//! let mut work = stream::repeat(12u8).timeout_at(stop); +//! +//! // Loop over each item in the stream. +//! while let Some(Ok(ev)) = work.next().await { +//! println!("{}", ev); +//! } //! } //! ``` //! //! # Features //! -//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`] -//! for `Instant` and `Duration` you can enable one of the following features: +//! The `time` submodule is empty when no features are enabled. To implement `Into` +//! for `Instant` you can enable one of the following features: //! //! - `async-io`: for use with the `async-std` or `smol` runtimes. //! - `tokio`: for use with the `tokio` runtime. @@ -67,14 +112,24 @@ //! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads). //! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660). +#![forbid(unsafe_code)] +#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)] +#![warn(missing_docs, future_incompatible, unreachable_pub)] + pub mod future; pub mod stream; -pub mod time; + +#[cfg(any(feature = "async-io", feature = "docs"))] +pub mod async_io; +#[cfg(feature = "async_std")] +pub mod async_std; +#[cfg(feature = "tokio")] +pub mod tokio; mod deadline; mod stop_source; -pub use deadline::{IntoDeadline, TimedOutError}; +pub use deadline::{Deadline, TimedOutError}; pub use stop_source::{StopSource, StopToken}; /// A prelude for `stop-token`. diff --git a/src/stop_source.rs b/src/stop_source.rs index 4476c4e..a119592 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -5,6 +5,8 @@ use core::task::{Context, Poll}; use async_channel::{bounded, Receiver, Sender}; use futures_core::stream::Stream; +use crate::Deadline; + enum Never {} /// `StopSource` produces `StopToken` and cancels all of its tokens on drop. @@ -12,10 +14,10 @@ enum Never {} /// # Example: /// /// ```ignore -/// let stop_source = StopSource::new(); -/// let stop_token = stop_source.stop_token(); -/// schedule_some_work(stop_token); -/// drop(stop_source); // At this point, scheduled work notices that it is canceled. +/// let source = StopSource::new(); +/// let token = source.token(); +/// schedule_some_work(token); +/// drop(source); // At this point, scheduled work notices that it is canceled. /// ``` #[derive(Debug)] pub struct StopSource { @@ -50,16 +52,16 @@ impl StopSource { /// Produces a new `StopToken`, associated with this source. /// /// Once the source is destroyed, `StopToken` future completes. - pub fn stop_token(&self) -> StopToken { + pub fn token(&self) -> StopToken { self.stop_token.clone() } } -impl super::IntoDeadline for StopToken { - type Deadline = Self; - - fn into_deadline(self) -> Self::Deadline { - self +impl Into for StopToken { + fn into(self) -> Deadline { + Deadline { + kind: crate::deadline::DeadlineKind::StopToken { t: self }, + } } } diff --git a/src/stream.rs b/src/stream.rs index 66d4c27..f932d7d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::{deadline::TimedOutError, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline}; use core::future::Future; use core::pin::Pin; @@ -8,17 +8,18 @@ use futures_core::Stream; use pin_project_lite::pin_project; use std::task::{Context, Poll}; +/// Extend the `Stream` trait with the `until` method. pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, target: T) -> StopStream + fn timeout_at(self, target: T) -> TimeoutAt where Self: Sized, - T: IntoDeadline, + T: Into, { - StopStream { + TimeoutAt { stream: self, - deadline: target.into_deadline(), + deadline: target.into(), } } } @@ -31,18 +32,24 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct StopStream { + pub struct TimeoutAt { #[pin] stream: S, #[pin] - deadline: D, + deadline: Deadline, } } -impl Stream for StopStream +impl TimeoutAt { + /// Unwraps this `Stop` stream, returning the underlying stream. + pub fn into_inner(self) -> S { + self.stream + } +} + +impl Stream for TimeoutAt where S: Stream, - D: Future, { type Item = Result; diff --git a/src/time.rs b/src/time.rs deleted file mode 100644 index 17a3ba0..0000000 --- a/src/time.rs +++ /dev/null @@ -1,139 +0,0 @@ -//! Create deadlines from `Duration` and `Instant` types. -//! -//! # Features -//! -//! This module is empty when no features are enabled. To implement deadlines -//! for `Instant` and `Duration` you can enable one of the following features: -//! -//! - `async-io`: use this when using the `async-std` or `smol` runtimes. -//! - `tokio`: use this when using the `tokio` runtime. -//! -//! # Examples -//! -//! ``` -//! use std::time::Instant; -//! use async_std::prelude::*; -//! use stop_token::prelude::*; -//! use stop_token::StopToken; -//! -//! struct Event; -//! -//! async fn do_work(work: impl Stream + Unpin, until: Instant) { -//! let mut work = work.until(until); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await -//! } -//! } -//! -//! async fn process_event(_event: Event) { -//! } -//! ``` - -#[cfg(feature = "async-io")] -pub use asyncio::*; - -#[cfg(any(feature = "async-io", feature = "docs"))] -mod asyncio { - use async_io::Timer; - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - - use crate::IntoDeadline; - - use pin_project_lite::pin_project; - - pin_project! { - /// A future that times out after a duration of time. - #[must_use = "Futures do nothing unless polled or .awaited"] - #[derive(Debug)] - pub struct Deadline { - #[pin] - delay: Timer, - } - } - - impl Future for Deadline { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.delay.poll(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - } - } - } - - impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { - delay: Timer::after(self), - } - } - } - - impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { - delay: Timer::at(self), - } - } - } -} - -#[cfg(feature = "tokio")] -pub use tokiooo::*; - -#[cfg(any(feature = "tokio", feature = "docs"))] -mod tokiooo { - use std::future::{pending, Future, Pending}; - use std::pin::Pin; - use std::task::{Context, Poll}; - use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; - - use crate::IntoDeadline; - - /// A future that times out after a duration of time. - #[must_use = "Futures do nothing unless polled or .awaited"] - #[derive(Debug)] - pub struct Deadline { - delay: Pin>>>, - } - - impl Future for Deadline { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - } - } - } - - impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { - delay: Box::pin(timeout(self, pending())), - } - } - } - - impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - let instant = TokioInstant::from(self); - Deadline { - delay: Box::pin(timeout_at(instant, pending())), - } - } - } -} diff --git a/src/tokio.rs b/src/tokio.rs new file mode 100644 index 0000000..8724b8c --- /dev/null +++ b/src/tokio.rs @@ -0,0 +1,59 @@ +//! Create deadlines from `Instant`. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples + +use std::future::{pending, Future, Pending}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{timeout_at, Instant as TokioInstant, Timeout}; + +/// A future that times out after a duration of time. +#[must_use = "Futures do nothing unless polled or .awaited"] +#[derive(Debug)] +pub(crate) struct Deadline { + instant: TokioInstant, + delay: Pin>>>, +} + +impl Clone for Deadline { + fn clone(&self) -> Self { + let instant = self.instant.clone(); + Self { + instant, + delay: Box::pin(timeout_at(instant, pending())), + } + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} + +impl Into for tokio::time::Instant { + fn into(self) -> crate::Deadline { + let instant = TokioInstant::from(self); + let deadline = Deadline { + instant, + delay: Box::pin(timeout_at(instant, pending())), + }; + + crate::Deadline { + kind: crate::deadline::DeadlineKind::Tokio { t: deadline }, + } + } +} diff --git a/tests/tests.rs b/tests/tests.rs index f302230..0a700db 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -12,13 +12,13 @@ use stop_token::StopSource; fn smoke() { task::block_on(async { let (sender, receiver) = bounded::(10); - let stop_source = StopSource::new(); + let source = StopSource::new(); let task = task::spawn({ - let stop_token = stop_source.stop_token(); + let token = source.token(); let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(stop_token); + let mut stream = receiver.timeout_at(token); while let Some(Ok(x)) = stream.next().await { xs.push(x) } @@ -30,7 +30,7 @@ fn smoke() { sender.send(3).await.unwrap(); task::sleep(Duration::from_millis(250)).await; - drop(stop_source); + drop(source); task::sleep(Duration::from_millis(250)).await; sender.send(4).await.unwrap(); @@ -43,13 +43,14 @@ fn smoke() { #[cfg(feature = "async-io")] #[test] fn async_io_time() { + use std::time::Instant; task::block_on(async { let (sender, receiver) = bounded::(10); let task = task::spawn({ let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(Duration::from_millis(200)); + let mut stream = receiver.timeout_at(Instant::now() + Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) } @@ -72,12 +73,13 @@ fn async_io_time() { #[cfg(feature = "tokio")] #[tokio::test] async fn tokio_time() { + use tokio::time::Instant; let (sender, receiver) = bounded::(10); let task = tokio::task::spawn({ let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(Duration::from_millis(200)); + let mut stream = receiver.timeout_at(Instant::now() + Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) }