diff --git a/Cargo.toml b/Cargo.toml index 0873d99..71948b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "parallel-stream" -version = "2.1.1" +version = "2.1.3" license = "MIT OR Apache-2.0" repository = "https://github.com/async-rs/parallel-stream" documentation = "https://docs.rs/parallel-stream" @@ -16,7 +16,7 @@ authors = [ [features] [dependencies] -async-std = { version = "1.5.0", features = ["attributes", "unstable"] } -pin-project-lite = "0.1.4" +async-std = { version = "1.9.0", features = ["attributes", "unstable"] } +pin-project-lite = "0.2.0" [dev-dependencies] diff --git a/src/from_parallel_stream.rs b/src/from_parallel_stream.rs index fd26d73..46abac2 100644 --- a/src/from_parallel_stream.rs +++ b/src/from_parallel_stream.rs @@ -18,7 +18,7 @@ async fn is_send() { let v: Vec = vec![1, 2, 3, 4]; let stream = v.into_par_stream().map(|n| async move { n * n }); let mut res = Vec::from_par_stream(stream).await; - res.sort(); + res.sort_unstable(); assert_eq!(res, vec![1, 4, 9, 16]); }) .await; diff --git a/src/lib.rs b/src/lib.rs index 3e95b36..4bbf48d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ #![forbid(unsafe_code)] #![deny(missing_debug_implementations, nonstandard_style)] -#![warn(missing_docs, missing_doc_code_examples)] +#![warn(missing_docs)] mod from_parallel_stream; mod from_stream; diff --git a/src/par_stream/for_each.rs b/src/par_stream/for_each.rs index e971db1..d42cf75 100644 --- a/src/par_stream/for_each.rs +++ b/src/par_stream/for_each.rs @@ -1,5 +1,5 @@ +use async_std::channel::{self, Receiver, Sender}; use async_std::prelude::*; -use async_std::sync::{self, Receiver, Sender}; use async_std::task::{self, Context, Poll}; use std::pin::Pin; @@ -32,7 +32,7 @@ impl ForEach { { let exhausted = Arc::new(AtomicBool::new(false)); let ref_count = Arc::new(AtomicU64::new(0)); - let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1); + let (sender, receiver): (Sender<()>, Receiver<()>) = channel::bounded(1); let _limit = stream.get_limit(); // Initialize the return type here to prevent borrowing issues. @@ -57,7 +57,7 @@ impl ForEach { // Wake up the receiver if we know we're done. ref_count.fetch_sub(1, Ordering::SeqCst); if exhausted.load(Ordering::SeqCst) && ref_count.load(Ordering::SeqCst) == 0 { - sender.send(()).await; + sender.send(()).await.expect("message failed to send"); } }); } diff --git a/src/par_stream/map.rs b/src/par_stream/map.rs index e10ca00..abdddb1 100644 --- a/src/par_stream/map.rs +++ b/src/par_stream/map.rs @@ -1,6 +1,6 @@ // use async_std::prelude::*; +use async_std::channel::{self, Receiver}; use async_std::future::Future; -use async_std::sync::{self, Receiver}; use async_std::task; use std::pin::Pin; @@ -26,14 +26,14 @@ impl Map { F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, Fut: Future + Send, { - let (sender, receiver) = sync::channel(1); + let (sender, receiver) = channel::bounded(1); let limit = stream.get_limit(); task::spawn(async move { while let Some(item) = stream.next().await { let sender = sender.clone(); task::spawn(async move { let res = f(item).await; - sender.send(res).await; + sender.send(res).await.expect("message failed to send"); }); } }); diff --git a/src/vec.rs b/src/vec.rs index d0c6a77..8ed849f 100644 --- a/src/vec.rs +++ b/src/vec.rs @@ -94,7 +94,7 @@ async fn smoke() { while let Some(n) = stream.next().await { out.push(n); } - out.sort(); + out.sort_unstable(); assert_eq!(out, vec![1usize, 4, 9, 16]); }