Skip to content

Commit

Permalink
added some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
mkawalec committed Sep 29, 2022
1 parent 9a4bfda commit c8c56df
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/deluge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use std::future::Future;

/// A stream of unevaluated futures eventually returning an element of the stream.
///
/// An executor such as `collect` or `collect_par` controls how these futures are evaluated.
/// If a `None` is returned for a given element of a collection, it means that
/// element was filtered out earlier in the processing chain and should be omitted.
///
/// If `None` is returned from the call to `next`, the Deluge has ran out of items to provide.
/// Calling `next` again will be unsafe and may lead to panics.
pub trait Deluge<'a> {
type Item: Send;
type Output: Future<Output = Option<Self::Item>> + 'a;
Expand Down
89 changes: 82 additions & 7 deletions src/deluge_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ use crate::ops::*;
impl<'a, T> DelugeExt<'a> for T where T: Deluge<'a> {}

pub trait DelugeExt<'a>: Deluge<'a> {
/// Transforms each element by applying an asynchronous function `f` to it
///
/// # Examples
///
/// ```
/// let result = deluge::iter([1, 2, 3, 4])
/// .map(|x| async move { x * 2 })
/// .collect::<Vec<usize>>(None)
/// .await;
///
/// assert_eq!(vec![2, 4, 6, 8], result);
/// ```
fn map<Fut, F>(self, f: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> Fut + Send + 'a,
Expand All @@ -16,6 +28,18 @@ pub trait DelugeExt<'a>: Deluge<'a> {
Map::new(self, f)
}

/// Leaves the elements for which `f` returns a promise evaluating to `true`.
///
/// # Examples
///
/// ```
/// let result = (0..10).into_deluge()
/// .filter(|x| async move { idx % 2 == 0 })
/// .collect::<Vec<usize>>()
/// .await;
///
/// assert_eq!(vec![0, 2, 4, 6, 8], result);
/// ```
fn filter<F>(self, f: F) -> Filter<Self, F>
where
for<'b> F: XFn<'b, &'b Self::Item, bool> + Send + 'b,
Expand All @@ -24,6 +48,30 @@ pub trait DelugeExt<'a>: Deluge<'a> {
Filter::new(self, f)
}

/// Concurrently accummulates values in the accummulator. The degree of concurrency
/// can either be unlimited (the default) or limited depending on the requirements.
///
/// # Examples
///
/// Unlimited concurrency:
///
/// ```
/// let result = (0..100).into_deluge()
/// .fold(None, 0, |acc, x| async move { acc + x })
/// .await;
///
/// assert_eq!(result, 4950);
/// ```
///
/// Concurrency limited to at most ten futures evaluated at once:
///
/// ```
/// let result = (0..100).into_deluge()
/// .fold(10, 0, |acc, x| async move { acc + x })
/// .await;
///
/// assert_eq!(result, 4950);
/// ```
fn fold<Acc, F, Fut>(
self,
concurrency: impl Into<Option<usize>>,
Expand All @@ -38,13 +86,21 @@ pub trait DelugeExt<'a>: Deluge<'a> {
Fold::new(self, concurrency, acc, f)
}

fn take(self, how_many: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, how_many)
}

/// Accummulates values in an accummulator with futures evaluated in parallel.
/// The number of workers spawned and concurrency for each worker can be controlled.
/// By default the number of workers equals the number of logical cpus
/// and concurrency for each worker is the total number of futures to evaluate
/// divided by the number of available workers.
///
/// # Examples
///
/// ```
/// let result = (0..100).into_deluge()
/// .fold_par(None, None, 0, |acc, x| async move { acc + x })
/// .await;
///
/// assert_eq!(result, 4950);
/// ```
#[cfg(feature = "parallel")]
fn fold_par<Acc, F, Fut>(
self,
Expand All @@ -61,6 +117,25 @@ pub trait DelugeExt<'a>: Deluge<'a> {
FoldPar::new(self, worker_count, worker_concurrency, acc, f)
}

/// Consumes at most `how_many` elements from the Deluge, ignoring the rest.
///
/// # Examples
///
/// ```
/// let result = (0..100).into_deluge()
/// .take(1)
/// .fold(None, 0, |acc, x| async move { acc + x })
/// .await;
///
/// assert_eq!(0, result);
/// ```
fn take(self, how_many: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, how_many)
}

fn collect<C>(self, concurrency: impl Into<Option<usize>>) -> Collect<'a, Self, C>
where
C: Default + Extend<Self::Item>,
Expand Down
14 changes: 6 additions & 8 deletions src/ops/collect_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,13 @@ impl<'a, Del: Deluge<'a>, C: Default> CollectPar<'a, Del, C> {
// 2. Each worker starts with worker_concurrency futures
// and steals from the central place as needed

// We need it to proove to the compilter that our worker body is a `FnOnce`
fn make_fn_once<T, F: FnOnce() -> T>(f: F) -> F {
f
}

// No need to provide initial work, the worker should pull
// it from `outstanding_futures` by itself
fn create_worker<'a, Del: Deluge<'a> + 'a>(
outstanding_futures: OutstandingFutures<'a, Del>,
completed_channel: mpsc::Sender<CompletedItem<'a, Del>>,
concurrency: NonZeroUsize,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
println!("Creating a worker");
Box::pin(async move {
println!("Worker is alive");
let mut evaluated_futures = FuturesUnordered::new();

let run_worker = make_fn_once(|| async {
Expand Down Expand Up @@ -261,3 +253,9 @@ where
}
}
}


// Useful for when we need to prove to the compiler that our worker body is a `FnOnce`
fn make_fn_once<T, F: FnOnce() -> T>(f: F) -> F {
f
}

0 comments on commit c8c56df

Please sign in to comment.