diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index e0659da8095b3..a7d60a6e0a8fc 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -43,8 +43,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Assert; import org.junit.Test; @@ -68,7 +68,7 @@ import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class}) +@PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, StreamRecord.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class BoltWrapperTest extends AbstractTest { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index 10bb6ffbd40c0..455e86438a1d4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -46,7 +46,7 @@ abstract public class AbstractCEPPatternOperator extends AbstractCEPBasePatternOperator { private static final long serialVersionUID = 7487334510746595640L; - private final MultiplexingStreamRecordSerializer streamRecordSerializer; + private final StreamElementSerializer streamRecordSerializer; // global nfa for all elements private NFA nfa; @@ -60,7 +60,7 @@ public AbstractCEPPatternOperator( NFACompiler.NFAFactory nfaFactory) { super(inputSerializer, isProcessingTime); - this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); + this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer); this.nfa = nfaFactory.createNFA(); } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 07e26627cda31..c3898c38f8e2d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.io.IOException; @@ -108,7 +108,7 @@ public void open() throws Exception { @SuppressWarnings("unchecked,rawtypes") TypeSerializer> streamRecordSerializer = - (TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer()); + (TypeSerializer) new StreamElementSerializer<>(getInputSerializer()); if (priorityQueueOperatorState == null) { priorityQueueOperatorState = getPartitionedState( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 6b09f3c4abe16..e77b5c8ac8218 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -49,8 +50,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * A {@code AllWindowedStream} represents a data stream where the stream of @@ -269,8 +270,12 @@ public SingleOutputStreamOperator apply(AllWindowFunction functi WindowOperator, R, W> operator; if (evictor != null) { - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -357,8 +362,12 @@ public SingleOutputStreamOperator apply(ReduceFunction reduceFunction, OneInputStreamOperator operator; if (evictor != null) { - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -450,9 +459,12 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction operator; if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index ae9861904405e..15ec5f1b63a43 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -56,8 +57,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * A {@code WindowedStream} represents a data stream where elements are grouped by @@ -290,8 +291,12 @@ public SingleOutputStreamOperator apply(WindowFunction functi WindowOperator, R, W> operator; if (evictor != null) { - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -378,8 +383,12 @@ public SingleOutputStreamOperator apply(ReduceFunction reduceFunction, OneInputStreamOperator operator; if (evictor != null) { - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); + + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; @@ -471,9 +480,12 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction operator; if (evictor != null) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer> streamRecordSerializer = + (TypeSerializer>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); - ListStateDescriptor> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + ListStateDescriptor> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 9f046f659a1bd..c3ef46494f2b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -28,9 +28,8 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,8 +47,7 @@ public class RecordWriterOutput implements Output> { @SuppressWarnings("unchecked") public RecordWriterOutput( StreamRecordWriter>> recordWriter, - TypeSerializer outSerializer, - boolean enableMultiplexing) { + TypeSerializer outSerializer) { checkNotNull(recordWriter); @@ -58,13 +56,8 @@ public RecordWriterOutput( this.recordWriter = (StreamRecordWriter>) (StreamRecordWriter) recordWriter; - TypeSerializer outRecordSerializer; - if (enableMultiplexing) { - outRecordSerializer = new MultiplexingStreamRecordSerializer(outSerializer); - } else { - outRecordSerializer = (TypeSerializer) - (TypeSerializer) new StreamRecordSerializer(outSerializer); - } + TypeSerializer outRecordSerializer = + new StreamElementSerializer<>(outSerializer); if (outSerializer != null) { serializationDelegate = new SerializationDelegate(outRecordSerializer); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 47e55dcc54c13..92b155639ed69 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -42,9 +42,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. @@ -88,8 +87,7 @@ public StreamInputProcessor( TypeSerializer inputSerializer, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager, - boolean enableMultiplexing) throws IOException { + IOManager ioManager) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -107,15 +105,9 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer); - this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); - } else { - StreamRecordSerializer ser = new StreamRecordSerializer(inputSerializer); - this.deserializationDelegate = (NonReusingDeserializationDelegate) - (NonReusingDeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); - } - + StreamElementSerializer ser = new StreamElementSerializer<>(inputSerializer); + this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); + // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a25c1a195e229..660f07e80c6db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -39,8 +39,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import java.io.IOException; import java.util.Arrays; @@ -95,8 +94,7 @@ public StreamTwoInputProcessor( TypeSerializer inputSerializer2, StatefulTask checkpointedTask, CheckpointingMode checkpointMode, - IOManager ioManager, - boolean enableMultiplexing) throws IOException { + IOManager ioManager) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -114,25 +112,11 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { this.barrierHandler.registerCheckpointEventHandler(checkpointedTask); } - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer1); - this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser); - } - else { - StreamRecordSerializer ser = new StreamRecordSerializer<>(inputSerializer1); - this.deserializationDelegate1 = (DeserializationDelegate) - (DeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); - } - - if (enableMultiplexing) { - MultiplexingStreamRecordSerializer ser = new MultiplexingStreamRecordSerializer<>(inputSerializer2); - this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser); - } - else { - StreamRecordSerializer ser = new StreamRecordSerializer<>(inputSerializer2); - this.deserializationDelegate2 = (DeserializationDelegate) - (DeserializationDelegate) new NonReusingDeserializationDelegate<>(ser); - } + StreamElementSerializer ser1 = new StreamElementSerializer<>(inputSerializer1); + this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1); + + StreamElementSerializer ser2 = new StreamElementSerializer<>(inputSerializer2); + this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser2); // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java similarity index 88% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 95e3ebda69931..66d32da40f344 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -29,14 +29,16 @@ import static java.util.Objects.requireNonNull; /** - * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal - * {@link TypeSerializer}, instead, this is only used at the stream task/operator level for - * transmitting StreamRecords and Watermarks. + * Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}. + * + *

+ * This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the + * stream task/operator level for transmitting StreamRecords and Watermarks. * * @param The type of value in the StreamRecord */ @Internal -public final class MultiplexingStreamRecordSerializer extends TypeSerializer { +public final class StreamElementSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; @@ -49,8 +51,8 @@ public final class MultiplexingStreamRecordSerializer extends TypeSerializer< private final TypeSerializer typeSerializer; - public MultiplexingStreamRecordSerializer(TypeSerializer serializer) { - if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) { + public StreamElementSerializer(TypeSerializer serializer) { + if (serializer instanceof StreamElementSerializer) { throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); } this.typeSerializer = requireNonNull(serializer); @@ -70,9 +72,9 @@ public boolean isImmutableType() { } @Override - public MultiplexingStreamRecordSerializer duplicate() { + public StreamElementSerializer duplicate() { TypeSerializer copy = typeSerializer.duplicate(); - return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer(copy); + return (copy == typeSerializer) ? this : new StreamElementSerializer(copy); } // ------------------------------------------------------------------------ @@ -231,8 +233,8 @@ else if (tag == TAG_LATENCY_MARKER) { @Override public boolean equals(Object obj) { - if (obj instanceof MultiplexingStreamRecordSerializer) { - MultiplexingStreamRecordSerializer other = (MultiplexingStreamRecordSerializer) obj; + if (obj instanceof StreamElementSerializer) { + StreamElementSerializer other = (StreamElementSerializer) obj; return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); } else { @@ -242,7 +244,7 @@ public boolean equals(Object obj) { @Override public boolean canEqual(Object obj) { - return obj instanceof MultiplexingStreamRecordSerializer; + return obj instanceof StreamElementSerializer; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java deleted file mode 100644 index 71b43fe82b38e..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUStreamRecordWARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import java.io.IOException; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; - -/** - * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with - * the element. - * - *

- * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also - * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same - * stream with {@link StreamRecord StreamRecords}. - * - * @see MultiplexingStreamRecordSerializer - * - * @param The type of value in the {@link StreamRecord} - */ -@Internal -public final class StreamRecordSerializer extends TypeSerializer> { - - private static final long serialVersionUID = 1L; - - private final TypeSerializer typeSerializer; - - - public StreamRecordSerializer(TypeSerializer serializer) { - if (serializer instanceof StreamRecordSerializer) { - throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); - } - this.typeSerializer = Preconditions.checkNotNull(serializer); - } - - public TypeSerializer getContainedTypeSerializer() { - return this.typeSerializer; - } - - // ------------------------------------------------------------------------ - // General serializer and type utils - // ------------------------------------------------------------------------ - - @Override - public StreamRecordSerializer duplicate() { - TypeSerializer serializerCopy = typeSerializer.duplicate(); - return serializerCopy == typeSerializer ? this : new StreamRecordSerializer(serializerCopy); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public int getLength() { - return typeSerializer.getLength(); - } - - // ------------------------------------------------------------------------ - // Type serialization, copying, instantiation - // ------------------------------------------------------------------------ - - @Override - public StreamRecord createInstance() { - try { - return new StreamRecord(typeSerializer.createInstance()); - } catch (Exception e) { - throw new RuntimeException("Cannot instantiate StreamRecord.", e); - } - } - - @Override - public StreamRecord copy(StreamRecord from) { - return from.copy(typeSerializer.copy(from.getValue())); - } - - @Override - public StreamRecord copy(StreamRecord from, StreamRecord reuse) { - from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse); - return reuse; - } - - @Override - public void serialize(StreamRecord value, DataOutputView target) throws IOException { - typeSerializer.serialize(value.getValue(), target); - } - - @Override - public StreamRecord deserialize(DataInputView source) throws IOException { - return new StreamRecord(typeSerializer.deserialize(source)); - } - - @Override - public StreamRecord deserialize(StreamRecord reuse, DataInputView source) throws IOException { - T element = typeSerializer.deserialize(reuse.getValue(), source); - reuse.replace(element); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - typeSerializer.copy(source, target); - } - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - if (obj instanceof StreamRecordSerializer) { - StreamRecordSerializer other = (StreamRecordSerializer) obj; - - return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof StreamRecordSerializer; - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 97546b83551da..2e73e42483eaf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -45,8 +45,7 @@ public void init() throws Exception { inputProcessor = new StreamInputProcessor(inputGates, inSerializer, this, configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - isSerializingMixedStream()); + getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7342b6d86988b..d02b066e9cd2e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -77,7 +77,6 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); final StreamConfig configuration = containingTask.getConfiguration(); - final boolean enableMultiplexing = containingTask.isSerializingMixedStream(); headOperator = configuration.getStreamOperator(userCodeClassloader); @@ -99,7 +98,7 @@ public OperatorChain(StreamTask containingTask, AccumulatorRegistry.Rep RecordWriterOutput streamOutput = createStreamOutput( outEdge, chainedConfigs.get(outEdge.getSourceId()), i, - containingTask.getEnvironment(), enableMultiplexing, reporter, containingTask.getName()); + containingTask.getEnvironment(), reporter, containingTask.getName()); this.streamOutputs[i] = streamOutput; streamOutputMap.put(outEdge, streamOutput); @@ -305,7 +304,7 @@ private static Output> createChainedOperator( private static RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, - Environment taskEnvironment, boolean enableMultiplexing, + Environment taskEnvironment, AccumulatorRegistry.Reporter reporter, String taskName) { TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); @@ -322,7 +321,7 @@ private static RecordWriterOutput createStreamOutput( output.setReporter(reporter); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup()); - return new RecordWriterOutput<>(output, outSerializer, enableMultiplexing); + return new RecordWriterOutput<>(output, outSerializer); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index eb5fde71e3ff7..77efc7b60a96f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -457,14 +457,6 @@ boolean isSerializingTimestamps() { return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime; } - /** - * Check if the tasks is sending a mixed stream (of watermarks, latency marks and records) - * @return true if stream contains more than just records - */ - protected boolean isSerializingMixedStream() { - return isSerializingTimestamps() || getExecutionConfig().isLatencyTrackingEnabled(); - } - // ------------------------------------------------------------------------ // Access to properties and utilities // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index bc80607baebc5..d695781977625 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -70,8 +70,7 @@ public void init() throws Exception { inputDeserializer1, inputDeserializer2, this, configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - isSerializingMixedStream()); + getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 1187fe6e205a8..322a0f01383f0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -86,7 +86,7 @@ private void setupInputChannels() throws IOException, InterruptedException { final int channelIndex = i; final RecordSerializer> recordSerializer = new SpanningRecordSerializer>(); final SerializationDelegate delegate = (SerializationDelegate) (SerializationDelegate) - new SerializationDelegate(new MultiplexingStreamRecordSerializer(serializer)); + new SerializationDelegate(new StreamElementSerializer(serializer)); inputQueues[channelIndex] = new ConcurrentLinkedQueue>(); inputChannels[channelIndex] = new TestInputChannel(inputGate, i); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java index 8f3af15247c73..2e3d09081c02f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -39,8 +40,8 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -66,9 +67,12 @@ public void testCountTrigger() throws Exception { TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer>> streamRecordSerializer = + (TypeSerializer>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); - ListStateDescriptor>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + ListStateDescriptor>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator, Tuple2, GlobalWindow> operator = new EvictingWindowOperator<>( @@ -135,8 +139,12 @@ public void testCountTriggerWithApply() throws Exception { TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - ListStateDescriptor>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer>> streamRecordSerializer = + (TypeSerializer>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); + + ListStateDescriptor>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator, Tuple2, GlobalWindow> operator = new EvictingWindowOperator<>( @@ -203,8 +211,12 @@ public void testTumblingWindowWithApply() throws Exception { TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); - ListStateDescriptor>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig()))); + @SuppressWarnings({"unchecked", "rawtypes"}) + TypeSerializer>> streamRecordSerializer = + (TypeSerializer>>) new StreamElementSerializer(inputType.createSerializer(new ExecutionConfig())); + + ListStateDescriptor>> stateDesc = + new ListStateDescriptor<>("window-contents", streamRecordSerializer); EvictingWindowOperator, Tuple2, TimeWindow> operator = new EvictingWindowOperator<>( TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java similarity index 86% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java index 1f0bf5ae1fd30..0f42a659bc552 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class MultiplexingStreamRecordSerializerTest { +public class StreamElementSerializerTest { @Test public void testDeepDuplication() { @@ -48,20 +48,20 @@ public void testDeepDuplication() { when(serializer1.duplicate()).thenReturn(serializer2); - MultiplexingStreamRecordSerializer streamRecSer = - new MultiplexingStreamRecordSerializer(serializer1); + StreamElementSerializer streamRecSer = + new StreamElementSerializer(serializer1); assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); - MultiplexingStreamRecordSerializer copy = streamRecSer.duplicate(); + StreamElementSerializer copy = streamRecSer.duplicate(); assertNotEquals(copy, streamRecSer); assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); } @Test public void testBasicProperties() { - MultiplexingStreamRecordSerializer streamRecSer = - new MultiplexingStreamRecordSerializer(LongSerializer.INSTANCE); + StreamElementSerializer streamRecSer = + new StreamElementSerializer(LongSerializer.INSTANCE); assertFalse(streamRecSer.isImmutableType()); assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); @@ -70,8 +70,8 @@ public void testBasicProperties() { @Test public void testSerialization() throws Exception { - final MultiplexingStreamRecordSerializer serializer = - new MultiplexingStreamRecordSerializer(StringSerializer.INSTANCE); + final StreamElementSerializer serializer = + new StreamElementSerializer(StringSerializer.INSTANCE); StreamRecord withoutTimestamp = new StreamRecord<>("test 1 2 分享基督耶穌的愛給們,開拓雙贏!"); assertEquals(withoutTimestamp, serializeAndDeserialize(withoutTimestamp, serializer)); @@ -92,7 +92,7 @@ public void testSerialization() throws Exception { @SuppressWarnings("unchecked") private static X serializeAndDeserialize( X record, - MultiplexingStreamRecordSerializer serializer) throws IOException { + StreamElementSerializer serializer) throws IOException { DataOutputSerializer output = new DataOutputSerializer(32); serializer.serialize(record, output); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java deleted file mode 100644 index bdeb55259cea9..0000000000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.streamrecord; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; -import org.junit.Test; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class StreamRecordSerializerTest { - - @Test - public void testDeepDuplication() { - try { - @SuppressWarnings("unchecked") - TypeSerializer serializer1 = (TypeSerializer) mock(TypeSerializer.class); - @SuppressWarnings("unchecked") - TypeSerializer serializer2 = (TypeSerializer) mock(TypeSerializer.class); - - when(serializer1.duplicate()).thenReturn(serializer2); - - StreamRecordSerializer streamRecSer = new StreamRecordSerializer(serializer1); - assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); - - StreamRecordSerializer copy = streamRecSer.duplicate(); - assertNotEquals(copy, streamRecSer); - assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testBasicProperties() { - try { - StreamRecordSerializer streamRecSer = new StreamRecordSerializer(LongSerializer.INSTANCE); - - assertFalse(streamRecSer.isImmutableType()); - assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); - assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength()); - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testDeserializedValuesHaveNoTimestamps() throws Exception { - final StreamRecord original = new StreamRecord<>(42L); - - StreamRecordSerializer streamRecSer = new StreamRecordSerializer(LongSerializer.INSTANCE); - - DataOutputSerializer buffer = new DataOutputSerializer(16); - streamRecSer.serialize(original, buffer); - - DataInputDeserializer input = new DataInputDeserializer(buffer.getByteArray(), 0, buffer.length()); - StreamRecord result = streamRecSer.deserialize(input); - - assertFalse(result.hasTimestamp()); - assertEquals(original, result); - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index cbb5a9ddf4dca..ce62624eead35 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.junit.Assert; @@ -110,7 +110,7 @@ public StreamTaskTestHarness(AbstractInvokable task, TypeInformation output streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); outputSerializer = outputType.createSerializer(executionConfig); - outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer(outputSerializer); + outputStreamRecordSerializer = new StreamElementSerializer(outputSerializer); } public TimeServiceProvider getTimerService() {