Skip to content

Commit

Permalink
[FLINK-4660] Allow ProcessFunction on DataStream
Browse files Browse the repository at this point in the history
Introduce new ProcessOperator for this. Rename the pre-existing
ProcessOperator to KeyedProcessOperator.
  • Loading branch information
aljoscha committed Mar 6, 2017
1 parent 82eddca commit 0228676
Show file tree
Hide file tree
Showing 10 changed files with 747 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
Expand All @@ -59,6 +60,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
Expand Down Expand Up @@ -555,6 +557,60 @@ public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMappe

}

/**
* Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero
* or more output elements.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
* @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
processFunction,
ProcessFunction.class,
false,
true,
getType(),
Utils.getCallLocationName(),
true);

return process(processFunction, outType);
}

/**
* Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero
* or more output elements.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
*
* @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(
ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {

ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));

return transform("Process", outputType, operator);
}

/**
* Applies a Filter transformation on a {@link DataStream}. The
* transformation calls a {@link FilterFunction} for each element of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
Expand Down Expand Up @@ -187,6 +187,7 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
*
* @return The transformed {@link DataStream}.
*/
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

Expand Down Expand Up @@ -219,13 +220,14 @@ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFu
*
* @return The transformed {@link DataStream}.
*/
@Override
@Internal
public <R> SingleOutputStreamOperator<R> process(
ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {

ProcessOperator<KEY, T, R> operator =
new ProcessOperator<>(clean(processFunction));
KeyedProcessOperator<KEY, T, R> operator =
new KeyedProcessOperator<>(clean(processFunction));

return transform("Process", outputType, operator);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

@Internal
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {

private static final long serialVersionUID = 1L;

private transient TimestampedCollector<OUT> collector;

private transient ContextImpl<IN, OUT> context;

private transient OnTimerContextImpl<IN, OUT> onTimerContext;

public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
super(function);

chainingStrategy = ChainingStrategy.ALWAYS;
}

@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);

InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

TimerService timerService = new SimpleTimerService(internalTimerService);

context = new ContextImpl<>(userFunction, timerService);
onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
}

@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}

@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}

private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {

private final TimerService timerService;

private StreamRecord<IN> element;

ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}

@Override
public Long timestamp() {
checkState(element != null);

if (element.hasTimestamp()) {
return element.getTimestamp();
} else {
return null;
}
}

@Override
public TimerService timerService() {
return timerService;
}
}

private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{

private final TimerService timerService;

private TimeDomain timeDomain;

private InternalTimer<?, VoidNamespace> timer;

OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}

@Override
public TimeDomain timeDomain() {
checkState(timeDomain != null);
return timeDomain;
}

@Override
public Long timestamp() {
checkState(timer != null);
return timer.getTimestamp();
}

@Override
public TimerService timerService() {
return timerService;
}
}
}
Loading

0 comments on commit 0228676

Please sign in to comment.