From b2e016ebeda1997b5f2cb4971e81b0f24d9f12ef Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 17 Oct 2020 17:59:01 +0200 Subject: [PATCH 1/5] v2.1.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0873d99..4a1d974 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "parallel-stream" -version = "2.1.1" +version = "2.1.2" license = "MIT OR Apache-2.0" repository = "https://github.com/async-rs/parallel-stream" documentation = "https://docs.rs/parallel-stream" From 5e3146a8384766a5822b7435d9e1514108603fcf Mon Sep 17 00:00:00 2001 From: Alexandru Macovei Date: Thu, 21 Jan 2021 12:14:11 +0200 Subject: [PATCH 2/5] Update for async_std 1.9.0 (with pin-project-lite 0.2.0) Previously, the sync module in async_std was unstable in 1.5.0. After stabilization the API was changed, so parallel-stream would fail to build for projects which depend on newer versions of async_std. --- Cargo.toml | 4 ++-- src/par_stream/for_each.rs | 4 ++-- src/par_stream/map.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a1d974..a463aee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ authors = [ [features] [dependencies] -async-std = { version = "1.5.0", features = ["attributes", "unstable"] } -pin-project-lite = "0.1.4" +async-std = { version = "1.9.0", features = ["attributes", "unstable"] } +pin-project-lite = "0.2.0" [dev-dependencies] diff --git a/src/par_stream/for_each.rs b/src/par_stream/for_each.rs index e971db1..ef269f9 100644 --- a/src/par_stream/for_each.rs +++ b/src/par_stream/for_each.rs @@ -1,5 +1,5 @@ +use async_std::channel::{self, Receiver, Sender}; use async_std::prelude::*; -use async_std::sync::{self, Receiver, Sender}; use async_std::task::{self, Context, Poll}; use std::pin::Pin; @@ -32,7 +32,7 @@ impl ForEach { { let exhausted = Arc::new(AtomicBool::new(false)); let ref_count = Arc::new(AtomicU64::new(0)); - let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1); + let (sender, receiver): (Sender<()>, Receiver<()>) = channel::bounded(1); let _limit = stream.get_limit(); // Initialize the return type here to prevent borrowing issues. diff --git a/src/par_stream/map.rs b/src/par_stream/map.rs index e10ca00..9ecfb94 100644 --- a/src/par_stream/map.rs +++ b/src/par_stream/map.rs @@ -1,6 +1,6 @@ // use async_std::prelude::*; +use async_std::channel::{self, Receiver}; use async_std::future::Future; -use async_std::sync::{self, Receiver}; use async_std::task; use std::pin::Pin; @@ -26,7 +26,7 @@ impl Map { F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, Fut: Future + Send, { - let (sender, receiver) = sync::channel(1); + let (sender, receiver) = channel::bounded(1); let limit = stream.get_limit(); task::spawn(async move { while let Some(item) = stream.next().await { From 411ef7aab33ac62b3ec8581d0ddad0cb79232fa2 Mon Sep 17 00:00:00 2001 From: Alexandru Macovei Date: Thu, 21 Jan 2021 12:19:43 +0200 Subject: [PATCH 3/5] Apply some clippy lints: sort_unstable when T is usize --- src/from_parallel_stream.rs | 2 +- src/vec.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/from_parallel_stream.rs b/src/from_parallel_stream.rs index fd26d73..46abac2 100644 --- a/src/from_parallel_stream.rs +++ b/src/from_parallel_stream.rs @@ -18,7 +18,7 @@ async fn is_send() { let v: Vec = vec![1, 2, 3, 4]; let stream = v.into_par_stream().map(|n| async move { n * n }); let mut res = Vec::from_par_stream(stream).await; - res.sort(); + res.sort_unstable(); assert_eq!(res, vec![1, 4, 9, 16]); }) .await; diff --git a/src/vec.rs b/src/vec.rs index d0c6a77..8ed849f 100644 --- a/src/vec.rs +++ b/src/vec.rs @@ -94,7 +94,7 @@ async fn smoke() { while let Some(n) = stream.next().await { out.push(n); } - out.sort(); + out.sort_unstable(); assert_eq!(out, vec![1usize, 4, 9, 16]); } From bbc3270f763a5badabff702a2ad9a086ed19c78f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 22:53:48 +0200 Subject: [PATCH 4/5] fix --- src/lib.rs | 2 +- src/par_stream/for_each.rs | 2 +- src/par_stream/map.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3e95b36..4bbf48d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ #![forbid(unsafe_code)] #![deny(missing_debug_implementations, nonstandard_style)] -#![warn(missing_docs, missing_doc_code_examples)] +#![warn(missing_docs)] mod from_parallel_stream; mod from_stream; diff --git a/src/par_stream/for_each.rs b/src/par_stream/for_each.rs index ef269f9..d42cf75 100644 --- a/src/par_stream/for_each.rs +++ b/src/par_stream/for_each.rs @@ -57,7 +57,7 @@ impl ForEach { // Wake up the receiver if we know we're done. ref_count.fetch_sub(1, Ordering::SeqCst); if exhausted.load(Ordering::SeqCst) && ref_count.load(Ordering::SeqCst) == 0 { - sender.send(()).await; + sender.send(()).await.expect("message failed to send"); } }); } diff --git a/src/par_stream/map.rs b/src/par_stream/map.rs index 9ecfb94..abdddb1 100644 --- a/src/par_stream/map.rs +++ b/src/par_stream/map.rs @@ -33,7 +33,7 @@ impl Map { let sender = sender.clone(); task::spawn(async move { let res = f(item).await; - sender.send(res).await; + sender.send(res).await.expect("message failed to send"); }); } }); From 0c52d2579931fe7c4f3485811e7a726b46e69f13 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Oct 2021 22:53:57 +0200 Subject: [PATCH 5/5] v2.1.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a463aee..71948b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "parallel-stream" -version = "2.1.2" +version = "2.1.3" license = "MIT OR Apache-2.0" repository = "https://github.com/async-rs/parallel-stream" documentation = "https://docs.rs/parallel-stream"