Skip to content

Commit

Permalink
Added the ability to collect a stream of results
Browse files Browse the repository at this point in the history
  • Loading branch information
sunjay committed Sep 17, 2019
1 parent e6880e1 commit ad05101
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod stream;
pub mod sync;
pub mod task;
mod vec;
mod result;

#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]
Expand Down
43 changes: 43 additions & 0 deletions src/result/from_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::stream::{FromStream, IntoStream, Stream};

use std::pin::Pin;

impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
where V: FromStream<T> {
/// Takes each element in the stream: if it is an `Err`, no further
/// elements are taken, and the `Err` is returned. Should no `Err`
/// occur, a container with the values of each `Result` is returned.
#[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
{
let stream = stream.into_stream();

Pin::from(Box::new(async move {
pin_utils::pin_mut!(stream);

// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out: V = stream.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
},
}
}).collect().await;

match found_error {
Some(err) => Err(err),
None => Ok(out),
}
}))
}
}

9 changes: 9 additions & 0 deletions src/result/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! The Rust core error handling type
//!
//! This module provides the `Result<T, E>` type for returning and
//! propagating errors.
mod from_stream;

#[doc(inline)]
pub use std::result::Result;
16 changes: 16 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,22 @@ pub trait Stream {
/// let buf: Vec<u8> = s.collect().await;
///
/// assert_eq!(buf, vec![9; 3]);
///
/// // You can also collect streams of Result values
/// // into any collection that implements FromStream
/// let s = stream::repeat(Ok(9)).take(3);
/// // We are using Vec here, but other collections
/// // are supported as well
/// let buf: Result<Vec<u8>, ()> = s.collect().await;
///
/// assert_eq!(buf, Ok(vec![9; 3]));
///
/// // The stream will stop on the first Err and
/// // return that instead
/// let s = stream::repeat(Err(5)).take(3);
/// let buf: Result<Vec<u8>, u8> = s.collect().await;
///
/// assert_eq!(buf, Err(5));
/// #
/// # }) }
/// ```
Expand Down

0 comments on commit ad05101

Please sign in to comment.