From f520d9a0dd5e76c1065d96c1d80ad9d8ac8c82e4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:14:09 +0200 Subject: [PATCH 01/27] docs, into_inner, renames - rename StopStream to Stop - implement into_inner for Stop stream - fix docs --- src/lib.rs | 25 +++++++++++++++++++++++-- src/stream.rs | 15 +++++++++++---- src/time.rs | 2 +- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b125fdc..1d05ef4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ //! //! # Usage //! -//! You can use `stop_token` for this: +//! You can use this crate to create a deadline received through a [`StopToken`]: //! //! ``` //! use async_std::prelude::*; @@ -54,9 +54,30 @@ //! } //! ``` //! +//! Or `Duration` or `Instant` to create a [`time`]-based deadline: +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, until: Instant) { +//! let mut work = work.until(until); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` +//! //! # Features //! -//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`] +//! The `time` submodule is empty when no features are enabled. To implement [`IntoDeadline`] //! for `Instant` and `Duration` you can enable one of the following features: //! //! - `async-io`: for use with the `async-std` or `smol` runtimes. diff --git a/src/stream.rs b/src/stream.rs index 66d4c27..1e30de8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -11,12 +11,12 @@ use std::task::{Context, Poll}; pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, target: T) -> StopStream + fn until(self, target: T) -> Stop where Self: Sized, T: IntoDeadline, { - StopStream { + Stop { stream: self, deadline: target.into_deadline(), } @@ -31,7 +31,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct StopStream { + pub struct Stop { #[pin] stream: S, #[pin] @@ -39,7 +39,14 @@ pin_project! { } } -impl Stream for StopStream +impl Stop { + /// Unwraps this `Stop` stream, returning the underlying stream. + pub fn into_inner(self) -> S { + self.stream + } +} + +impl Stream for Stop where S: Stream, D: Future, diff --git a/src/time.rs b/src/time.rs index 17a3ba0..5ae039e 100644 --- a/src/time.rs +++ b/src/time.rs @@ -89,7 +89,7 @@ mod asyncio { #[cfg(feature = "tokio")] pub use tokiooo::*; -#[cfg(any(feature = "tokio", feature = "docs"))] +#[cfg(feature = "tokio")] mod tokiooo { use std::future::{pending, Future, Pending}; use std::pin::Pin; From 233d2f874c455fe4623ab959e0b8f1440fd0a1fb Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:14:20 +0200 Subject: [PATCH 02/27] 0.3.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4e2e6db..690b9da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.2.0" +version = "0.3.0" authors = ["Aleksey Kladov "] edition = "2018" license = "MIT OR Apache-2.0" From 3efae830802637e11ce5c6a28b8173097962b415 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:16:34 +0200 Subject: [PATCH 03/27] readme --- README.md | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 840e71e..dfeba64 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,44 @@ Status: experimental. See crate docs for details + +You can use this crate to create a deadline received through a `StopToken`: + ```rust +use async_std::prelude::*; +use stop_token::prelude::*; use stop_token::StopToken; -async fn do_work(work: impl Stream, stop_token: StopToken) { - // The `work` stream will end early: as soon as `stop_token` is cancelled. - let mut work = stop_token.stop_stream(work); - while let Some(event) = work.next().await { +struct Event; + +async fn do_work(work: impl Stream + Unpin, stop: StopToken) { + let mut work = work.until(stop); + while let Some(Ok(event)) = work.next().await { process_event(event).await } } + +async fn process_event(_event: Event) { +} +``` + +Or `Duration` or `Instant` to create a `time`-based deadline: + +```rust +use std::time::Instant; +use async_std::prelude::*; +use stop_token::prelude::*; +use stop_token::StopToken; + +struct Event; + +async fn do_work(work: impl Stream + Unpin, until: Instant) { + let mut work = work.until(until); + while let Some(Ok(event)) = work.next().await { + process_event(event).await + } +} + +async fn process_event(_event: Event) { +} ``` From ef8b9550c2bde57c950c8474b9aba2433d16711c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:16:41 +0200 Subject: [PATCH 04/27] v0.3.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 690b9da..79b1678 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.3.0" +version = "0.3.1" authors = ["Aleksey Kladov "] edition = "2018" license = "MIT OR Apache-2.0" From 0b77322a4bb06876e81bdbc633be135a6b194ac0 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:20:13 +0200 Subject: [PATCH 05/27] v0.3.2 --- Cargo.toml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 79b1678..6cba5cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,12 @@ [package] name = "stop-token" -version = "0.3.1" -authors = ["Aleksey Kladov "] +version = "0.3.2" +authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" repository = "https://github.com/async-rs/stop-token" +homepage = "https://docs.rs/stop-token" +documentation = "https://docs.rs/stop-token" description = "Experimental cooperative cancellation for async-std" From ff7d3b243d0c515591339d8ec56f9274fb619808 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 11:23:25 +0200 Subject: [PATCH 06/27] fix all docs --- src/lib.rs | 4 ++++ src/stream.rs | 1 + 2 files changed, 5 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 1d05ef4..4c26f92 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,6 +88,10 @@ //! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads). //! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660). +#![forbid(unsafe_code)] +#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)] +#![warn(missing_docs, future_incompatible, unreachable_pub)] + pub mod future; pub mod stream; pub mod time; diff --git a/src/stream.rs b/src/stream.rs index 1e30de8..7cb4ea8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -8,6 +8,7 @@ use futures_core::Stream; use pin_project_lite::pin_project; use std::task::{Context, Poll}; +/// Extend the `Stream` trait with the `until` method. pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. From 02ac38b4ce0fc3c4963855e5f5611d805ebe6ca4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 11:23:38 +0200 Subject: [PATCH 07/27] v0.3.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6cba5cb..f66ae42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.3.2" +version = "0.3.3" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From 7f6e2fadf68f2fc83f04d325dacf0e48badd7b94 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 12:32:26 +0200 Subject: [PATCH 08/27] description --- Cargo.toml | 2 +- README.md | 44 +++++++++++++++++++++++++++++++++++++++++--- src/lib.rs | 2 +- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f66ae42..1a4eec8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/async-rs/stop-token" homepage = "https://docs.rs/stop-token" documentation = "https://docs.rs/stop-token" -description = "Experimental cooperative cancellation for async-std" +description = "Experimental cooperative cancellation for async Rust" [package.metadata.docs.rs] features = ["docs"] diff --git a/README.md b/README.md index dfeba64..7be9e88 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,47 @@ -# Cooperative cancellation for [async-std](https://async.rs/). +

stop-token

+
+ + Cooperative cancellation for async Rust + +
-Status: experimental. +
-See crate docs for details +
+ + + Crates.io version + + + + Download + + + + docs.rs docs + +
+ + +See crate docs for details You can use this crate to create a deadline received through a `StopToken`: diff --git a/src/lib.rs b/src/lib.rs index 4c26f92..ce619ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -//! Cooperative cancellation for [async-std](https://async.rs/). +//! Cooperative cancellation for async Rust. //! //! # Status //! From 5a30855cf9fc416c12bb0df211edcfd89ae188eb Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 12:32:38 +0200 Subject: [PATCH 09/27] v0.3.4 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1a4eec8..90466ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.3.3" +version = "0.3.4" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From fbd0b808ae5de714000db92830ff09e59d1706ef Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 12:51:04 +0200 Subject: [PATCH 10/27] rename api and improve examples --- Cargo.toml | 2 +- README.md | 55 +++++++++++++++++++++++++++--------------- src/lib.rs | 59 +++++++++++++++++++++++++++++----------------- src/stop_source.rs | 10 ++++---- tests/tests.rs | 8 +++---- 5 files changed, 84 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 90466ca..fe9ff47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,5 +25,5 @@ tokio = { version = "1.12.0", features = ["time"], optional = true } async-io = { version = "1.6.0", optional = true } [dev-dependencies] -async-std = "1.10.0" +async-std = { version = "1.10.0", features = ["attributes"] } tokio = { version = "1.12.0", features = ["rt", "macros"] } diff --git a/README.md b/README.md index 7be9e88..a15d423 100644 --- a/README.md +++ b/README.md @@ -47,39 +47,56 @@ You can use this crate to create a deadline received through a `StopToken`: ```rust use async_std::prelude::*; +use async_std::{stream, task}; + use stop_token::prelude::*; -use stop_token::StopToken; +use stop_token::StopSource; -struct Event; +use std::time::Duration; -async fn do_work(work: impl Stream + Unpin, stop: StopToken) { - let mut work = work.until(stop); - while let Some(Ok(event)) = work.next().await { - process_event(event).await - } -} +#[async_std::main] +async fn main() { + // Create a stop source and generate a token. + let src = StopSource::new(); + let stop = src.token(); + + // When stop source is dropped, the loop will stop. + // Move the source to a task, and drop it after 100 millis. + task::spawn(async move { + task::sleep(Duration::from_millis(100)).await; + drop(src); + }); + + // Create a stream that generates numbers until + // it receives a signal it needs to stop. + let mut work = stream::repeat(12u8).until(stop); -async fn process_event(_event: Event) { + // Loop over each item in the stream. + while let Some(Ok(ev)) = work.next().await { + println!("{}", ev); + } } ``` Or `Duration` or `Instant` to create a `time`-based deadline: ```rust -use std::time::Instant; use async_std::prelude::*; +use async_std::stream; + use stop_token::prelude::*; -use stop_token::StopToken; -struct Event; +use std::time::Duration; -async fn do_work(work: impl Stream + Unpin, until: Instant) { - let mut work = work.until(until); - while let Some(Ok(event)) = work.next().await { - process_event(event).await - } -} +#[async_std::main] +async fn main() { + // Create a stream that generates numbers for 100 millis. + let stop = Duration::from_millis(100); + let mut work = stream::repeat(12u8).until(stop); -async fn process_event(_event: Event) { + // Loop over each item in the stream. + while let Some(Ok(ev)) = work.next().await { + println!("{}", ev); + } } ``` diff --git a/src/lib.rs b/src/lib.rs index ce619ff..9abe5dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,40 +38,57 @@ //! //! ``` //! use async_std::prelude::*; -//! use stop_token::prelude::*; -//! use stop_token::StopToken; -//! -//! struct Event; +//! use async_std::{stream, task}; //! -//! async fn do_work(work: impl Stream + Unpin, stop: StopToken) { -//! let mut work = work.until(stop); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await +//! use stop_token::prelude::*; +//! use stop_token::StopSource; +//! +//! use std::time::Duration; +//! +//! #[async_std::main] +//! async fn main() { +//! // Create a stop source and generate a token. +//! let src = StopSource::new(); +//! let stop = src.token(); +//! +//! // When stop source is dropped, the loop will stop. +//! // Move the source to a task, and drop it after 100 millis. +//! task::spawn(async move { +//! task::sleep(Duration::from_millis(100)).await; +//! drop(src); +//! }); +//! +//! // Create a stream that generates numbers until +//! // it receives a signal it needs to stop. +//! let mut work = stream::repeat(12u8).until(stop); +//! +//! // Loop over each item in the stream. +//! while let Some(Ok(ev)) = work.next().await { +//! println!("{}", ev); //! } //! } -//! -//! async fn process_event(_event: Event) { -//! } //! ``` //! //! Or `Duration` or `Instant` to create a [`time`]-based deadline: //! //! ``` -//! use std::time::Instant; //! use async_std::prelude::*; +//! use async_std::stream; +//! //! use stop_token::prelude::*; -//! use stop_token::StopToken; //! -//! struct Event; +//! use std::time::Duration; //! -//! async fn do_work(work: impl Stream + Unpin, until: Instant) { -//! let mut work = work.until(until); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await -//! } -//! } +//! #[async_std::main] +//! async fn main() { +//! // Create a stream that generates numbers for 100 millis. +//! let stop = Duration::from_millis(100); +//! let mut work = stream::repeat(12u8).until(stop); //! -//! async fn process_event(_event: Event) { +//! // Loop over each item in the stream. +//! while let Some(Ok(ev)) = work.next().await { +//! println!("{}", ev); +//! } //! } //! ``` //! diff --git a/src/stop_source.rs b/src/stop_source.rs index 4476c4e..c46ade6 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -12,10 +12,10 @@ enum Never {} /// # Example: /// /// ```ignore -/// let stop_source = StopSource::new(); -/// let stop_token = stop_source.stop_token(); -/// schedule_some_work(stop_token); -/// drop(stop_source); // At this point, scheduled work notices that it is canceled. +/// let source = StopSource::new(); +/// let token = source.token(); +/// schedule_some_work(token); +/// drop(source); // At this point, scheduled work notices that it is canceled. /// ``` #[derive(Debug)] pub struct StopSource { @@ -50,7 +50,7 @@ impl StopSource { /// Produces a new `StopToken`, associated with this source. /// /// Once the source is destroyed, `StopToken` future completes. - pub fn stop_token(&self) -> StopToken { + pub fn token(&self) -> StopToken { self.stop_token.clone() } } diff --git a/tests/tests.rs b/tests/tests.rs index f302230..53cf9ac 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -12,13 +12,13 @@ use stop_token::StopSource; fn smoke() { task::block_on(async { let (sender, receiver) = bounded::(10); - let stop_source = StopSource::new(); + let source = StopSource::new(); let task = task::spawn({ - let stop_token = stop_source.stop_token(); + let token = source.token(); let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(stop_token); + let mut stream = receiver.until(token); while let Some(Ok(x)) = stream.next().await { xs.push(x) } @@ -30,7 +30,7 @@ fn smoke() { sender.send(3).await.unwrap(); task::sleep(Duration::from_millis(250)).await; - drop(stop_source); + drop(source); task::sleep(Duration::from_millis(250)).await; sender.send(4).await.unwrap(); From 492757042abb5d0c0ab9668d7a58a1008db50899 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 4 Oct 2021 12:58:55 +0200 Subject: [PATCH 11/27] 0.4.0 --- Cargo.toml | 2 +- src/lib.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe9ff47..22e72ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.3.4" +version = "0.4.0" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" diff --git a/src/lib.rs b/src/lib.rs index 9abe5dc..fff1ed8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,10 @@ //! //! # Usage //! -//! You can use this crate to create a deadline received through a [`StopToken`]: +//! You can use this crate to create a deadline received through a +//! [`StopToken`]. You can think of a `StopSource` + `StopToken` as a +//! single-producer, multi-consumer channel that receives a single message to +//! "stop" when the producer is dropped: //! //! ``` //! use async_std::prelude::*; @@ -72,6 +75,7 @@ //! Or `Duration` or `Instant` to create a [`time`]-based deadline: //! //! ``` +//! # #![allow(dead_code)] //! use async_std::prelude::*; //! use async_std::stream; //! @@ -79,6 +83,9 @@ //! //! use std::time::Duration; //! +//! # #[cfg(feature = "tokio")] +//! # fn main() {} +//! # #[cfg(not(feature = "tokio"))] //! #[async_std::main] //! async fn main() { //! // Create a stream that generates numbers for 100 millis. From 66981d4c625e85de4c9c01187dfef958efc2bc0e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 19:06:05 +0200 Subject: [PATCH 12/27] fix FutureExt impl --- src/future.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/future.rs b/src/future.rs index ef4729e..5df8cf3 100644 --- a/src/future.rs +++ b/src/future.rs @@ -22,6 +22,8 @@ pub trait FutureExt: Future { } } +impl FutureExt for F {} + pin_project! { /// Run a future until it resolves, or until a deadline is hit. /// From f700e35325f11154bed79c8fd7807bcfa4179481 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 19:06:16 +0200 Subject: [PATCH 13/27] v0.4.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 22e72ed..b3abbe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.4.0" +version = "0.4.1" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From 4a1a973846b5028d1225af8f10c79fd6c2821f13 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 19:23:13 +0200 Subject: [PATCH 14/27] Allow deadlines to be cloned --- src/deadline.rs | 2 +- src/time.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/deadline.rs b/src/deadline.rs index df80381..fbd8887 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -38,7 +38,7 @@ impl fmt::Display for TimedOutError { /// A deadline is a future which resolves after a certain period or event. pub trait IntoDeadline { /// Which kind of future are we turning this into? - type Deadline: Future; + type Deadline: Future + Clone; /// Creates a deadline from a value. fn into_deadline(self) -> Self::Deadline; diff --git a/src/time.rs b/src/time.rs index 5ae039e..7bb7522 100644 --- a/src/time.rs +++ b/src/time.rs @@ -38,6 +38,7 @@ mod asyncio { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; + use std::time::Instant; use crate::IntoDeadline; @@ -48,11 +49,21 @@ mod asyncio { #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] pub struct Deadline { + instant: Instant, #[pin] delay: Timer, } } + impl Clone for Deadline { + fn clone(&self) -> Self { + Self { + instant: self.instant, + delay: Timer::at(self.instant), + } + } + } + impl Future for Deadline { type Output = (); @@ -69,7 +80,9 @@ mod asyncio { type Deadline = Deadline; fn into_deadline(self) -> Self::Deadline { + let instant = Instant::now() + self; Deadline { + instant, delay: Timer::after(self), } } @@ -80,6 +93,7 @@ mod asyncio { fn into_deadline(self) -> Self::Deadline { Deadline { + instant: self, delay: Timer::at(self), } } @@ -102,9 +116,20 @@ mod tokiooo { #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] pub struct Deadline { + instant: TokioInstant, delay: Pin>>>, } + impl Clone for Deadline { + fn clone(&self) -> Self { + let instant = self.instant.clone(); + Self { + instant, + delay: Box::pin(timeout_at(instant, pending())), + } + } + } + impl Future for Deadline { type Output = (); @@ -120,7 +145,9 @@ mod tokiooo { type Deadline = Deadline; fn into_deadline(self) -> Self::Deadline { + let instant = std::time::Instant::now() + self; Deadline { + instant: instant.into(), delay: Box::pin(timeout(self, pending())), } } @@ -132,6 +159,7 @@ mod tokiooo { fn into_deadline(self) -> Self::Deadline { let instant = TokioInstant::from(self); Deadline { + instant, delay: Box::pin(timeout_at(instant, pending())), } } From f70322953ab2c141358a9e3d14b43d524247b518 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 19:28:28 +0200 Subject: [PATCH 15/27] create tokio / async-io submodules --- src/async_io.rs | 95 +++++++++++++++++++++++++++ src/lib.rs | 6 +- src/time.rs | 167 ------------------------------------------------ src/tokio.rs | 90 ++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 168 deletions(-) create mode 100644 src/async_io.rs delete mode 100644 src/time.rs create mode 100644 src/tokio.rs diff --git a/src/async_io.rs b/src/async_io.rs new file mode 100644 index 0000000..a805c85 --- /dev/null +++ b/src/async_io.rs @@ -0,0 +1,95 @@ +//! Create deadlines from `Duration` and `Instant` types. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, until: Instant) { +//! let mut work = work.until(until); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` + +use async_io::Timer; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use crate::IntoDeadline; + +use pin_project_lite::pin_project; + +pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + instant: Instant, + #[pin] + delay: Timer, + } +} + +impl Clone for Deadline { + fn clone(&self) -> Self { + Self { + instant: self.instant, + delay: Timer::at(self.instant), + } + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.delay.poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} + +impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + let instant = Instant::now() + self; + Deadline { + instant, + delay: Timer::after(self), + } + } +} + +impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + instant: self, + delay: Timer::at(self), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index fff1ed8..6d43632 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,7 +118,11 @@ pub mod future; pub mod stream; -pub mod time; + +#[cfg(any(feature = "async-io", feature = "docs"))] +pub mod async_io; +#[cfg(feature = "tokio")] +pub mod tokio; mod deadline; mod stop_source; diff --git a/src/time.rs b/src/time.rs deleted file mode 100644 index 7bb7522..0000000 --- a/src/time.rs +++ /dev/null @@ -1,167 +0,0 @@ -//! Create deadlines from `Duration` and `Instant` types. -//! -//! # Features -//! -//! This module is empty when no features are enabled. To implement deadlines -//! for `Instant` and `Duration` you can enable one of the following features: -//! -//! - `async-io`: use this when using the `async-std` or `smol` runtimes. -//! - `tokio`: use this when using the `tokio` runtime. -//! -//! # Examples -//! -//! ``` -//! use std::time::Instant; -//! use async_std::prelude::*; -//! use stop_token::prelude::*; -//! use stop_token::StopToken; -//! -//! struct Event; -//! -//! async fn do_work(work: impl Stream + Unpin, until: Instant) { -//! let mut work = work.until(until); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await -//! } -//! } -//! -//! async fn process_event(_event: Event) { -//! } -//! ``` - -#[cfg(feature = "async-io")] -pub use asyncio::*; - -#[cfg(any(feature = "async-io", feature = "docs"))] -mod asyncio { - use async_io::Timer; - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - use std::time::Instant; - - use crate::IntoDeadline; - - use pin_project_lite::pin_project; - - pin_project! { - /// A future that times out after a duration of time. - #[must_use = "Futures do nothing unless polled or .awaited"] - #[derive(Debug)] - pub struct Deadline { - instant: Instant, - #[pin] - delay: Timer, - } - } - - impl Clone for Deadline { - fn clone(&self) -> Self { - Self { - instant: self.instant, - delay: Timer::at(self.instant), - } - } - } - - impl Future for Deadline { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.delay.poll(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - } - } - } - - impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - let instant = Instant::now() + self; - Deadline { - instant, - delay: Timer::after(self), - } - } - } - - impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { - instant: self, - delay: Timer::at(self), - } - } - } -} - -#[cfg(feature = "tokio")] -pub use tokiooo::*; - -#[cfg(feature = "tokio")] -mod tokiooo { - use std::future::{pending, Future, Pending}; - use std::pin::Pin; - use std::task::{Context, Poll}; - use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; - - use crate::IntoDeadline; - - /// A future that times out after a duration of time. - #[must_use = "Futures do nothing unless polled or .awaited"] - #[derive(Debug)] - pub struct Deadline { - instant: TokioInstant, - delay: Pin>>>, - } - - impl Clone for Deadline { - fn clone(&self) -> Self { - let instant = self.instant.clone(); - Self { - instant, - delay: Box::pin(timeout_at(instant, pending())), - } - } - } - - impl Future for Deadline { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - } - } - } - - impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - let instant = std::time::Instant::now() + self; - Deadline { - instant: instant.into(), - delay: Box::pin(timeout(self, pending())), - } - } - } - - impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - let instant = TokioInstant::from(self); - Deadline { - instant, - delay: Box::pin(timeout_at(instant, pending())), - } - } - } -} diff --git a/src/tokio.rs b/src/tokio.rs new file mode 100644 index 0000000..86010c9 --- /dev/null +++ b/src/tokio.rs @@ -0,0 +1,90 @@ +//! Create deadlines from `Duration` and `Instant` types. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, until: Instant) { +//! let mut work = work.until(until); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` + +use std::future::{pending, Future, Pending}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; + +use crate::IntoDeadline; + +/// A future that times out after a duration of time. +#[must_use = "Futures do nothing unless polled or .awaited"] +#[derive(Debug)] +pub struct Deadline { + instant: TokioInstant, + delay: Pin>>>, +} + +impl Clone for Deadline { + fn clone(&self) -> Self { + let instant = self.instant.clone(); + Self { + instant, + delay: Box::pin(timeout_at(instant, pending())), + } + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} + +impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + let instant = std::time::Instant::now() + self; + Deadline { + instant: instant.into(), + delay: Box::pin(timeout(self, pending())), + } + } +} + +impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + let instant = TokioInstant::from(self); + Deadline { + instant, + delay: Box::pin(timeout_at(instant, pending())), + } + } +} From f835ffe944313e6521939262b7bddaf729d2d028 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 20:19:57 +0200 Subject: [PATCH 16/27] create a concrete Deadline type --- Cargo.toml | 2 ++ src/async_io.rs | 21 +++++++----- src/deadline.rs | 85 +++++++++++++++++++++++++++++++++++++++++++--- src/future.rs | 13 ++++--- src/lib.rs | 2 +- src/stop_source.rs | 10 +++--- src/stream.rs | 15 ++++---- src/tokio.rs | 47 +++++-------------------- tests/tests.rs | 3 +- 9 files changed, 125 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b3abbe8..6a315d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [features] docs = ["async-io"] +all = ["tokio", "async-io"] [dependencies] pin-project-lite = "0.2.0" @@ -23,6 +24,7 @@ async-channel = "1.6.1" futures-core = "0.3.17" tokio = { version = "1.12.0", features = ["time"], optional = true } async-io = { version = "1.6.0", optional = true } +cfg-if = "1.0.0" [dev-dependencies] async-std = { version = "1.10.0", features = ["attributes"] } diff --git a/src/async_io.rs b/src/async_io.rs index a805c85..ace9e66 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -43,7 +43,7 @@ pin_project! { /// A future that times out after a duration of time. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Deadline { + pub(crate) struct Deadline { instant: Instant, #[pin] delay: Timer, @@ -72,24 +72,27 @@ impl Future for Deadline { } impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { + fn into_deadline(self) -> crate::Deadline { let instant = Instant::now() + self; - Deadline { + + let deadline = Deadline { instant, delay: Timer::after(self), + }; + crate::Deadline { + kind: crate::deadline::DeadlineKind::AsyncIo { t: deadline }, } } } impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { + fn into_deadline(self) -> crate::Deadline { + let deadline = Deadline { instant: self, delay: Timer::at(self), + }; + crate::Deadline { + kind: crate::deadline::DeadlineKind::AsyncIo { t: deadline }, } } } diff --git a/src/deadline.rs b/src/deadline.rs index fbd8887..e0f6f04 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -1,5 +1,13 @@ use core::fmt; -use std::{error::Error, future::Future, io}; +use std::{ + error::Error, + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::StopToken; /// An error returned when a future times out. #[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] @@ -37,9 +45,76 @@ impl fmt::Display for TimedOutError { /// /// A deadline is a future which resolves after a certain period or event. pub trait IntoDeadline { - /// Which kind of future are we turning this into? - type Deadline: Future + Clone; - /// Creates a deadline from a value. - fn into_deadline(self) -> Self::Deadline; + fn into_deadline(self) -> Deadline; +} + +pin_project_lite::pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + #[pin] + pub(crate) kind: DeadlineKind, + } +} + +cfg_if::cfg_if! { + if #[cfg(all(feature = "tokio", feature = "async-io"))] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + Tokio{#[pin]t: crate::tokio::Deadline}, + AsyncIo{#[pin]t: crate::async_io::Deadline}, + } + } + } else if #[cfg(feature = "tokio")] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + Tokio{#[pin]t: crate::tokio::Deadline}, + } + } + } else if #[cfg(feature = "async-io")] { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + AsyncIo{#[pin]t: crate::async_io::Deadline}, + } + } + } else { + pin_project_lite::pin_project! { + #[project = DeadlineKindProj] + #[derive(Debug)] + pub(crate) enum DeadlineKind { + StopToken{ #[pin]t: StopToken}, + } + } + } +} + +impl IntoDeadline for Deadline { + fn into_deadline(self) -> Deadline { + self + } +} + +impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project().kind.project() { + DeadlineKindProj::StopToken { t } => t.poll(cx), + #[cfg(feature = "tokio")] + DeadlineKindProj::Tokio { t } => t.poll(cx), + #[cfg(feature = "async-io")] + DeadlineKindProj::AsyncIo { t } => t.poll(cx), + } + } } diff --git a/src/future.rs b/src/future.rs index 5df8cf3..3e7d1b1 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Future` trait. -use crate::{deadline::TimedOutError, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline, IntoDeadline}; use core::future::Future; use core::pin::Pin; @@ -10,10 +10,10 @@ use std::task::{Context, Poll}; /// Extend the `Future` trait with the `until` method. pub trait FutureExt: Future { /// Run a future until it resolves, or until a deadline is hit. - fn until(self, target: T) -> Stop + fn until(self, target: T) -> Stop where Self: Sized, - T: IntoDeadline, + T: IntoDeadline, { Stop { deadline: target.into_deadline(), @@ -30,18 +30,17 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Stop { + pub struct Stop { #[pin] future: F, #[pin] - deadline: D, + deadline: Deadline, } } -impl Future for Stop +impl Future for Stop where F: Future, - D: Future, { type Output = Result; diff --git a/src/lib.rs b/src/lib.rs index 6d43632..f027627 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,7 +127,7 @@ pub mod tokio; mod deadline; mod stop_source; -pub use deadline::{IntoDeadline, TimedOutError}; +pub use deadline::{Deadline, IntoDeadline, TimedOutError}; pub use stop_source::{StopSource, StopToken}; /// A prelude for `stop-token`. diff --git a/src/stop_source.rs b/src/stop_source.rs index c46ade6..ae19a27 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -5,6 +5,8 @@ use core::task::{Context, Poll}; use async_channel::{bounded, Receiver, Sender}; use futures_core::stream::Stream; +use crate::Deadline; + enum Never {} /// `StopSource` produces `StopToken` and cancels all of its tokens on drop. @@ -56,10 +58,10 @@ impl StopSource { } impl super::IntoDeadline for StopToken { - type Deadline = Self; - - fn into_deadline(self) -> Self::Deadline { - self + fn into_deadline(self) -> Deadline { + Deadline { + kind: crate::deadline::DeadlineKind::StopToken { t: self }, + } } } diff --git a/src/stream.rs b/src/stream.rs index 7cb4ea8..9e682d0 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::{deadline::TimedOutError, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline, IntoDeadline}; use core::future::Future; use core::pin::Pin; @@ -12,10 +12,10 @@ use std::task::{Context, Poll}; pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, target: T) -> Stop + fn until(self, target: T) -> Stop where Self: Sized, - T: IntoDeadline, + T: IntoDeadline, { Stop { stream: self, @@ -32,25 +32,24 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Stop { + pub struct Stop { #[pin] stream: S, #[pin] - deadline: D, + deadline: Deadline, } } -impl Stop { +impl Stop { /// Unwraps this `Stop` stream, returning the underlying stream. pub fn into_inner(self) -> S { self.stream } } -impl Stream for Stop +impl Stream for Stop where S: Stream, - D: Future, { type Item = Result; diff --git a/src/tokio.rs b/src/tokio.rs index 86010c9..0b74277 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -9,37 +9,18 @@ //! - `tokio`: use this when using the `tokio` runtime. //! //! # Examples -//! -//! ``` -//! use std::time::Instant; -//! use async_std::prelude::*; -//! use stop_token::prelude::*; -//! use stop_token::StopToken; -//! -//! struct Event; -//! -//! async fn do_work(work: impl Stream + Unpin, until: Instant) { -//! let mut work = work.until(until); -//! while let Some(Ok(event)) = work.next().await { -//! process_event(event).await -//! } -//! } -//! -//! async fn process_event(_event: Event) { -//! } -//! ``` use std::future::{pending, Future, Pending}; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; +use tokio::time::{timeout_at, Instant as TokioInstant, Timeout}; use crate::IntoDeadline; /// A future that times out after a duration of time. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] -pub struct Deadline { +pub(crate) struct Deadline { instant: TokioInstant, delay: Pin>>>, } @@ -65,26 +46,16 @@ impl Future for Deadline { } } -impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - let instant = std::time::Instant::now() + self; - Deadline { - instant: instant.into(), - delay: Box::pin(timeout(self, pending())), - } - } -} - -impl IntoDeadline for std::time::Instant { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { +impl IntoDeadline for tokio::time::Instant { + fn into_deadline(self) -> crate::Deadline { let instant = TokioInstant::from(self); - Deadline { + let deadline = Deadline { instant, delay: Box::pin(timeout_at(instant, pending())), + }; + + crate::Deadline { + kind: crate::deadline::DeadlineKind::Tokio { t: deadline }, } } } diff --git a/tests/tests.rs b/tests/tests.rs index 53cf9ac..6732021 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -72,12 +72,13 @@ fn async_io_time() { #[cfg(feature = "tokio")] #[tokio::test] async fn tokio_time() { + use tokio::time::Instant; let (sender, receiver) = bounded::(10); let task = tokio::task::spawn({ let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(Duration::from_millis(200)); + let mut stream = receiver.until(Instant::now() + Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) } From c046bc42a62797203a214cacf80c100a690c8b83 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 20:25:03 +0200 Subject: [PATCH 17/27] remove IntoDeadline --- src/async_io.rs | 10 ++++------ src/deadline.rs | 14 -------------- src/future.rs | 6 +++--- src/lib.rs | 4 ++-- src/stop_source.rs | 4 ++-- src/stream.rs | 6 +++--- src/tokio.rs | 6 ++---- 7 files changed, 16 insertions(+), 34 deletions(-) diff --git a/src/async_io.rs b/src/async_io.rs index ace9e66..92a9840 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -35,8 +35,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; -use crate::IntoDeadline; - use pin_project_lite::pin_project; pin_project! { @@ -71,8 +69,8 @@ impl Future for Deadline { } } -impl IntoDeadline for std::time::Duration { - fn into_deadline(self) -> crate::Deadline { +impl Into for std::time::Duration { + fn into(self) -> crate::Deadline { let instant = Instant::now() + self; let deadline = Deadline { @@ -85,8 +83,8 @@ impl IntoDeadline for std::time::Duration { } } -impl IntoDeadline for std::time::Instant { - fn into_deadline(self) -> crate::Deadline { +impl Into for std::time::Instant { + fn into(self) -> crate::Deadline { let deadline = Deadline { instant: self, delay: Timer::at(self), diff --git a/src/deadline.rs b/src/deadline.rs index e0f6f04..c7a21d1 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -41,14 +41,6 @@ impl fmt::Display for TimedOutError { } } -/// Conversion into a deadline. -/// -/// A deadline is a future which resolves after a certain period or event. -pub trait IntoDeadline { - /// Creates a deadline from a value. - fn into_deadline(self) -> Deadline; -} - pin_project_lite::pin_project! { /// A future that times out after a duration of time. #[must_use = "Futures do nothing unless polled or .awaited"] @@ -99,12 +91,6 @@ cfg_if::cfg_if! { } } -impl IntoDeadline for Deadline { - fn into_deadline(self) -> Deadline { - self - } -} - impl Future for Deadline { type Output = (); diff --git a/src/future.rs b/src/future.rs index 3e7d1b1..7595200 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Future` trait. -use crate::{deadline::TimedOutError, Deadline, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline}; use core::future::Future; use core::pin::Pin; @@ -13,10 +13,10 @@ pub trait FutureExt: Future { fn until(self, target: T) -> Stop where Self: Sized, - T: IntoDeadline, + T: Into, { Stop { - deadline: target.into_deadline(), + deadline: target.into(), future: self, } } diff --git a/src/lib.rs b/src/lib.rs index f027627..2108b66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,7 +101,7 @@ //! //! # Features //! -//! The `time` submodule is empty when no features are enabled. To implement [`IntoDeadline`] +//! The `time` submodule is empty when no features are enabled. To implement `Into` //! for `Instant` and `Duration` you can enable one of the following features: //! //! - `async-io`: for use with the `async-std` or `smol` runtimes. @@ -127,7 +127,7 @@ pub mod tokio; mod deadline; mod stop_source; -pub use deadline::{Deadline, IntoDeadline, TimedOutError}; +pub use deadline::{Deadline, TimedOutError}; pub use stop_source::{StopSource, StopToken}; /// A prelude for `stop-token`. diff --git a/src/stop_source.rs b/src/stop_source.rs index ae19a27..a119592 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -57,8 +57,8 @@ impl StopSource { } } -impl super::IntoDeadline for StopToken { - fn into_deadline(self) -> Deadline { +impl Into for StopToken { + fn into(self) -> Deadline { Deadline { kind: crate::deadline::DeadlineKind::StopToken { t: self }, } diff --git a/src/stream.rs b/src/stream.rs index 9e682d0..b09a983 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::{deadline::TimedOutError, Deadline, IntoDeadline}; +use crate::{deadline::TimedOutError, Deadline}; use core::future::Future; use core::pin::Pin; @@ -15,11 +15,11 @@ pub trait StreamExt: Stream { fn until(self, target: T) -> Stop where Self: Sized, - T: IntoDeadline, + T: Into, { Stop { stream: self, - deadline: target.into_deadline(), + deadline: target.into(), } } } diff --git a/src/tokio.rs b/src/tokio.rs index 0b74277..5444675 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -15,8 +15,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::time::{timeout_at, Instant as TokioInstant, Timeout}; -use crate::IntoDeadline; - /// A future that times out after a duration of time. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] @@ -46,8 +44,8 @@ impl Future for Deadline { } } -impl IntoDeadline for tokio::time::Instant { - fn into_deadline(self) -> crate::Deadline { +impl Into for tokio::time::Instant { + fn into(self) -> crate::Deadline { let instant = TokioInstant::from(self); let deadline = Deadline { instant, From 9c7223beea06d2bd739ebaf1ca91297f02c58c58 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 20:30:03 +0200 Subject: [PATCH 18/27] v0.5.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6a315d8..64cd784 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.4.1" +version = "0.5.0" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From c77337adc9cb0babc34bbeed738c55313bd2bd85 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 20:32:25 +0200 Subject: [PATCH 19/27] ' fix cloning' --- src/deadline.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/deadline.rs b/src/deadline.rs index c7a21d1..033abdc 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -55,7 +55,7 @@ cfg_if::cfg_if! { if #[cfg(all(feature = "tokio", feature = "async-io"))] { pin_project_lite::pin_project! { #[project = DeadlineKindProj] - #[derive(Debug)] + #[derive(Debug, Clone)] pub(crate) enum DeadlineKind { StopToken{ #[pin]t: StopToken}, Tokio{#[pin]t: crate::tokio::Deadline}, @@ -65,7 +65,7 @@ cfg_if::cfg_if! { } else if #[cfg(feature = "tokio")] { pin_project_lite::pin_project! { #[project = DeadlineKindProj] - #[derive(Debug)] + #[derive(Debug, Clone)] pub(crate) enum DeadlineKind { StopToken{ #[pin]t: StopToken}, Tokio{#[pin]t: crate::tokio::Deadline}, @@ -74,7 +74,7 @@ cfg_if::cfg_if! { } else if #[cfg(feature = "async-io")] { pin_project_lite::pin_project! { #[project = DeadlineKindProj] - #[derive(Debug)] + #[derive(Debug, Clone)] pub(crate) enum DeadlineKind { StopToken{ #[pin]t: StopToken}, AsyncIo{#[pin]t: crate::async_io::Deadline}, @@ -83,7 +83,7 @@ cfg_if::cfg_if! { } else { pin_project_lite::pin_project! { #[project = DeadlineKindProj] - #[derive(Debug)] + #[derive(Debug, Clone)] pub(crate) enum DeadlineKind { StopToken{ #[pin]t: StopToken}, } @@ -91,6 +91,14 @@ cfg_if::cfg_if! { } } +impl Clone for Deadline { + fn clone(&self) -> Self { + Self { + kind: self.kind.clone(), + } + } +} + impl Future for Deadline { type Output = (); From e21ee6b46d9eb1644304442a818275f6cc1342f1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 20:32:34 +0200 Subject: [PATCH 20/27] v0.5.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 64cd784..8cb19e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.5.0" +version = "0.5.1" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From c7d7c8ba922f989886a27267c56dcbb0e385fb98 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 16 Oct 2021 00:00:34 +0200 Subject: [PATCH 21/27] rename future names and add async-std task support --- Cargo.toml | 10 ++++++---- src/async_std.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ src/future.rs | 8 ++++---- src/lib.rs | 2 ++ src/stream.rs | 10 +++++----- 5 files changed, 68 insertions(+), 13 deletions(-) create mode 100644 src/async_std.rs diff --git a/Cargo.toml b/Cargo.toml index 8cb19e8..6ed7aaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,16 +15,18 @@ features = ["docs"] rustdoc-args = ["--cfg", "feature=\"docs\""] [features] +all = ["tokio", "async-io", "async-std"] docs = ["async-io"] -all = ["tokio", "async-io"] [dependencies] -pin-project-lite = "0.2.0" async-channel = "1.6.1" -futures-core = "0.3.17" -tokio = { version = "1.12.0", features = ["time"], optional = true } +async-global-executor = { version = "2.0.2", optional = true } async-io = { version = "1.6.0", optional = true } +async-std = { version = "1.10.0", optional = true } cfg-if = "1.0.0" +futures-core = "0.3.17" +pin-project-lite = "0.2.0" +tokio = { version = "1.12.0", features = ["time"], optional = true } [dev-dependencies] async-std = { version = "1.10.0", features = ["attributes"] } diff --git a/src/async_std.rs b/src/async_std.rs new file mode 100644 index 0000000..b3016b2 --- /dev/null +++ b/src/async_std.rs @@ -0,0 +1,51 @@ +use async_std::task::JoinHandle; + +/// Extend the `Task` type `until` method. +pub trait TaskExt { + /// Run a future until it resolves, or until a deadline is hit. + fn until(self, target: T) -> Until + where + Self: Sized, + T: Into, + { + Until { + deadline: target.into(), + join_handle: self, + } + } +} + +impl JoinHandleExt for JoinHandle {} + +pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Until { + #[pin] + futur_handlee: F, + #[pin] + deadline: Deadline, + } +} + +impl Future for Until +where + F: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(()) = this.deadline.poll(cx) { + let _fut = this.join_handle.cancel(); + return Poll::Ready(Err(TimedOutError::new())); + } + match this.join_handle.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(it) => Poll::Ready(Ok(it)), + } + } +} diff --git a/src/future.rs b/src/future.rs index 7595200..3201e0e 100644 --- a/src/future.rs +++ b/src/future.rs @@ -10,12 +10,12 @@ use std::task::{Context, Poll}; /// Extend the `Future` trait with the `until` method. pub trait FutureExt: Future { /// Run a future until it resolves, or until a deadline is hit. - fn until(self, target: T) -> Stop + fn until(self, target: T) -> Until where Self: Sized, T: Into, { - Stop { + Until { deadline: target.into(), future: self, } @@ -30,7 +30,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Stop { + pub struct Until { #[pin] future: F, #[pin] @@ -38,7 +38,7 @@ pin_project! { } } -impl Future for Stop +impl Future for Until where F: Future, { diff --git a/src/lib.rs b/src/lib.rs index 2108b66..a4cada5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,6 +121,8 @@ pub mod stream; #[cfg(any(feature = "async-io", feature = "docs"))] pub mod async_io; +#[cfg(feature = "async_std")] +pub mod async_std; #[cfg(feature = "tokio")] pub mod tokio; diff --git a/src/stream.rs b/src/stream.rs index b09a983..371bd26 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -12,12 +12,12 @@ use std::task::{Context, Poll}; pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, target: T) -> Stop + fn until(self, target: T) -> Until where Self: Sized, T: Into, { - Stop { + Until { stream: self, deadline: target.into(), } @@ -32,7 +32,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Stop { + pub struct Until { #[pin] stream: S, #[pin] @@ -40,14 +40,14 @@ pin_project! { } } -impl Stop { +impl Until { /// Unwraps this `Stop` stream, returning the underlying stream. pub fn into_inner(self) -> S { self.stream } } -impl Stream for Stop +impl Stream for Until where S: Stream, { From 809c7c278e57f29c987fd5d1298b36bbf3cbb885 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 16 Oct 2021 00:00:50 +0200 Subject: [PATCH 22/27] v0.6.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6ed7aaa..15172e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.5.1" +version = "0.6.0" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From 63042722d5c9cef501ba6a911003d6f427d49f80 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 28 Oct 2021 11:31:13 +0200 Subject: [PATCH 23/27] lower tokio version --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 15172e0..affea52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,8 @@ async-std = { version = "1.10.0", optional = true } cfg-if = "1.0.0" futures-core = "0.3.17" pin-project-lite = "0.2.0" -tokio = { version = "1.12.0", features = ["time"], optional = true } +tokio = { version = "1.9.0", features = ["time"], optional = true } [dev-dependencies] async-std = { version = "1.10.0", features = ["attributes"] } -tokio = { version = "1.12.0", features = ["rt", "macros"] } +tokio = { version = "1.9.0", features = ["rt", "macros"] } From 72609fe7153094a3d96839179921dd5e021cfc74 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 28 Oct 2021 11:31:26 +0200 Subject: [PATCH 24/27] 0.6.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index affea52..908d44e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.6.0" +version = "0.6.1" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0" From 639c14329e5ad29c9938e9abf1cf61ce58443512 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 19 Jan 2022 23:05:07 +0100 Subject: [PATCH 25/27] rename `until` to `timeout_at` --- README.md | 8 ++++---- src/async_io.rs | 4 ++-- src/async_std.rs | 8 ++++---- src/future.rs | 8 ++++---- src/lib.rs | 6 +++--- src/stream.rs | 10 +++++----- tests/tests.rs | 6 +++--- 7 files changed, 25 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index a15d423..7645ed3 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ use std::time::Duration; async fn main() { // Create a stop source and generate a token. let src = StopSource::new(); - let stop = src.token(); + let deadline = src.token(); // When stop source is dropped, the loop will stop. // Move the source to a task, and drop it after 100 millis. @@ -69,7 +69,7 @@ async fn main() { // Create a stream that generates numbers until // it receives a signal it needs to stop. - let mut work = stream::repeat(12u8).until(stop); + let mut work = stream::repeat(12u8).timeout_at(deadline); // Loop over each item in the stream. while let Some(Ok(ev)) = work.next().await { @@ -91,8 +91,8 @@ use std::time::Duration; #[async_std::main] async fn main() { // Create a stream that generates numbers for 100 millis. - let stop = Duration::from_millis(100); - let mut work = stream::repeat(12u8).until(stop); + let deadline = Duration::from_millis(100); + let mut work = stream::repeat(12u8).timeout_at(deadline); // Loop over each item in the stream. while let Some(Ok(ev)) = work.next().await { diff --git a/src/async_io.rs b/src/async_io.rs index 92a9840..66a3b09 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -18,8 +18,8 @@ //! //! struct Event; //! -//! async fn do_work(work: impl Stream + Unpin, until: Instant) { -//! let mut work = work.until(until); +//! async fn do_work(work: impl Stream + Unpin, deadline: Instant) { +//! let mut work = work.timeout_at(deadline); //! while let Some(Ok(event)) = work.next().await { //! process_event(event).await //! } diff --git a/src/async_std.rs b/src/async_std.rs index b3016b2..143f4e2 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -3,12 +3,12 @@ use async_std::task::JoinHandle; /// Extend the `Task` type `until` method. pub trait TaskExt { /// Run a future until it resolves, or until a deadline is hit. - fn until(self, target: T) -> Until + fn timeout_at(self, target: T) -> TimeoutAt where Self: Sized, T: Into, { - Until { + TimeoutAt { deadline: target.into(), join_handle: self, } @@ -23,7 +23,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Until { + pub struct TimeoutAt { #[pin] futur_handlee: F, #[pin] @@ -31,7 +31,7 @@ pin_project! { } } -impl Future for Until +impl Future for TimeoutAt where F: Future, { diff --git a/src/future.rs b/src/future.rs index 3201e0e..c03a396 100644 --- a/src/future.rs +++ b/src/future.rs @@ -10,12 +10,12 @@ use std::task::{Context, Poll}; /// Extend the `Future` trait with the `until` method. pub trait FutureExt: Future { /// Run a future until it resolves, or until a deadline is hit. - fn until(self, target: T) -> Until + fn timeout_at(self, target: T) -> TimeoutAt where Self: Sized, T: Into, { - Until { + TimeoutAt { deadline: target.into(), future: self, } @@ -30,7 +30,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Until { + pub struct TimeoutAt { #[pin] future: F, #[pin] @@ -38,7 +38,7 @@ pin_project! { } } -impl Future for Until +impl Future for TimeoutAt where F: Future, { diff --git a/src/lib.rs b/src/lib.rs index a4cada5..5bcbd29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ //! //! // Create a stream that generates numbers until //! // it receives a signal it needs to stop. -//! let mut work = stream::repeat(12u8).until(stop); +//! let mut work = stream::repeat(12u8).timeout_at(stop); //! //! // Loop over each item in the stream. //! while let Some(Ok(ev)) = work.next().await { @@ -72,7 +72,7 @@ //! } //! ``` //! -//! Or `Duration` or `Instant` to create a [`time`]-based deadline: +//! Or `Duration` or `Instant` to create a `time`-based deadline: //! //! ``` //! # #![allow(dead_code)] @@ -90,7 +90,7 @@ //! async fn main() { //! // Create a stream that generates numbers for 100 millis. //! let stop = Duration::from_millis(100); -//! let mut work = stream::repeat(12u8).until(stop); +//! let mut work = stream::repeat(12u8).timeout_at(stop); //! //! // Loop over each item in the stream. //! while let Some(Ok(ev)) = work.next().await { diff --git a/src/stream.rs b/src/stream.rs index 371bd26..f932d7d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -12,12 +12,12 @@ use std::task::{Context, Poll}; pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, target: T) -> Until + fn timeout_at(self, target: T) -> TimeoutAt where Self: Sized, T: Into, { - Until { + TimeoutAt { stream: self, deadline: target.into(), } @@ -32,7 +32,7 @@ pin_project! { /// This method is returned by [`FutureExt::deadline`]. #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct Until { + pub struct TimeoutAt { #[pin] stream: S, #[pin] @@ -40,14 +40,14 @@ pin_project! { } } -impl Until { +impl TimeoutAt { /// Unwraps this `Stop` stream, returning the underlying stream. pub fn into_inner(self) -> S { self.stream } } -impl Stream for Until +impl Stream for TimeoutAt where S: Stream, { diff --git a/tests/tests.rs b/tests/tests.rs index 6732021..19c8b03 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -18,7 +18,7 @@ fn smoke() { let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(token); + let mut stream = receiver.timeout_at(token); while let Some(Ok(x)) = stream.next().await { xs.push(x) } @@ -49,7 +49,7 @@ fn async_io_time() { let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(Duration::from_millis(200)); + let mut stream = receiver.timeout_at(Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) } @@ -78,7 +78,7 @@ async fn tokio_time() { let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.until(Instant::now() + Duration::from_millis(200)); + let mut stream = receiver.timeout_at(Instant::now() + Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) } From c10c7138cf1a520560425bc9dc961849e84202c7 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 19 Jan 2022 23:24:44 +0100 Subject: [PATCH 26/27] remove Duration-based deadline --- README.md | 6 +++--- src/async_io.rs | 18 ++---------------- src/lib.rs | 8 ++++---- src/tokio.rs | 2 +- tests/tests.rs | 3 ++- 5 files changed, 12 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 7645ed3..6610fea 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ async fn main() { } ``` -Or `Duration` or `Instant` to create a `time`-based deadline: +Or `Instant` to create a `time`-based deadline: ```rust use async_std::prelude::*; @@ -86,12 +86,12 @@ use async_std::stream; use stop_token::prelude::*; -use std::time::Duration; +use std::time::{Instant, Duration}; #[async_std::main] async fn main() { // Create a stream that generates numbers for 100 millis. - let deadline = Duration::from_millis(100); + let deadline = Instant::now() + Duration::from_millis(100); let mut work = stream::repeat(12u8).timeout_at(deadline); // Loop over each item in the stream. diff --git a/src/async_io.rs b/src/async_io.rs index 66a3b09..911dc59 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -1,9 +1,9 @@ -//! Create deadlines from `Duration` and `Instant` types. +//! Create deadlines from `Instant`. //! //! # Features //! //! This module is empty when no features are enabled. To implement deadlines -//! for `Instant` and `Duration` you can enable one of the following features: +//! for `Instant` you can enable one of the following features: //! //! - `async-io`: use this when using the `async-std` or `smol` runtimes. //! - `tokio`: use this when using the `tokio` runtime. @@ -69,20 +69,6 @@ impl Future for Deadline { } } -impl Into for std::time::Duration { - fn into(self) -> crate::Deadline { - let instant = Instant::now() + self; - - let deadline = Deadline { - instant, - delay: Timer::after(self), - }; - crate::Deadline { - kind: crate::deadline::DeadlineKind::AsyncIo { t: deadline }, - } - } -} - impl Into for std::time::Instant { fn into(self) -> crate::Deadline { let deadline = Deadline { diff --git a/src/lib.rs b/src/lib.rs index 5bcbd29..b51c483 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,7 +72,7 @@ //! } //! ``` //! -//! Or `Duration` or `Instant` to create a `time`-based deadline: +//! Or `Instant` to create a `time`-based deadline: //! //! ``` //! # #![allow(dead_code)] @@ -81,7 +81,7 @@ //! //! use stop_token::prelude::*; //! -//! use std::time::Duration; +//! use std::time::{Duration, Instant}; //! //! # #[cfg(feature = "tokio")] //! # fn main() {} @@ -89,7 +89,7 @@ //! #[async_std::main] //! async fn main() { //! // Create a stream that generates numbers for 100 millis. -//! let stop = Duration::from_millis(100); +//! let stop = Instant::now() + Duration::from_millis(100); //! let mut work = stream::repeat(12u8).timeout_at(stop); //! //! // Loop over each item in the stream. @@ -102,7 +102,7 @@ //! # Features //! //! The `time` submodule is empty when no features are enabled. To implement `Into` -//! for `Instant` and `Duration` you can enable one of the following features: +//! for `Instant` you can enable one of the following features: //! //! - `async-io`: for use with the `async-std` or `smol` runtimes. //! - `tokio`: for use with the `tokio` runtime. diff --git a/src/tokio.rs b/src/tokio.rs index 5444675..8724b8c 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -1,4 +1,4 @@ -//! Create deadlines from `Duration` and `Instant` types. +//! Create deadlines from `Instant`. //! //! # Features //! diff --git a/tests/tests.rs b/tests/tests.rs index 19c8b03..0a700db 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -43,13 +43,14 @@ fn smoke() { #[cfg(feature = "async-io")] #[test] fn async_io_time() { + use std::time::Instant; task::block_on(async { let (sender, receiver) = bounded::(10); let task = task::spawn({ let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = receiver.timeout_at(Duration::from_millis(200)); + let mut stream = receiver.timeout_at(Instant::now() + Duration::from_millis(200)); while let Some(Ok(x)) = stream.next().await { xs.push(x) } From 33a32c175f5d5141e6447381f126622406f295ac Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 19 Jan 2022 23:25:08 +0100 Subject: [PATCH 27/27] v0.7.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 908d44e..3f93db6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stop-token" -version = "0.6.1" +version = "0.7.0" authors = ["Aleksey Kladov ", "Yoshua Wuyts "] edition = "2018" license = "MIT OR Apache-2.0"