Skip to content

Commit

Permalink
[FLINK-4162] Fix Event Queue Serialization in Abstract(Keyed)CEPPatte…
Browse files Browse the repository at this point in the history
…rnOperator

Before, these operators were using StreamRecordSerializer, which does not
serialize timestamps. Now they use MultiplexingStreamRecordSerializer.

This also extends the tests in CEPOperatorTest to test that timestamps
are correctly checkpointed/restored.
  • Loading branch information
aljoscha committed Jul 18, 2016
1 parent ac06146 commit 254379b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

import java.io.IOException;
Expand All @@ -46,7 +47,7 @@
abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
private static final long serialVersionUID = 7487334510746595640L;

private final StreamRecordSerializer<IN> streamRecordSerializer;
private final MultiplexingStreamRecordSerializer<IN> streamRecordSerializer;

// global nfa for all elements
private NFA<IN> nfa;
Expand All @@ -60,7 +61,7 @@ public AbstractCEPPatternOperator(
NFACompiler.NFAFactory<IN> nfaFactory) {
super(inputSerializer, isProcessingTime);

this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
this.nfa = nfaFactory.createNFA();
}

Expand Down Expand Up @@ -134,7 +135,8 @@ public void restoreState(StreamTaskState state) throws Exception {
priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());

for (int i = 0; i <numberPriorityQueueEntries; i++) {
priorityQueue.offer(streamRecordSerializer.deserialize(div));
StreamElement streamElement = streamRecordSerializer.deserialize(div);
priorityQueue.offer(streamElement.<IN>asRecord());
}

div.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

import java.io.IOException;
Expand Down Expand Up @@ -103,13 +103,17 @@ public void open() throws Exception {
null));
}

@SuppressWarnings("unchecked,rawtypes")
TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
(TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer());

if (priorityQueueOperatorState == null) {
priorityQueueOperatorState = getPartitionedState(
new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
new ValueStateDescriptor<>(
PRIORIRY_QUEUE_STATE_NAME,
new PriorityQueueSerializer<StreamRecord<IN>>(
new StreamRecordSerializer<IN>(getInputSerializer()),
new PriorityQueueStreamRecordFactory<IN>()),
new PriorityQueueSerializer<>(
streamRecordSerializer,
new PriorityQueueStreamRecordFactory<IN>()),
null));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -99,6 +100,94 @@ public Integer getKey(Event value) throws Exception {
harness.close();
}

/**
 * Exercises the non-keyed {@code CEPPatternOperator} across two snapshot/restore cycles and
 * checks that the expected start/middle/end match is still emitted afterwards.
 *
 * <p>The first snapshot is taken right after two elements have been handed to the operator and
 * before any watermark is processed; the second is taken after one more element and a watermark
 * of {@code 2}. Each snapshot is restored into a freshly constructed harness/operator pair.
 *
 * @throws Exception if the harness or the operator fails at any step
 */
@Test
public void testCEPOperatorCheckpointing() throws Exception {
	// The non-keyed operator takes no key selector, so none is created here.
	OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
		new CEPPatternOperator<>(
			Event.createTypeSerializer(),
			false,
			new NFAFactory()));

	harness.open();

	Event startEvent = new Event(42, "start", 1.0);
	SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
	Event endEvent = new Event(42, "end", 1.0);

	harness.processElement(new StreamRecord<Event>(startEvent, 1));
	harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));

	// simulate snapshot/restore with some elements in internal sorting queue
	StreamTaskState snapshot = harness.snapshot(0, 0);

	// restore into a brand-new operator instance so nothing survives in memory
	harness = new OneInputStreamOperatorTestHarness<>(
		new CEPPatternOperator<>(
			Event.createTypeSerializer(),
			false,
			new NFAFactory()));

	harness.setup();
	harness.restore(snapshot, 1);
	harness.open();

	harness.processWatermark(new Watermark(Long.MIN_VALUE));

	harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));

	// if element timestamps are not correctly checkpointed/restored this will lead to
	// a pruning time underflow exception in NFA
	harness.processWatermark(new Watermark(2));

	// simulate snapshot/restore with empty element queue but NFA state
	StreamTaskState snapshot2 = harness.snapshot(1, 1);

	harness = new OneInputStreamOperatorTestHarness<>(
		new CEPPatternOperator<>(
			Event.createTypeSerializer(),
			false,
			new NFAFactory()));

	harness.setup();
	harness.restore(snapshot2, 2);
	harness.open();

	harness.processElement(new StreamRecord<Event>(middleEvent, 3));
	harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
	harness.processElement(new StreamRecord<Event>(endEvent, 5));

	harness.processWatermark(new Watermark(Long.MAX_VALUE));

	ConcurrentLinkedQueue<Object> result = harness.getOutput();

	// watermark and the result
	assertEquals(2, result.size());

	// the first polled output element is expected to be the match record
	Object resultObject = result.poll();
	assertTrue(resultObject instanceof StreamRecord);
	StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
	assertTrue(resultRecord.getValue() instanceof Map);

	@SuppressWarnings("unchecked")
	Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();

	// the match must contain the events sent before the first snapshot and after the second
	assertEquals(startEvent, patternMap.get("start"));
	assertEquals(middleEvent, patternMap.get("middle"));
	assertEquals(endEvent, patternMap.get("end"));

	harness.close();
}



@Test
public void testKeyedCEPOperatorCheckpointing() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
Expand Down Expand Up @@ -128,12 +217,33 @@ public Integer getKey(Event value) throws Exception {

harness.processElement(new StreamRecord<Event>(startEvent, 1));
harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));

// simulate snapshot/restore with some elements in internal sorting queue
StreamTaskState snapshot = harness.snapshot(0, 0);

harness = new OneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
keySelector,
IntSerializer.INSTANCE,
new NFAFactory()));

harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
harness.setup();
harness.restore(snapshot, 1);
harness.open();

harness.processWatermark(new Watermark(Long.MIN_VALUE));

harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));

// if element timestamps are not correctly checkpointed/restored this will lead to
// a pruning time underflow exception in NFA
harness.processWatermark(new Watermark(2));

// simulate snapshot/restore
StreamTaskState snapshot = harness.snapshot(0, 0);
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot2 = harness.snapshot(1, 1);

harness = new OneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
Expand All @@ -145,7 +255,7 @@ public Integer getKey(Event value) throws Exception {

harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
harness.setup();
harness.restore(snapshot, 1);
harness.restore(snapshot2, 2);
harness.open();

harness.processElement(new StreamRecord<Event>(middleEvent, 3));
Expand All @@ -156,6 +266,7 @@ public Integer getKey(Event value) throws Exception {

ConcurrentLinkedQueue<Object> result = harness.getOutput();

// watermark and the result
assertEquals(2, result.size());

Object resultObject = result.poll();
Expand Down Expand Up @@ -203,7 +314,10 @@ public boolean filter(SubEvent value) throws Exception {
public boolean filter(Event value) throws Exception {
return value.getName().equals("end");
}
});
})
// add a window timeout to test whether timestamps of elements in the
// priority queue in CEP operator are correctly checkpointed/restored
.within(Time.milliseconds(10));

return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
}
Expand Down

0 comments on commit 254379b

Please sign in to comment.