Skip to content

Commit

Permalink
Revert "[FLINK-22881] Revert toggling IDLE/ACTIVE on records in IDLE …
Browse files Browse the repository at this point in the history
…state"

This reverts commit 2c260b5.
  • Loading branch information
xintongsong committed Jun 22, 2021
1 parent 55eb22b commit 8758b21
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.util.OutputTag;
Expand All @@ -53,7 +54,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str

private final WatermarkGauge watermarkGauge = new WatermarkGauge();

private StreamStatus announcedStatus = StreamStatus.ACTIVE;
private final AnnouncedStatus announcedStatus = new AnnouncedStatus(StreamStatus.ACTIVE);

@SuppressWarnings("unchecked")
public RecordWriterOutput(
Expand Down Expand Up @@ -97,9 +98,10 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
}

private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);

try {
// record could've been generated somewhere in the pipeline even though an IDLE status was
// emitted. It might've originated from a timer or just a wrong behaving operator
try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
serializationDelegate.setInstance(record);
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
Expand All @@ -108,14 +110,12 @@ private <X> void pushToRecordWriter(StreamRecord<X> record) {

@Override
public void emitWatermark(Watermark mark) {
if (announcedStatus.isIdle()) {
return;
}

watermarkGauge.setCurrentWatermark(mark.getTimestamp());
serializationDelegate.setInstance(mark);

try {
// watermark could've been generated somewhere in the pipeline even though an IDLE status
// was emitted. It might've originated from a periodic watermark generator or just a wrong
// behaving operator
try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
serializationDelegate.setInstance(mark);
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
Expand All @@ -124,14 +124,18 @@ public void emitWatermark(Watermark mark) {

@Override
public void emitStreamStatus(StreamStatus streamStatus) {
if (!announcedStatus.equals(streamStatus)) {
announcedStatus = streamStatus;
serializationDelegate.setInstance(streamStatus);
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
if (!announcedStatus.getCurrentStatus().equals(streamStatus)) {
announcedStatus.setCurrentStatus(streamStatus);
writeStreamStatus(streamStatus);
}
}

private void writeStreamStatus(StreamStatus streamStatus) {
serializationDelegate.setInstance(streamStatus);
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamstatus;

import org.apache.flink.annotation.Internal;

import java.util.function.Consumer;

/**
* {@link StreamStatus#IDLE} requires that no records nor watermarks travel through the branch. In
* order to keep the older behaviour that records could've been generated down the pipeline even
* though the sources were idle we go through a short ACTIVE/IDLE loop. This is a helper class that
* lets you easily flip the status around a code block.
*/
@Internal
public final class AnnouncedStatus {
private StreamStatus currentStatus;

public AnnouncedStatus(StreamStatus currentStatus) {
this.currentStatus = currentStatus;
}

public StreamStatus getCurrentStatus() {
return currentStatus;
}

public void setCurrentStatus(StreamStatus currentStatus) {
this.currentStatus = currentStatus;
}

/**
* Makes sure that the last emitted StreamStatus was ACTIVE.
*
* <p>Example usage:
*
* <pre>{@code
* try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
* serializationDelegate.setInstance(record);
* recordWriter.emit(serializationDelegate);
* } catch (Exception e) {
* throw new RuntimeException(e.getMessage(), e);
* }
* }</pre>
*
* @param statusConsumer a consumer which sends the status downstream
*/
public AutoCloseable ensureActive(Consumer<StreamStatus> statusConsumer) {
if (currentStatus.isIdle()) {
statusConsumer.accept(StreamStatus.ACTIVE);
return () -> statusConsumer.accept(StreamStatus.IDLE);
}
return () -> {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
@Nullable protected final OutputTag<T> outputTag;
@Nullable protected final AutoCloseable closeable;
protected StreamStatus announcedStatus = StreamStatus.ACTIVE;

public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) {
this(operator, (OperatorMetricGroup) operator.getMetricGroup(), outputTag, operator::close);
Expand Down Expand Up @@ -107,9 +106,6 @@ protected <X> void pushToOperator(StreamRecord<X> record) {

@Override
public void emitWatermark(Watermark mark) {
if (announcedStatus.isIdle()) {
return;
}
try {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
input.processWatermark(mark);
Expand Down Expand Up @@ -145,13 +141,10 @@ public Gauge<Long> getWatermarkGauge() {

@Override
public void emitStreamStatus(StreamStatus streamStatus) {
if (!announcedStatus.equals(streamStatus)) {
announcedStatus = streamStatus;
try {
input.processStreamStatus(streamStatus);
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
try {
input.processStreamStatus(streamStatus);
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,12 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
// make source active once again, emit a watermark and go idle again.
addSourceRecords(testHarness, 1, initialTime + 10);

// FLIP-27 sources do not emit active status on new records, we wrap a record with
// ACTIVE/IDLE sequence
expectedOutput.add(StreamStatus.ACTIVE);
expectedOutput.add(
new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
expectedOutput.add(StreamStatus.IDLE);
expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records
Expand Down
Loading

0 comments on commit 8758b21

Please sign in to comment.