future-util: add max_concurrent_for_each
And a corresponding unit test.

max_concurrent_for_each is similar to parallel_for_each,
just limiting parallelism to `max_concurrent` parallel
invocations of `func`.

Test: futures(dev)
Signed-off-by: Benny Halevy <[email protected]>
Message-Id: <[email protected]>
bhalevy authored and avikivity committed Sep 13, 2020
1 parent d57e66c commit 8933f76
Showing 3 changed files with 207 additions and 2 deletions.
23 changes: 21 additions & 2 deletions doc/tutorial.md
@@ -839,10 +839,9 @@ seastar::future<> do_for_all(std::vector<int> numbers) {
});
}
```

## parallel_for_each
Parallel for each is a high concurrency variant of `do_for_each`. When using `parallel_for_each`, all iterations are queued simultaneously - which means that there's no guarantee in which order they finish their operations.
```cpp
seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files) {
return seastar::parallel_for_each(files, [] (seastar::file f) {
@@ -852,11 +851,31 @@ seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::fi
}
```
`parallel_for_each` is a powerful tool, as it allows spawning many tasks in parallel. It can be a great performance gain, but there are also caveats. First of all, too high concurrency may be troublesome - the details can be found in chapter **Limiting parallelism of loops**.

To cap the concurrency of `parallel_for_each` at a fixed number of iterations, use `max_concurrent_for_each`, described below.
More details about dealing with parallelism can be found in chapter **Limiting parallelism of loops**.

Secondly, take note that the order in which iterations will be executed within a `parallel_for_each` loop is arbitrary - if a strict ordering is needed, consider using `do_for_each` instead.

TODO: map_reduce, as a shortcut (?) for parallel_for_each which needs to produce some results (e.g., logical_or of boolean results), so we don't need to create a lw_shared_ptr explicitly (or do_with).

TODO: See seastar commit "input_stream: Fix possible infinite recursion in consume()" for an example on why recursion is a possible, but bad, replacement for repeat(). See also my comment on https://groups.google.com/d/msg/seastar-dev/CUkLVBwva3Y/3DKGw-9aAQAJ on why Seastar's iteration primitives should be used over tail call optimization.

## max_concurrent_for_each
`max_concurrent_for_each` is a variant of `parallel_for_each` with restricted parallelism.
It accepts an additional parameter, `max_concurrent`: up to `max_concurrent` iterations are queued simultaneously, with no guarantee of the order in which they finish.

```cpp
seastar::future<> flush_all_files(seastar::lw_shared_ptr<std::vector<seastar::file>> files, size_t max_concurrent) {
    return seastar::max_concurrent_for_each(files, max_concurrent, [] (seastar::file f) {
        return f.flush();
    });
}
```
Determining the maximum concurrency limit is outside the scope of this document.
It should typically be derived from the actual capabilities of the system the software is running on, such as the number of parallel execution units or I/O channels, so as to optimize resource utilization without overwhelming the system.
# when_all: Waiting for multiple futures
Above we've seen `parallel_for_each()`, which starts a number of asynchronous operations, and then waits for all to complete. Seastar has another idiom, `when_all()`, for waiting for several already-existing futures to complete.
106 changes: 106 additions & 0 deletions include/seastar/core/loop.hh
@@ -29,6 +29,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/task.hh>
#include <seastar/util/bool_class.hh>
#include <seastar/core/semaphore.hh>

namespace seastar {

@@ -604,6 +605,111 @@ parallel_for_each(Range&& range, Func&& func) noexcept {
return futurize_invoke(impl, std::forward<Range>(range), std::forward<Func>(func));
}

/// Run a maximum of \c max_concurrent tasks in parallel (iterator version).
///
/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
/// the range, and return a future<> that resolves when all the functions
/// complete. \c func should return a future<> that indicates when it is
/// complete. Up to \c max_concurrent invocations are performed in parallel.
/// This does not allow the range to refer to stack objects. The caller
/// must ensure that the range outlives the call to max_concurrent_for_each
/// so it can be iterated in the background.
///
/// \param begin an \c InputIterator designating the beginning of the range
/// \param end an \c InputIterator designating the end of the range
/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
/// \param func Function to invoke with each element in the range (returning
/// a \c future<>)
/// \return a \c future<> that resolves when all the function invocations
/// complete. If one or more return an exception, the return value
/// contains one of the exceptions.
template <typename Iterator, typename Func>
SEASTAR_CONCEPT( requires requires (Func f, Iterator i) { { f(*i++) } -> std::same_as<future<>>; } )
inline
future<>
max_concurrent_for_each(Iterator begin, Iterator end, size_t max_concurrent, Func&& func) noexcept {
    struct state {
        Iterator begin;
        Iterator end;
        Func func;
        size_t max_concurrent;
        semaphore sem;
        std::exception_ptr err;

        state(Iterator begin_, Iterator end_, size_t max_concurrent_, Func func_)
            : begin(std::move(begin_))
            , end(std::move(end_))
            , func(std::move(func_))
            , max_concurrent(max_concurrent_)
            , sem(max_concurrent_)
            , err()
        { }
    };

    assert(max_concurrent > 0);

    try {
        return do_with(state(std::move(begin), std::move(end), max_concurrent, std::forward<Func>(func)), [] (state& s) {
            return do_until([&s] { return s.begin == s.end; }, [&s] {
                return s.sem.wait().then([&s] () mutable noexcept {
                    // Possibly run in background and signal s.sem when the task is done.
                    // The background tasks are waited on using s.sem.
                    (void)futurize_invoke(s.func, *s.begin++).then_wrapped([&s] (future<> fut) {
                        if (fut.failed()) {
                            auto e = fut.get_exception();
                            if (!s.err) {
                                s.err = std::move(e);
                            }
                        }
                        s.sem.signal();
                    });
                });
            }).then([&s] {
                // Wait for any background task to finish
                // and signal the semaphore
                return s.sem.wait(s.max_concurrent);
            }).then([&s] {
                if (!s.err) {
                    return make_ready_future<>();
                }
                return seastar::make_exception_future<>(std::move(s.err));
            });
        });
    } catch (...) {
        return current_exception_as_future();
    }
}

/// Run a maximum of \c max_concurrent tasks in parallel (range version).
///
/// Given a \c range of objects, run \c func on each element in
/// the range, and return a future<> that resolves when all the functions
/// complete. \c func should return a future<> that indicates when it is
/// complete. Up to \c max_concurrent invocations are performed in parallel.
/// This does not allow the range to refer to stack objects. The caller
/// must ensure that the range outlives the call to max_concurrent_for_each
/// so it can be iterated in the background.
///
/// \param range the range of objects to pass to \c func
/// \param max_concurrent maximum number of concurrent invocations of \c func, must be greater than zero.
/// \param func Function to invoke with each element in the range (returning
/// a \c future<>)
/// \return a \c future<> that resolves when all the function invocations
/// complete. If one or more return an exception, the return value
/// contains one of the exceptions.
template <typename Range, typename Func>
SEASTAR_CONCEPT( requires std::ranges::range<Range> && requires (Func f, Range r) { { f(*r.begin()) } -> std::same_as<future<>>; } )
inline
future<>
max_concurrent_for_each(Range&& range, size_t max_concurrent, Func&& func) noexcept {
    try {
        return max_concurrent_for_each(std::begin(range), std::end(range), max_concurrent, std::forward<Func>(func));
    } catch (...) {
        return current_exception_as_future();
    }
}

/// @}

} // namespace seastar
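
The scheme used by the iterator version above can be illustrated without Seastar: take one semaphore unit before starting each task, return it when the task completes, and finally wait until all `max_concurrent` units are back, which means nothing is still in flight. The following is a toy, single-threaded sketch of that idea in standard C++. The names `toy_reactor`, `run_stats`, `bounded_for_each`, and `sum_bounded` are hypothetical and invented for this illustration, a plain counter stands in for `seastar::semaphore`, and error handling is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy single-threaded "reactor": deferred work waits in a FIFO run queue.
struct toy_reactor {
    std::deque<std::function<void()>> run_queue;

    void defer(std::function<void()> fn) { run_queue.push_back(std::move(fn)); }

    // Run one queued task; returns false if the queue was empty.
    bool run_one() {
        if (run_queue.empty()) {
            return false;
        }
        auto fn = std::move(run_queue.front());
        run_queue.pop_front();
        fn();
        return true;
    }
};

struct run_stats {
    std::size_t completed = 0;
    std::size_t peak_in_flight = 0;
};

// Calls func(item, done) for each item. func must arrange for done() to be
// called exactly once when its work finishes (here: from a deferred task).
template <typename T, typename Func>
run_stats bounded_for_each(toy_reactor& r, const std::vector<T>& items,
                           std::size_t max_concurrent, Func func) {
    assert(max_concurrent > 0);
    std::size_t units = max_concurrent; // free units of the "semaphore"
    run_stats stats;
    auto done = [&units, &stats] { ++units; ++stats.completed; }; // like sem.signal()

    for (const T& item : items) {
        // Like sem.wait(): with no free unit, run queued tasks until one completes.
        while (units == 0 && r.run_one()) {
        }
        assert(units > 0); // otherwise some task never called done()
        --units;
        stats.peak_in_flight = std::max(stats.peak_in_flight, max_concurrent - units);
        func(item, done);
    }
    // Like sem.wait(max_concurrent): all units are back once nothing is in flight.
    while (units < max_concurrent && r.run_one()) {
    }
    return stats;
}

// Usage: sum the items with at most max_concurrent deferred tasks in flight.
inline run_stats sum_bounded(const std::vector<int>& items, std::size_t max_concurrent, int& sum) {
    toy_reactor r;
    return bounded_for_each(r, items, max_concurrent, [&r, &sum] (int v, auto done) {
        r.defer([&sum, v, done] { sum += v; done(); });
    });
}
```

In this sketch the loop only drives the run queue when it has no free unit, so the number of started-but-unfinished tasks never exceeds `max_concurrent`. The final drain plays the role of `s.sem.wait(s.max_concurrent)` in the code above.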
80 changes: 80 additions & 0 deletions tests/unit/futures_test.cc
@@ -1502,3 +1502,83 @@ SEASTAR_THREAD_TEST_CASE(test_with_gate) {
BOOST_REQUIRE(gate_closed_errors);
BOOST_REQUIRE(!other_errors);
}

SEASTAR_THREAD_TEST_CASE(test_max_concurrent_for_each) {
    BOOST_TEST_MESSAGE("empty range");
    max_concurrent_for_each(std::vector<int>(), 3, [] (int) {
        BOOST_FAIL("should not reach");
        return make_exception_future<>(std::bad_function_call());
    }).get();

    auto range = boost::copy_range<std::vector<int>>(boost::irange(1, 8));

    BOOST_TEST_MESSAGE("iterator");
    auto sum = 0;
    max_concurrent_for_each(range.begin(), range.end(), 3, [&sum] (int v) {
        sum += v;
        return make_ready_future<>();
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("const iterator");
    sum = 0;
    max_concurrent_for_each(range.cbegin(), range.cend(), 3, [&sum] (int v) {
        sum += v;
        return make_ready_future<>();
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("reverse iterator");
    sum = 0;
    max_concurrent_for_each(range.rbegin(), range.rend(), 3, [&sum] (int v) {
        sum += v;
        return make_ready_future<>();
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("immediate result");
    sum = 0;
    max_concurrent_for_each(range, 3, [&sum] (int v) {
        sum += v;
        return make_ready_future<>();
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("suspend");
    sum = 0;
    max_concurrent_for_each(range, 3, [&sum] (int v) {
        return later().then([&sum, v] {
            sum += v;
        });
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("throw immediately");
    sum = 0;
    BOOST_CHECK_EXCEPTION(max_concurrent_for_each(range, 3, [&sum] (int v) {
        sum += v;
        if (v == 1) {
            throw 5;
        }
        // Return a future<> on the non-throwing path, as the function's concept requires.
        return make_ready_future<>();
    }).get(), int, [] (int v) { return v == 5; });
    BOOST_REQUIRE_EQUAL(sum, 28);

    BOOST_TEST_MESSAGE("throw after suspension");
    sum = 0;
    BOOST_CHECK_EXCEPTION(max_concurrent_for_each(range, 3, [&sum] (int v) {
        return later().then([&sum, v] {
            sum += v;
            if (v == 2) {
                throw 5;
            }
        });
    }).get(), int, [] (int v) { return v == 5; });

    BOOST_TEST_MESSAGE("concurrency higher than vector length");
    sum = 0;
    max_concurrent_for_each(range, range.size() + 3, [&sum] (int v) {
        sum += v;
        return make_ready_future<>();
    }).get();
    BOOST_REQUIRE_EQUAL(sum, 28);
}
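
The two throwing cases above rely on the error policy of `max_concurrent_for_each`: a failure does not stop the loop, every element is still visited, and one exception is reported at the end. A minimal sequential sketch of that policy in standard C++ follows. `for_each_keep_first_error` is a hypothetical helper, not a Seastar function, and it runs the calls one at a time rather than concurrently, so it only illustrates the bookkeeping around `std::exception_ptr`.

```cpp
#include <cassert>
#include <exception>
#include <vector>

// Hypothetical helper: call func on every item, remember only the first
// exception thrown, and hand it back after the whole range was visited.
template <typename T, typename Func>
std::exception_ptr for_each_keep_first_error(const std::vector<T>& items, Func func) {
    std::exception_ptr first_error;
    for (const T& item : items) {
        try {
            func(item);
        } catch (...) {
            // Later failures are dropped; only the first one is kept.
            if (!first_error) {
                first_error = std::current_exception();
            }
        }
    }
    return first_error; // null if no call threw
}
```

A caller can test the returned pointer and pass it to `std::rethrow_exception` to surface the stored error, much as the code under test turns `s.err` into a failed future.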
