Skip to content

Commit 0ceb177

Browse files
bors[bot]yuyuyurekacuviper
authoredFeb 23, 2023
1022: Add TakeAny and SkipAny r=cuviper a=cuviper The existing `take` and `skip` are only part of `IndexedParallelIterator`, while these new adaptors will take/skip from anywhere in a `ParallelIterator`. This PR extends the documentation from rayon-rs#1012. Co-authored-by: Yureka <[email protected]> Co-authored-by: Josh Stone <[email protected]>
2 parents 76a3030 + 8309133 commit 0ceb177

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed
 

‎src/iter/mod.rs

+54
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,12 @@ mod reduce;
141141
mod repeat;
142142
mod rev;
143143
mod skip;
144+
mod skip_any;
144145
mod splitter;
145146
mod step_by;
146147
mod sum;
147148
mod take;
149+
mod take_any;
148150
mod try_fold;
149151
mod try_reduce;
150152
mod try_reduce_with;
@@ -185,9 +187,11 @@ pub use self::{
185187
repeat::{repeat, repeatn, Repeat, RepeatN},
186188
rev::Rev,
187189
skip::Skip,
190+
skip_any::SkipAny,
188191
splitter::{split, Split},
189192
step_by::StepBy,
190193
take::Take,
194+
take_any::TakeAny,
191195
try_fold::{TryFold, TryFoldWith},
192196
update::Update,
193197
while_some::WhileSome,
@@ -2194,6 +2198,56 @@ pub trait ParallelIterator: Sized + Send {
21942198
Intersperse::new(self, element)
21952199
}
21962200

2201+
/// Creates an iterator that yields `n` elements from *anywhere* in the original iterator.
2202+
///
2203+
/// This is similar to [`IndexedParallelIterator::take`] without being
2204+
/// constrained to the "first" `n` of the original iterator order. The
2205+
/// taken items will still maintain their relative order where that is
2206+
/// visible in `collect`, `reduce`, and similar outputs.
2207+
///
2208+
/// # Examples
2209+
///
2210+
/// ```
2211+
/// use rayon::prelude::*;
2212+
///
2213+
/// let result: Vec<_> = (0..100)
2214+
/// .into_par_iter()
2215+
/// .filter(|&x| x % 2 == 0)
2216+
/// .take_any(5)
2217+
/// .collect();
2218+
///
2219+
/// assert_eq!(result.len(), 5);
2220+
/// assert!(result.windows(2).all(|w| w[0] < w[1]));
2221+
/// ```
2222+
fn take_any(self, n: usize) -> TakeAny<Self> {
2223+
TakeAny::new(self, n)
2224+
}
2225+
2226+
/// Creates an iterator that skips `n` elements from *anywhere* in the original iterator.
2227+
///
2228+
/// This is similar to [`IndexedParallelIterator::skip`] without being
2229+
/// constrained to the "first" `n` of the original iterator order. The
2230+
/// remaining items will still maintain their relative order where that is
2231+
/// visible in `collect`, `reduce`, and similar outputs.
2232+
///
2233+
/// # Examples
2234+
///
2235+
/// ```
2236+
/// use rayon::prelude::*;
2237+
///
2238+
/// let result: Vec<_> = (0..100)
2239+
/// .into_par_iter()
2240+
/// .filter(|&x| x % 2 == 0)
2241+
/// .skip_any(5)
2242+
/// .collect();
2243+
///
2244+
/// assert_eq!(result.len(), 45);
2245+
/// assert!(result.windows(2).all(|w| w[0] < w[1]));
2246+
/// ```
2247+
fn skip_any(self, n: usize) -> SkipAny<Self> {
2248+
SkipAny::new(self, n)
2249+
}
2250+
21972251
/// Internal method used to define the behavior of this parallel
21982252
/// iterator. You should not need to call this directly.
21992253
///

‎src/iter/skip_any.rs

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use super::plumbing::*;
2+
use super::*;
3+
use std::sync::atomic::{AtomicUsize, Ordering};
4+
5+
/// `SkipAny` is an iterator that skips over `n` elements from anywhere in `I`.
6+
/// This struct is created by the [`skip_any()`] method on [`ParallelIterator`]
7+
///
8+
/// [`skip_any()`]: trait.ParallelIterator.html#method.skip_any
9+
/// [`ParallelIterator`]: trait.ParallelIterator.html
10+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11+
#[derive(Debug)]
12+
pub struct SkipAny<I: ParallelIterator> {
13+
base: I,
14+
count: AtomicUsize,
15+
}
16+
17+
impl<I> SkipAny<I>
18+
where
19+
I: ParallelIterator,
20+
{
21+
/// Creates a new `SkipAny` iterator.
22+
pub(super) fn new(base: I, count: usize) -> Self {
23+
SkipAny {
24+
base,
25+
count: AtomicUsize::new(count),
26+
}
27+
}
28+
}
29+
30+
impl<I, T> ParallelIterator for SkipAny<I>
31+
where
32+
I: ParallelIterator<Item = T>,
33+
T: Send,
34+
{
35+
type Item = T;
36+
37+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
38+
where
39+
C: UnindexedConsumer<Self::Item>,
40+
{
41+
let consumer1 = SkipAnyConsumer {
42+
base: consumer,
43+
count: &self.count,
44+
};
45+
self.base.drive_unindexed(consumer1)
46+
}
47+
}
48+
49+
/// ////////////////////////////////////////////////////////////////////////
50+
/// Consumer implementation
51+
52+
struct SkipAnyConsumer<'f, C> {
53+
base: C,
54+
count: &'f AtomicUsize,
55+
}
56+
57+
impl<'f, T, C> Consumer<T> for SkipAnyConsumer<'f, C>
58+
where
59+
C: Consumer<T>,
60+
T: Send,
61+
{
62+
type Folder = SkipAnyFolder<'f, C::Folder>;
63+
type Reducer = C::Reducer;
64+
type Result = C::Result;
65+
66+
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
67+
let (left, right, reducer) = self.base.split_at(index);
68+
(
69+
SkipAnyConsumer { base: left, ..self },
70+
SkipAnyConsumer {
71+
base: right,
72+
..self
73+
},
74+
reducer,
75+
)
76+
}
77+
78+
fn into_folder(self) -> Self::Folder {
79+
SkipAnyFolder {
80+
base: self.base.into_folder(),
81+
count: self.count,
82+
}
83+
}
84+
85+
fn full(&self) -> bool {
86+
self.base.full()
87+
}
88+
}
89+
90+
impl<'f, T, C> UnindexedConsumer<T> for SkipAnyConsumer<'f, C>
91+
where
92+
C: UnindexedConsumer<T>,
93+
T: Send,
94+
{
95+
fn split_off_left(&self) -> Self {
96+
SkipAnyConsumer {
97+
base: self.base.split_off_left(),
98+
..*self
99+
}
100+
}
101+
102+
fn to_reducer(&self) -> Self::Reducer {
103+
self.base.to_reducer()
104+
}
105+
}
106+
107+
struct SkipAnyFolder<'f, C> {
108+
base: C,
109+
count: &'f AtomicUsize,
110+
}
111+
112+
fn checked_decrement(u: &AtomicUsize) -> bool {
113+
u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
114+
.is_ok()
115+
}
116+
117+
impl<'f, T, C> Folder<T> for SkipAnyFolder<'f, C>
118+
where
119+
C: Folder<T>,
120+
{
121+
type Result = C::Result;
122+
123+
fn consume(mut self, item: T) -> Self {
124+
if !checked_decrement(self.count) {
125+
self.base = self.base.consume(item);
126+
}
127+
self
128+
}
129+
130+
fn consume_iter<I>(mut self, iter: I) -> Self
131+
where
132+
I: IntoIterator<Item = T>,
133+
{
134+
self.base = self.base.consume_iter(
135+
iter.into_iter()
136+
.skip_while(move |_| checked_decrement(self.count)),
137+
);
138+
self
139+
}
140+
141+
fn complete(self) -> C::Result {
142+
self.base.complete()
143+
}
144+
145+
fn full(&self) -> bool {
146+
self.base.full()
147+
}
148+
}

‎src/iter/take_any.rs

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use super::plumbing::*;
2+
use super::*;
3+
use std::sync::atomic::{AtomicUsize, Ordering};
4+
5+
/// `TakeAny` is an iterator that iterates over `n` elements from anywhere in `I`.
6+
/// This struct is created by the [`take_any()`] method on [`ParallelIterator`]
7+
///
8+
/// [`take_any()`]: trait.ParallelIterator.html#method.take_any
9+
/// [`ParallelIterator`]: trait.ParallelIterator.html
10+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11+
#[derive(Debug)]
12+
pub struct TakeAny<I: ParallelIterator> {
13+
base: I,
14+
count: AtomicUsize,
15+
}
16+
17+
impl<I> TakeAny<I>
18+
where
19+
I: ParallelIterator,
20+
{
21+
/// Creates a new `TakeAny` iterator.
22+
pub(super) fn new(base: I, count: usize) -> Self {
23+
TakeAny {
24+
base,
25+
count: AtomicUsize::new(count),
26+
}
27+
}
28+
}
29+
30+
impl<I, T> ParallelIterator for TakeAny<I>
31+
where
32+
I: ParallelIterator<Item = T>,
33+
T: Send,
34+
{
35+
type Item = T;
36+
37+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
38+
where
39+
C: UnindexedConsumer<Self::Item>,
40+
{
41+
let consumer1 = TakeAnyConsumer {
42+
base: consumer,
43+
count: &self.count,
44+
};
45+
self.base.drive_unindexed(consumer1)
46+
}
47+
}
48+
49+
/// ////////////////////////////////////////////////////////////////////////
50+
/// Consumer implementation
51+
52+
struct TakeAnyConsumer<'f, C> {
53+
base: C,
54+
count: &'f AtomicUsize,
55+
}
56+
57+
impl<'f, T, C> Consumer<T> for TakeAnyConsumer<'f, C>
58+
where
59+
C: Consumer<T>,
60+
T: Send,
61+
{
62+
type Folder = TakeAnyFolder<'f, C::Folder>;
63+
type Reducer = C::Reducer;
64+
type Result = C::Result;
65+
66+
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
67+
let (left, right, reducer) = self.base.split_at(index);
68+
(
69+
TakeAnyConsumer { base: left, ..self },
70+
TakeAnyConsumer {
71+
base: right,
72+
..self
73+
},
74+
reducer,
75+
)
76+
}
77+
78+
fn into_folder(self) -> Self::Folder {
79+
TakeAnyFolder {
80+
base: self.base.into_folder(),
81+
count: self.count,
82+
}
83+
}
84+
85+
fn full(&self) -> bool {
86+
self.count.load(Ordering::Relaxed) == 0 || self.base.full()
87+
}
88+
}
89+
90+
impl<'f, T, C> UnindexedConsumer<T> for TakeAnyConsumer<'f, C>
91+
where
92+
C: UnindexedConsumer<T>,
93+
T: Send,
94+
{
95+
fn split_off_left(&self) -> Self {
96+
TakeAnyConsumer {
97+
base: self.base.split_off_left(),
98+
..*self
99+
}
100+
}
101+
102+
fn to_reducer(&self) -> Self::Reducer {
103+
self.base.to_reducer()
104+
}
105+
}
106+
107+
struct TakeAnyFolder<'f, C> {
108+
base: C,
109+
count: &'f AtomicUsize,
110+
}
111+
112+
fn checked_decrement(u: &AtomicUsize) -> bool {
113+
u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
114+
.is_ok()
115+
}
116+
117+
impl<'f, T, C> Folder<T> for TakeAnyFolder<'f, C>
118+
where
119+
C: Folder<T>,
120+
{
121+
type Result = C::Result;
122+
123+
fn consume(mut self, item: T) -> Self {
124+
if checked_decrement(self.count) {
125+
self.base = self.base.consume(item);
126+
}
127+
self
128+
}
129+
130+
fn consume_iter<I>(mut self, iter: I) -> Self
131+
where
132+
I: IntoIterator<Item = T>,
133+
{
134+
self.base = self.base.consume_iter(
135+
iter.into_iter()
136+
.take_while(move |_| checked_decrement(self.count)),
137+
);
138+
self
139+
}
140+
141+
fn complete(self) -> C::Result {
142+
self.base.complete()
143+
}
144+
145+
fn full(&self) -> bool {
146+
self.count.load(Ordering::Relaxed) == 0 || self.base.full()
147+
}
148+
}

0 commit comments

Comments
 (0)
Please sign in to comment.