Skip to content

Commit

Permalink
[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer
Browse files Browse the repository at this point in the history
This also renames MultiplexingStreamRecordSerializer to
StreamElementSerializer.
  • Loading branch information
aljoscha committed Oct 21, 2016
1 parent 7398fdb commit 71d2e3e
Show file tree
Hide file tree
Showing 19 changed files with 111 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -68,7 +68,7 @@
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
@PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, StreamRecord.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class BoltWrapperTest extends AbstractTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

Expand All @@ -46,7 +46,7 @@
abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
private static final long serialVersionUID = 7487334510746595640L;

private final MultiplexingStreamRecordSerializer<IN> streamRecordSerializer;
private final StreamElementSerializer<IN> streamRecordSerializer;

// global nfa for all elements
private NFA<IN> nfa;
Expand All @@ -60,7 +60,7 @@ public AbstractCEPPatternOperator(
NFACompiler.NFAFactory<IN> nfaFactory) {
super(inputSerializer, isProcessingTime);

this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer);
this.nfa = nfaFactory.createNFA();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void open() throws Exception {

@SuppressWarnings("unchecked,rawtypes")
TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
(TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer());
(TypeSerializer) new StreamElementSerializer<>(getInputSerializer());

if (priorityQueueOperatorState == null) {
priorityQueueOperatorState = getPartitionedState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand All @@ -49,8 +50,8 @@
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
* A {@code AllWindowedStream} represents a data stream where the stream of
Expand Down Expand Up @@ -269,8 +270,12 @@ public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> functi
WindowOperator<Byte, T, Iterable<T>, R, W> operator;

if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down Expand Up @@ -357,8 +362,12 @@ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction,
OneInputStreamOperator<T, R> operator;

if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down Expand Up @@ -450,9 +459,12 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
OneInputStreamOperator<T, R> operator;

if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -56,8 +57,8 @@
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
* A {@code WindowedStream} represents a data stream where elements are grouped by
Expand Down Expand Up @@ -290,8 +291,12 @@ public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> functi
WindowOperator<K, T, Iterable<T>, R, W> operator;

if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down Expand Up @@ -378,8 +383,12 @@ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction,
OneInputStreamOperator<T, R> operator;

if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down Expand Up @@ -471,9 +480,12 @@ public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R
OneInputStreamOperator<T, R> operator;

if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);

opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -48,8 +47,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
@SuppressWarnings("unchecked")
public RecordWriterOutput(
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
boolean enableMultiplexing) {
TypeSerializer<OUT> outSerializer) {

checkNotNull(recordWriter);

Expand All @@ -58,13 +56,8 @@ public RecordWriterOutput(
this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
(StreamRecordWriter<?>) recordWriter;

TypeSerializer<StreamElement> outRecordSerializer;
if (enableMultiplexing) {
outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
} else {
outRecordSerializer = (TypeSerializer<StreamElement>)
(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
}
TypeSerializer<StreamElement> outRecordSerializer =
new StreamElementSerializer<>(outSerializer);

if (outSerializer != null) {
serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
Expand Down Expand Up @@ -88,8 +87,7 @@ public StreamInputProcessor(
TypeSerializer<IN> inputSerializer,
StatefulTask checkpointedTask,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableMultiplexing) throws IOException {
IOManager ioManager) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

Expand All @@ -107,15 +105,9 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
}

if (enableMultiplexing) {
MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);
} else {
StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
}

StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);

// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -95,8 +94,7 @@ public StreamTwoInputProcessor(
TypeSerializer<IN2> inputSerializer2,
StatefulTask checkpointedTask,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableMultiplexing) throws IOException {
IOManager ioManager) throws IOException {

final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

Expand All @@ -114,25 +112,11 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
}

if (enableMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer1);
this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser);
}
else {
StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<>(inputSerializer1);
this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
}

if (enableMultiplexing) {
MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer2);
this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser);
}
else {
StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<>(inputSerializer2);
this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
}
StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1);
this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1);

StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<>(inputSerializer2);
this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser2);

// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
import static java.util.Objects.requireNonNull;

/**
* Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
* {@link TypeSerializer}, instead, this is only used at the stream task/operator level for
* transmitting StreamRecords and Watermarks.
* Serializer for {@link StreamRecord}, {@link Watermark} and {@link LatencyMarker}.
*
* <p>
* This does not behave like a normal {@link TypeSerializer}, instead, this is only used at the
* stream task/operator level for transmitting StreamRecords and Watermarks.
*
* @param <T> The type of value in the StreamRecord
*/
@Internal
public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
public final class StreamElementSerializer<T> extends TypeSerializer<StreamElement> {

private static final long serialVersionUID = 1L;

Expand All @@ -49,8 +51,8 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
private final TypeSerializer<T> typeSerializer;


public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
public StreamElementSerializer(TypeSerializer<T> serializer) {
if (serializer instanceof StreamElementSerializer) {
throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
}
this.typeSerializer = requireNonNull(serializer);
Expand All @@ -70,9 +72,9 @@ public boolean isImmutableType() {
}

@Override
public MultiplexingStreamRecordSerializer<T> duplicate() {
public StreamElementSerializer<T> duplicate() {
TypeSerializer<T> copy = typeSerializer.duplicate();
return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
return (copy == typeSerializer) ? this : new StreamElementSerializer<T>(copy);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -231,8 +233,8 @@ else if (tag == TAG_LATENCY_MARKER) {

@Override
public boolean equals(Object obj) {
if (obj instanceof MultiplexingStreamRecordSerializer) {
MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
if (obj instanceof StreamElementSerializer) {
StreamElementSerializer<?> other = (StreamElementSerializer<?>) obj;

return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
} else {
Expand All @@ -242,7 +244,7 @@ public boolean equals(Object obj) {

@Override
public boolean canEqual(Object obj) {
return obj instanceof MultiplexingStreamRecordSerializer;
return obj instanceof StreamElementSerializer;
}

@Override
Expand Down
Loading

0 comments on commit 71d2e3e

Please sign in to comment.