Skip to content

Commit

Permalink
[FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture
Browse files Browse the repository at this point in the history
Complete renaming AsyncCollector -> ResultFuture

This closes apache#4243.
  • Loading branch information
zhangminglei authored and tillrohrmann committed Aug 22, 2017
1 parent 9077e51 commit 40cec17
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 121 deletions.
20 changes: 10 additions & 10 deletions docs/dev/stream/operators/asyncio.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Assuming one has an asynchronous client for the target database, three parts are
with asynchronous I/O against the database:

- An implementation of `AsyncFunction` that dispatches the requests
- A *callback* that takes the result of the operation and hands it to the `AsyncCollector`
- A *callback* that takes the result of the operation and hands it to the `ResultFuture`
- Applying the async I/O operation on a DataStream as a transformation

The following code example illustrates the basic pattern:
Expand Down Expand Up @@ -104,16 +104,16 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
}

@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
// the callback simply forwards the result to the result future
resultFuture.thenAccept( (String result) -> {

asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
});
}
Expand Down Expand Up @@ -142,15 +142,15 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
override def asyncInvoke(str: String, resultFutre: ResultFuture[(String, String)]): Unit = {

// issue the asynchronous request, receive a future for the result
val resultFuture: Future[String] = client.query(str)

// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
// the callback simply forwards the result to the result future
resultFuture.onSuccess {
case result: String => asyncCollector.collect(Iterable((str, result)));
case result: String => resultFuture.complete(Iterable((str, result)));
}
}
}
Expand All @@ -166,8 +166,8 @@ val resultStream: DataStream[(String, String)] =
</div>
</div>

**Important note**: The `AsyncCollector` is completed with the first call of `AsyncCollector.collect`.
All subsequent `collect` calls will be ignored.
**Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
All subsequent `complete` calls will be ignored.

The following two parameters control the asynchronous operations:

Expand Down Expand Up @@ -229,7 +229,7 @@ asynchronous requests in checkpoints and restores/re-triggers the requests when

For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggets to use a `DirectExecutor`, because the
callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
with the checkpoint bookkeepting happens in a dedicated thread-pool anyways.

A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

Expand Down Expand Up @@ -178,7 +178,7 @@ public void close() throws Exception {
}

@Override
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception {
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
this.executorService.submit(new Runnable() {
@Override
public void run() {
Expand All @@ -188,13 +188,13 @@ public void run() {
Thread.sleep(sleep);

if (random.nextFloat() < failRatio) {
collector.collect(new Exception("wahahahaha..."));
resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
collector.collect(
resultFuture.complete(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
collector.collect(new ArrayList<String>(0));
resultFuture.complete(new ArrayList<String>(0));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncCollector
import org.apache.flink.streaming.api.scala.async.ResultFuture

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -38,9 +38,9 @@ object AsyncIOExample {
val input = env.addSource(new SimpleSource())

val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
(input, collector: AsyncCollector[Int]) =>
(input, collector: ResultFuture[Int]) =>
Future {
collector.collect(Seq(input))
collector.complete(Seq(input))
} (ExecutionContext.global)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,28 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

import java.io.Serializable;

/**
* A function to trigger Async I/O operation.
*
* <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
* the result can be collected by calling {@link AsyncCollector#collect}. For each async
* the result can be collected by calling {@link ResultFuture#complete}. For each async
* operation, its context is stored in the operator immediately after invoking
* #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
*
* <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
* <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
* An error can also be propagate to the async IO operator by
* {@link AsyncCollector#collect(Throwable)}.
* {@link ResultFuture#completeExceptionally(Throwable)}.
*
* <p>Callback example usage:
*
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
*
* public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
* HBaseCallback cb = new HBaseCallback(collector);
* public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
* HBaseCallback cb = new HBaseCallback(result);
* Get get = new Get(Bytes.toBytes(row));
* hbase.asyncGet(get, cb);
* }
Expand All @@ -54,16 +53,16 @@
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
*
* public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
* public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
* Get get = new Get(Bytes.toBytes(row));
* ListenableFuture<Result> future = hbase.asyncGet(get);
* Futures.addCallback(future, new FutureCallback<Result>() {
* public void onSuccess(Result result) {
* List<String> ret = process(result);
* collector.collect(ret);
* result.complete(ret);
* }
* public void onFailure(Throwable thrown) {
* collector.collect(thrown);
* result.completeExceptionally(thrown);
* }
* });
* }
Expand All @@ -80,9 +79,9 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
* Trigger async operation for each stream input.
*
* @param input element coming from an upstream task
* @param collector to collect the result data
* @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.async.collector;
package org.apache.flink.streaming.api.functions.async;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Collection;

/**
* {@link AsyncCollector} collects data / error in user codes while processing async i/o.
* {@link ResultFuture} collects data / error in user codes while processing async i/o.
*
* @param <OUT> Output type
*/
@PublicEvolving
public interface AsyncCollector<OUT> {
public interface ResultFuture<OUT> {
/**
* Set result.
* Completes the result future with a collection of result objects.
*
* <p>Note that it should be called for exactly one time in the user code.
* Calling this function for multiple times will cause data lose.
Expand All @@ -39,12 +39,12 @@ public interface AsyncCollector<OUT> {
*
* @param result A list of results.
*/
void collect(Collection<OUT> result);
void complete(Collection<OUT> result);

/**
* Set error.
* Completes the result future exceptionally with an exception.
*
* @param error A Throwable object.
*/
void collect(Throwable error);
void completeExceptionally(Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -85,7 +84,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
}

@Override
public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

// -----------------------------------------------------------------------------------------
// Wrapper classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
Expand Down Expand Up @@ -57,7 +57,7 @@

/**
* The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
* the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}.
* the operator creates an {@link ResultFuture} which is passed to an {@link AsyncFunction}.
* Within the async function, the user can complete the async collector arbitrarily. Once the async
* collector has been completed, the result is emitted by the operator's emitter to downstream
* operators.
Expand Down Expand Up @@ -209,7 +209,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
streamRecordBufferEntry.collect(
streamRecordBufferEntry.completeExceptionally(
new TimeoutException("Async function call has timed out."));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Emitter(

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

this.timestampedCollector = new TimestampedCollector<>(this.output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
* as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
* as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The
* async function completes this class with a collection of results.
*
* @param <OUT> Type of the asynchronous collection result
*/
@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

/** Timestamp information. */
private final boolean hasTimestamp;
Expand Down Expand Up @@ -74,12 +74,12 @@ protected CompletableFuture<Collection<OUT>> getFuture() {
}

@Override
public void collect(Collection<OUT> result) {
public void complete(Collection<OUT> result) {
resultFuture.complete(result);
}

@Override
public void collect(Throwable error) {
public void completeExceptionally(Throwable error) {
resultFuture.completeExceptionally(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

import org.junit.Test;

Expand All @@ -55,7 +54,7 @@ public void testIterationRuntimeContext() throws Exception {
private static final long serialVersionUID = -2023923961609455894L;

@Override
public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op
}
};
Expand Down Expand Up @@ -94,7 +93,7 @@ public void testRuntimeContext() throws Exception {
private static final long serialVersionUID = 1707630162838967972L;

@Override
public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op
}
};
Expand Down
Loading

0 comments on commit 40cec17

Please sign in to comment.