Skip to content

Commit

Permalink
Merge async-rs#125
Browse files Browse the repository at this point in the history
125: from/into stream r=yoshuawuyts a=yoshuawuyts

This adds `Stream` counterparts to `FromIterator`, `IntoIterator` and `Iterator::collect`, allowing to use the same patterns that are common in streams. Thanks!

## Tasks
- [x]  `FromStream`
- [x] `IntoStream`
- [x] `Stream::collect`

## Screenshot
![Screenshot_2019-08-29 async_std stream - Rust](https://user-images.githubusercontent.com/2467194/63928985-ec2bd200-ca50-11e9-868c-9899800e5b83.png)

Co-authored-by: Yoshua Wuyts <[email protected]>
  • Loading branch information
bors[bot] and yoshuawuyts authored Sep 17, 2019
2 parents 60a62f9 + 98927a7 commit e6880e1
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod prelude;
pub mod stream;
pub mod sync;
pub mod task;
mod vec;

#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]
Expand Down
29 changes: 29 additions & 0 deletions src/stream/from_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::IntoStream;

use std::pin::Pin;

/// Conversion from a `Stream`.
///
/// By implementing `FromStream` for a type, you define how it will be created from a stream.
/// This is common for types which describe a collection of some kind.
///
/// See also: [`IntoStream`].
///
/// [`IntoStream`]: trait.IntoStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait FromStream<T: Send> {
/// Creates a value from a stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// // use async_std::stream::FromStream;
///
/// // let _five_fives = async_std::stream::repeat(5).take(5);
/// ```
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>;
}
36 changes: 36 additions & 0 deletions src/stream/into_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use futures_core::stream::Stream;

/// Conversion into a `Stream`.
///
/// By implementing `IntoIterator` for a type, you define how it will be
/// converted to an iterator. This is common for types which describe a
/// collection of some kind.
///
/// [`from_stream`]: #tymethod.from_stream
/// [`Stream`]: trait.Stream.html
/// [`collect`]: trait.Stream.html#method.collect
///
/// See also: [`FromStream`].
///
/// [`FromStream`]: trait.FromStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait IntoStream {
/// The type of the elements being iterated over.
type Item;

/// Which kind of stream are we turning this into?
type IntoStream: Stream<Item = Self::Item> + Send;

/// Creates a stream from a value.
fn into_stream(self) -> Self::IntoStream;
}

impl<I: Stream + Send> IntoStream for I {
type Item = I::Item;
type IntoStream = I;

#[inline]
fn into_stream(self) -> I {
self
}
}
4 changes: 4 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
pub use double_ended_stream::DoubleEndedStream;
pub use empty::{empty, Empty};
pub use from_stream::FromStream;
pub use into_stream::IntoStream;
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Scan, Stream, Take, Zip};

mod double_ended_stream;
mod empty;
mod from_stream;
mod into_stream;
mod once;
mod repeat;
mod stream;
63 changes: 60 additions & 3 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ use min_by::MinByFuture;
use next::NextFuture;
use nth::NthFuture;

use super::from_stream::FromStream;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use cfg_if::cfg_if;

use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
Expand All @@ -80,6 +80,21 @@ cfg_if! {
}
}

cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);

macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
}
} else {
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + Send + 'a>>)
}
}
}

/// An asynchronous stream of values.
///
/// This trait is an async version of [`std::iter::Iterator`].
Expand Down Expand Up @@ -536,7 +551,6 @@ pub trait Stream {
///
/// let mut s = stream::repeat::<u32>(42).take(3);
/// assert!(s.any(|x| x == 42).await);
///
/// #
/// # }) }
/// ```
Expand Down Expand Up @@ -652,6 +666,49 @@ pub trait Stream {
{
Zip::new(self, other)
}

/// Transforms a stream into a collection.
///
/// `collect()` can take anything streamable, and turn it into a relevant
/// collection. This is one of the more powerful methods in the async
/// standard library, used in a variety of contexts.
///
/// The most basic pattern in which `collect()` is used is to turn one
/// collection into another. You take a collection, call [`stream`] on it,
/// do a bunch of transformations, and then `collect()` at the end.
///
/// Because `collect()` is so general, it can cause problems with type
/// inference. As such, `collect()` is one of the few times you'll see
/// the syntax affectionately known as the 'turbofish': `::<>`. This
/// helps the inference algorithm understand specifically which collection
/// you're trying to collect into.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat(9u8).take(3);
/// let buf: Vec<u8> = s.collect().await;
///
/// assert_eq!(buf, vec![9; 3]);
/// #
/// # }) }
/// ```
///
/// [`stream`]: trait.Stream.html#tymethod.next
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> dyn_ret!('a, B)
where
Self: futures_core::stream::Stream + Sized + Send + 'a,
<Self as futures_core::stream::Stream>::Item: Send,
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
{
FromStream::from_stream(self)
}
}

impl<T: futures_core::stream::Stream + ?Sized> Stream for T {
Expand Down
25 changes: 25 additions & 0 deletions src/vec/from_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::stream::{FromStream, IntoStream, Stream};

use std::pin::Pin;

impl<T: Send> FromStream<T> for Vec<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
{
let stream = stream.into_stream();

Pin::from(Box::new(async move {
pin_utils::pin_mut!(stream);

let mut out = vec![];
while let Some(item) = stream.next().await {
out.push(item);
}
out
}))
}
}
9 changes: 9 additions & 0 deletions src/vec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! The Rust core allocation and collections library
//!
//! This library provides smart pointers and collections for managing
//! heap-allocated values.
mod from_stream;

#[doc(inline)]
pub use std::vec::Vec;

0 comments on commit e6880e1

Please sign in to comment.