Skip to content

Commit

Permalink
Allow when_all and when_all_succeed to take function arguments
Browse files Browse the repository at this point in the history
The issue with the existing interface is that it is easy to create
some of the futures that one wants to wait for and then hit an
exception. For example, in

when_all(create_fut(), create_fut());

one call to create_fut might return a future, but the next one can
throw.

By passing functions that return futures to when_all and
when_all_succeed, we can convert exceptions into failed futures.

Signed-off-by: Rafael Ávila de Espíndola <[email protected]>
Message-Id: <[email protected]>
  • Loading branch information
espindola authored and tgrabiec committed Apr 1, 2019
1 parent ce4e1a2 commit 771996f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 21 deletions.
65 changes: 44 additions & 21 deletions include/seastar/core/future-util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -788,26 +788,44 @@ concept bool AllAreFutures = impl::is_tuple_of_futures<std::tuple<Futs...>>::val

)

template<typename Fut, std::enable_if_t<is_future<Fut>::value, int> = 0>
auto futurize_apply_if_func(Fut&& fut) {
return std::forward<Fut>(fut);
}

template<typename Func, std::enable_if_t<!is_future<Func>::value, int> = 0>
auto futurize_apply_if_func(Func&& func) {
return futurize_apply(std::forward<Func>(func));
}

/// Wait for many futures to complete, capturing possible errors (variadic version).
///
/// Given a variable number of futures as input, wait for all of them
/// to resolve (either successfully or with an exception), and return
/// them as a tuple so individual values or exceptions can be examined.
///
/// \param futs futures to wait for
/// \return an \c std::tuple<> of all the futures in the input; when
/// ready, all contained futures will be ready as well.
template <typename... Futs>
GCC6_CONCEPT( requires seastar::AllAreFutures<Futs...> )
inline
future<std::tuple<Futs...>>
when_all(Futs&&... futs) {
when_all_impl(Futs&&... futs) {
namespace si = internal;
using state = si::when_all_state<si::identity_futures_tuple<Futs...>, Futs...>;
return state::wait_all(std::forward<Futs>(futs)...);
}

/// Wait for many futures to complete, capturing possible errors (variadic version).
///
/// Each future can be passed directly, or a function that returns a
/// future can be given instead.
///
/// If any function throws, an exceptional future is created for it.
///
/// Returns a tuple of futures so individual values or exceptions can be
/// examined.
///
/// \param fut_or_funcs futures or functions that return futures
/// \return an \c std::tuple<> of all futures returned; when ready,
/// all contained futures will be ready as well.
template <typename... FutOrFuncs>
inline auto when_all(FutOrFuncs&&... fut_or_funcs) {
return when_all_impl(futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
}

/// \cond internal
namespace internal {

Expand Down Expand Up @@ -1274,23 +1292,28 @@ struct extract_values_from_futures_vector<future<>> {

}

/// Wait for many futures to complete (variadic version).
///
/// Given a variable number of futures as input, wait for all of them
/// to resolve, and return a future containing the values of each individual
/// resolved future.
/// In case any of the given futures fails one of the exceptions is returned
/// by this function as a failed future.
///
/// \param futures futures to wait for
/// \return future containing values of input futures
template<typename... Futures>
GCC6_CONCEPT( requires seastar::AllAreFutures<Futures...> )
inline auto when_all_succeed(Futures&&... futures) {
inline auto when_all_succeed_impl(Futures&&... futures) {
using state = internal::when_all_state<internal::extract_values_from_futures_tuple<Futures...>, Futures...>;
return state::wait_all(std::forward<Futures>(futures)...);
}

/// Wait for many futures to complete (variadic version).
///
/// Each future can be passed directly, or a function that returns a
/// future can be given instead.
///
/// If any function throws, or if the returned future fails, one of
/// the exceptions is returned by this function as a failed future.
///
/// \param fut_or_funcs futures or functions that return futures
/// \return future containing values of futures returned by funcs
template <typename... FutOrFuncs>
inline auto when_all_succeed(FutOrFuncs&&... fut_or_funcs) {
return when_all_succeed_impl(futurize_apply_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
}

/// Wait for many futures to complete (iterator version).
///
/// Given a range of futures as input, wait for all of them
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/futures_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,49 @@ SEASTAR_TEST_CASE(test_when_allx) {
return when_all(later(), later(), make_ready_future()).discard_result();
}

// A noncopyable and nonmovable struct
struct non_copy_non_move {
non_copy_non_move(non_copy_non_move&&) = delete;
non_copy_non_move(const non_copy_non_move&) = delete;
};

SEASTAR_TEST_CASE(test_when_all_functions) {
auto f = [x = non_copy_non_move{}] {
(void)x;
return make_ready_future<int>(42);
};
return when_all(f, [] {
throw 42;
return make_ready_future<>();
}, later()).then([] (std::tuple<future<int>, future<>, future<>> res) {
BOOST_REQUIRE_EQUAL(std::get<0>(res).get0(), 42);

BOOST_REQUIRE(std::get<1>(res).available());
BOOST_REQUIRE(std::get<1>(res).failed());
std::get<1>(res).ignore_ready_future();

BOOST_REQUIRE(std::get<2>(res).available());
BOOST_REQUIRE(!std::get<2>(res).failed());
return make_ready_future<>();
});
}

SEASTAR_TEST_CASE(test_when_all_succeed_functions) {
auto f = [x = non_copy_non_move{}] {
(void)x;
return make_ready_future<int>(42);
};
return when_all_succeed(f, [] {
throw 42;
return make_ready_future<>();
}, later()).then_wrapped([] (future<int> res) {
BOOST_REQUIRE(res.available());
BOOST_REQUIRE(res.failed());
res.ignore_ready_future();
return make_ready_future<>();
});
}

template<typename E, typename... T>
static void check_failed_with(future<T...>&& f) {
BOOST_REQUIRE(f.failed());
Expand Down

0 comments on commit 771996f

Please sign in to comment.