Skip to content

Commit

Permalink
Add 'promise' versions of EventLoopFuture.{and,when}All{succeed,compl…
Browse files Browse the repository at this point in the history
…ete} (apple#1794)

Motivation:

Most NIO APIs have two versions: one accepting a promise (often
optional) and another which returns a future. Many applications and
frameworks built on top of NIO also use this pattern. To maximise
composability we should try to provide both versions where possible.

Modifications:

Add versions of `andAllSucceed`, `andAllComplete`, `whenAllSucceed`
and `whenAllComplete` which accept an `EventLoopPromise` in place of
an `EventLoop` and defer the implementation of the future versions to
the promise versions.

Result:

- Better composability.
- Fewer allocations if the caller already has a promise.
  • Loading branch information
glbrntt authored Apr 8, 2021
1 parent 9a224ae commit f635712
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 109 deletions.
120 changes: 95 additions & 25 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,24 @@ extension EventLoopFuture {
/// - futures: An array of homogenous `EventLoopFutures`s to wait for.
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will execute on.
/// - Returns: A new `EventLoopFuture` that waits for the other futures to succeed.
@inlinable
public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)
EventLoopFuture.andAllSucceed(futures, promise: promise)
return promise.futureResult
}

/// Succeeds the promise if all of the provided futures succeed. If any of the provided
/// futures fail then the `promise` will be failed -- even if some futures are yet to complete.
///
/// If the results of all futures should be collected use `andAllComplete` instead.
///
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFutures`s to wait for.
/// - promise: The `EventLoopPromise` to complete with the result of this call.
@inlinable
public static func andAllSucceed(_ futures: [EventLoopFuture<Value>], promise: EventLoopPromise<Void>) {
let eventLoop = promise.futureResult.eventLoop

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
Expand All @@ -1082,8 +1098,6 @@ extension EventLoopFuture {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds only if all of the provided futures succeed.
Expand All @@ -1095,25 +1109,45 @@ extension EventLoopFuture {
/// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures.
public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<[Value]> {
let promise = eventLoop.makePromise(of: Void.self)
let promise = eventLoop.makePromise(of: [Value].self)
EventLoopFuture.whenAllSucceed(futures, promise: promise)
return promise.futureResult
}

/// Completes the `promise` with the values of all `futures` if all provided futures succeed. If
/// any of the provided futures fail then `promise` will be failed.
///
/// If the _results of all futures should be collected use `andAllComplete` instead.
///
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFutures`s to wait for.
/// - promise: The `EventLoopPromise` to complete with the result of this call.
public static func whenAllSucceed(_ futures: [EventLoopFuture<Value>], promise: EventLoopPromise<[Value]>) {
let eventLoop = promise.futureResult.eventLoop
let reduced = eventLoop.makePromise(of: Void.self)

var results: [Value?] = .init(repeating: nil, count: futures.count)
let callback = { (index: Int, result: Value) in
results[index] = result
}

if eventLoop.inEventLoop {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
self._reduceSuccesses0(reduced, futures, eventLoop, onValue: callback)
} else {
eventLoop.execute {
self._reduceSuccesses0(promise, futures, eventLoop, onValue: callback)
self._reduceSuccesses0(reduced, futures, eventLoop, onValue: callback)
}
}

return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
return results.map { $0! }
reduced.futureResult.whenComplete { result in
switch result {
case .success:
// verify that all operations have been completed
assert(!results.contains(where: { $0 == nil }))
promise.succeed(results.map { $0! })
case .failure(let error):
promise.fail(error)
}
}
}

Expand All @@ -1122,10 +1156,11 @@ extension EventLoopFuture {
///
/// Once all the futures have succeed, the provided promise will succeed.
/// Once any future fails, the provided promise will fail.
private static func _reduceSuccesses0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping (Int, InputValue) -> Void) {
@inlinable
internal static func _reduceSuccesses0<InputValue>(_ promise: EventLoopPromise<Void>,
_ futures: [EventLoopFuture<InputValue>],
_ eventLoop: EventLoop,
onValue: @escaping (Int, InputValue) -> Void) {
eventLoop.assertInEventLoop()

var remainingCount = futures.count
Expand Down Expand Up @@ -1184,6 +1219,22 @@ extension EventLoopFuture {
@inlinable
public static func andAllComplete(_ futures: [EventLoopFuture<Value>], on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)
EventLoopFuture.andAllComplete(futures, promise: promise)
return promise.futureResult
}

/// Completes a `promise` when all of the provided `EventLoopFuture`s have completed.
///
/// The promise will always be succeeded, regardless of the outcome of the individual futures.
///
/// If the results are required, use `whenAllComplete` instead.
///
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFuture`s to wait for.
/// - promise: The `EventLoopPromise` to succeed when all futures have completed.
@inlinable
public static func andAllComplete(_ futures: [EventLoopFuture<Value>], promise: EventLoopPromise<Void>) {
let eventLoop = promise.futureResult.eventLoop

if eventLoop.inEventLoop {
self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in })
Expand All @@ -1192,8 +1243,6 @@ extension EventLoopFuture {
self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in })
}
}

return promise.futureResult
}

/// Returns a new `EventLoopFuture` that succeeds when all of the provided `EventLoopFuture`s complete.
Expand All @@ -1210,29 +1259,50 @@ extension EventLoopFuture {
@inlinable
public static func whenAllComplete(_ futures: [EventLoopFuture<Value>],
on eventLoop: EventLoop) -> EventLoopFuture<[Result<Value, Error>]> {
let promise = eventLoop.makePromise(of: Void.self)
let promise = eventLoop.makePromise(of: [Result<Value, Error>].self)
EventLoopFuture.whenAllComplete(futures, promise: promise)
return promise.futureResult
}

/// Completes a `promise` with the results of all provided `EventLoopFuture`s.
///
/// The promise will always be succeeded, regardless of the outcome of the futures.
///
/// - Parameters:
/// - futures: An array of homogenous `EventLoopFuture`s to gather results from.
/// - promise: The `EventLoopPromise` to complete with the result of the futures.
@inlinable
public static func whenAllComplete(_ futures: [EventLoopFuture<Value>],
promise: EventLoopPromise<[Result<Value, Error>]>) {
let eventLoop = promise.futureResult.eventLoop
let reduced = eventLoop.makePromise(of: Void.self)

var results: [Result<Value, Error>] = .init(repeating: .failure(OperationPlaceholderError()), count: futures.count)
let callback = { (index: Int, result: Result<Value, Error>) in
results[index] = result
}

if eventLoop.inEventLoop {
self._reduceCompletions0(promise, futures, eventLoop, onResult: callback)
self._reduceCompletions0(reduced, futures, eventLoop, onResult: callback)
} else {
eventLoop.execute {
self._reduceCompletions0(promise, futures, eventLoop, onResult: callback)
self._reduceCompletions0(reduced, futures, eventLoop, onResult: callback)
}
}

return promise.futureResult.map {
// verify that all operations have been completed
assert(!results.contains(where: {
guard case let .failure(error) = $0 else { return false }
return error is OperationPlaceholderError
}))
reduced.futureResult.whenComplete { result in
switch result {
case .success:
// verify that all operations have been completed
assert(!results.contains(where: {
guard case let .failure(error) = $0 else { return false }
return error is OperationPlaceholderError
}))
promise.succeed(results)

return results
case .failure(let error):
promise.fail(error)
}
}
}

Expand Down
Loading

0 comments on commit f635712

Please sign in to comment.