Skip to content

Commit

Permalink
[FLINK-6255] [cep] Remove PatternStream.getSideOutput().
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed May 17, 2017
1 parent 02ea418 commit 05ad87f
Show file tree
Hide file tree
Showing 12 changed files with 13 additions and 268 deletions.
41 changes: 1 addition & 40 deletions docs/dev/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -806,46 +806,7 @@ in event time.

To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
*correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
seen watermark. Late elements are not further processed but they can be redirected to a [side output]
({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.

To access the stream of late elements, you first need to specify that you want to get the late data using
`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do
so, the late elements will be silently dropped. Then, you can get the side-output stream using the
`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in
the `.sideOutputLateData(OutputTag)`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

PatternStream<T> patternStream = CEP.pattern(...)
.sideOutputLateData(lateOutputTag);

// main output with matches
DataStream<O> result = patternStream.select(...)

// side output containing the late events
DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val lateOutputTag = OutputTag[T]("late-data")

val patternStream: PatternStream[T] = CEP.pattern(...)
.sideOutputLateData(lateOutputTag)

// main output with matches
val result = patternStream.select(...)

// side output containing the late events
val lateStream = patternStream.getSideOutput(lateOutputTag)
{% endhighlight %}
</div>
</div>
seen watermark. Late elements are not further processed.

## Examples

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.pattern.{Pattern => JPattern}
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flink.util.Collector
import org.apache.flink.types.{Either => FEither}
import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
import java.lang.{Long => JLong}

import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern

Expand All @@ -47,23 +46,8 @@ import scala.collection.mutable
*/
class PatternStream[T](jPatternStream: JPatternStream[T]) {

private[flink] var lateDataOutputTag: OutputTag[T] = null

private[flink] def wrappedPatternStream = jPatternStream


/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. The
* CEP library assumes correctness of the watermark, so an element is considered late if its
* timestamp is smaller than the last received watermark.
*/
@PublicEvolving
def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = {
jPatternStream.sideOutputLateData(outputTag)
lateDataOutputTag = outputTag
this
}

def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])

def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
Expand Down Expand Up @@ -110,8 +94,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {

val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern(),
lateDataOutputTag)
jPatternStream.getPattern())

val cleanedSelect = cleanClosure(patternSelectFunction)
val cleanedTimeout = cleanClosure(patternTimeoutFunction)
Expand Down Expand Up @@ -176,8 +159,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
: DataStream[Either[L, R]] = {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern(),
lateDataOutputTag
jPatternStream.getPattern()
)

val cleanedSelect = cleanClosure(patternFlatSelectFunction)
Expand Down Expand Up @@ -338,17 +320,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {

flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
}

/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
*
* @param tag The tag identifying a specific side output.
*/
@PublicEvolving
def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
asScalaStream(jPatternStream.getSideOutput(tag))
}
}

object PatternStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import java.util.List;
import java.util.Map;
Expand All @@ -54,19 +52,6 @@ public class PatternStream<T> {

private final Pattern<T, ?> pattern;

/**
* A reference to the created pattern stream used to get
* the registered side outputs, e.g late elements side output.
*/
private SingleOutputStreamOperator<?> patternStream;

/**
* {@link OutputTag} to use for late arriving events. Elements for which
* {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
* be emitted to this.
*/
private OutputTag<T> lateDataOutputTag;

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
this.inputStream = inputStream;
this.pattern = pattern;
Expand All @@ -80,22 +65,6 @@ public DataStream<T> getInputStream() {
return inputStream;
}

/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. The
* CEP library assumes correctness of the watermark, so an element is considered late if its
* timestamp is smaller than the last received watermark.
*/
public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
Preconditions.checkArgument(lateDataOutputTag == null,
"The late side output tag has already been initialized to " + lateDataOutputTag + ".");
Preconditions.checkArgument(patternStream == null,
"The late side output tag has to be set before calling select() or flatSelect().");

this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
return this;
}

/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
Expand Down Expand Up @@ -137,8 +106,7 @@ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R
*/
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;
CEPOperatorUtils.createPatternStream(inputStream, pattern);

return patternStream.map(
new PatternSelectMapper<>(
Expand Down Expand Up @@ -169,8 +137,7 @@ public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
final PatternSelectFunction<T, R> patternSelectFunction) {

SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
Expand Down Expand Up @@ -240,8 +207,7 @@ public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunct
*/
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;
CEPOperatorUtils.createPatternStream(inputStream, pattern);

return patternStream.flatMap(
new PatternFlatSelectMapper<>(
Expand Down Expand Up @@ -273,8 +239,7 @@ public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {

SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
this.patternStream = patternStream;
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
Expand Down Expand Up @@ -304,18 +269,6 @@ public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
).returns(outTypeInfo);
}

/**
* Gets the {@link DataStream} that contains the elements that are emitted from an operation
* into the side output with the given {@link OutputTag}.
*
* @param sideOutputTag The tag identifying a specific side output.
*/
public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
"To have the late element side output, you have to first define the main output using select() or flatSelect().");
return patternStream.getSideOutput(sideOutputTag);
}

/**
* Wrapper for a {@link PatternSelectFunction}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Migration;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
Expand Down Expand Up @@ -98,13 +97,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>

private transient InternalTimerService<VoidNamespace> timerService;

/**
* {@link OutputTag} to use for late arriving events. Elements for which
* {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
* be emitted to this.
*/
private final OutputTag<IN> lateDataOutputTag;

/**
* The last seen watermark. This will be used to
* decide if an incoming element is late or not.
Expand All @@ -123,7 +115,6 @@ public AbstractKeyedCEPPatternOperator(
final KeySelector<IN, KEY> keySelector,
final TypeSerializer<KEY> keySerializer,
final NFACompiler.NFAFactory<IN> nfaFactory,
final OutputTag<IN> lateDataOutputTag,
final boolean migratingFromOldKeyedOperator) {

this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
Expand All @@ -132,7 +123,6 @@ public AbstractKeyedCEPPatternOperator(
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);

this.lateDataOutputTag = lateDataOutputTag;
this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
}

Expand Down Expand Up @@ -203,8 +193,6 @@ public void processElement(StreamRecord<IN> element) throws Exception {
priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
} else {
sideOutputLateElement(element);
}
}
}
Expand Down Expand Up @@ -266,18 +254,6 @@ private void updateLastSeenWatermark(long timestamp) {
this.lastWatermark = timestamp;
}

/**
* Puts the provided late element in the dedicated side output,
* if the user has specified one.
*
* @param element The late element.
*/
private void sideOutputLateElement(StreamRecord<IN> element) {
if (lateDataOutputTag != null) {
output.collect(lateDataOutputTag, element);
}
}

private NFA<IN> getNFA() throws IOException {
NFA<IN> nfa = nfaOperatorState.value();
return nfa != null ? nfa : nfaFactory.createNFA();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;
Expand All @@ -49,7 +48,7 @@ public class CEPOperatorUtils {
* @return Data stream containing fully matched event sequences stored in a {@link Map}. The
* events are indexed by their associated names of the pattern.
*/
public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

// check whether we use processing time
Expand All @@ -76,7 +75,6 @@ public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatt
keySelector,
keySerializer,
nfaFactory,
lateDataOutputTag,
true));
} else {

Expand All @@ -92,7 +90,6 @@ public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatt
keySelector,
keySerializer,
nfaFactory,
lateDataOutputTag,
false
)).forceNonParallel();
}
Expand All @@ -110,7 +107,7 @@ public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatt
* a {@link Either} instance.
*/
public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream(
DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
DataStream<T> inputStream, Pattern<T, ?> pattern) {

final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

Expand Down Expand Up @@ -142,7 +139,6 @@ public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T
keySelector,
keySerializer,
nfaFactory,
lateDataOutputTag,
true));
} else {

Expand All @@ -158,7 +154,6 @@ public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T
keySelector,
keySerializer,
nfaFactory,
lateDataOutputTag,
false
)).forceNonParallel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

import java.util.Collection;
import java.util.Iterator;
Expand All @@ -48,10 +47,9 @@ public KeyedCEPPatternOperator(
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {

super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.flink.util.OutputTag;

import java.util.Collection;
import java.util.List;
Expand All @@ -48,10 +47,9 @@ public TimeoutKeyedCEPPatternOperator(
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {

super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}

@Override
Expand Down
Loading

0 comments on commit 05ad87f

Please sign in to comment.