Skip to content

Commit

Permalink
[refactor] Use BoundedMultiInput instead of OperatorChain in InputPro…
Browse files Browse the repository at this point in the history
…cessors

I replaced the OperatorChain in StreamOneInputProcessor and other InputProcessors with a BoundedMultiInput. Moroever I made the OperatorChain implement BoundedMultiInput interface. It makes it easier to instantiate StreamOneInputProcessor in tests without the need to instantiate a whole OperatorChain.
  • Loading branch information
dawidwys committed Oct 12, 2020
1 parent b1317fa commit 2aa8345
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void run(final Object lockingObject,
// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
// so we still need the following call to end the input
synchronized (lockingObject) {
operatorChain.endMainOperatorInput(1);
operatorChain.endInput(1);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,16 +45,16 @@ public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
private final StreamTaskInput<IN> input;
private final DataOutput<IN> output;

private final OperatorChain<?, ?> operatorChain;
private final BoundedMultiInput endOfInputAware;

public StreamOneInputProcessor(
StreamTaskInput<IN> input,
DataOutput<IN> output,
OperatorChain<?, ?> operatorChain) {
BoundedMultiInput endOfInputAware) {

this.input = checkNotNull(input);
this.output = checkNotNull(output);
this.operatorChain = checkNotNull(operatorChain);
this.endOfInputAware = checkNotNull(endOfInputAware);
}

@Override
Expand All @@ -67,7 +67,7 @@ public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output);

if (status == InputStatus.END_OF_INPUT) {
operatorChain.endMainOperatorInput(input.getInputIndex() + 1);
endOfInputAware.endInput(input.getInputIndex() + 1);
}

return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -35,7 +36,6 @@
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -86,7 +86,7 @@ public StreamTwoInputProcessor(
TwoInputSelectionHandler inputSelectionHandler,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge,
OperatorChain<?, ?> operatorChain,
BoundedMultiInput endOfInputAware,
Counter numRecordsIn) {

this.inputSelectionHandler = checkNotNull(inputSelectionHandler);
Expand All @@ -107,7 +107,7 @@ record -> processRecord1(record, streamOperator),
new StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
0),
output1,
operatorChain
endOfInputAware
);

StreamTaskNetworkOutput<IN2> output2 = new StreamTaskNetworkOutput<>(
Expand All @@ -125,7 +125,7 @@ record -> processRecord2(record, streamOperator),
new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
1),
output2,
operatorChain
endOfInputAware
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.SourceInputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -86,7 +87,7 @@
* main operator.
*/
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer, BoundedMultiInput {

private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);

Expand Down Expand Up @@ -381,7 +382,8 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
*
* @param inputId the input ID starts from 1 which indicates the first input.
*/
public void endMainOperatorInput(int inputId) throws Exception {
@Override
public void endInput(int inputId) throws Exception {
if (mainOperatorWrapper != null) {
mainOperatorWrapper.endOperatorInput(inputId);
}
Expand Down

0 comments on commit 2aa8345

Please sign in to comment.