From e5057b72c0749a75578665c4c86a47be33382b4a Mon Sep 17 00:00:00 2001 From: kl0u Date: Mon, 13 Mar 2017 20:36:57 +0100 Subject: [PATCH] [FLINK-6032] [cep] Clean-up operator state when not needed. The CEP operator now cleans the registered state for a key. This happens: 1) for the priority queue, when the queue is empty. 2) for the NFA, when its shared buffer is empty. 3) finally the key is removed from the watermark callback service if both the above are empty. --- .../java/org/apache/flink/cep/nfa/NFA.java | 93 ++-- .../AbstractKeyedCEPPatternOperator.java | 117 +++-- .../java/org/apache/flink/cep/CEPITCase.java | 2 +- .../java/org/apache/flink/cep/SubEvent.java | 12 + .../flink/cep/operator/CEPOperatorTest.java | 407 +++++++++++------- 5 files changed, 412 insertions(+), 219 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 0ff496ff18fbb..8d87fd8a85a40 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.windowing.time.Time; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -54,52 +55,82 @@ /** * Non-deterministic finite automaton implementation. *

- * The NFA processes input events which will chnage the internal state machine. Whenever a final - * state is reached, the matching sequence of events is emitted. + * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator} + * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones. + * When an event gets processed, it updates the NFA's internal state machine. + *

+ * An event that belongs to a partially matched sequence is kept in an internal + * {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for + * this purpose. Events in the buffer are removed when all the matched sequences that + * contain them are: + *
+ *   1. emitted (success),
+ *   2. discarded (patterns containing NOT),
+ *   3. timed-out (windowed patterns).
* * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". * - * @see https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf + * @see + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf * * @param Type of the processed events */ public class NFA implements Serializable { - private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$"); private static final long serialVersionUID = 2957674889294717265L; + private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$"); + private final NonDuplicatingTypeSerializer nonDuplicatingTypeSerializer; - // Buffer used to store the matched events + /** + * Buffer used to store the matched events. + */ private final SharedBuffer, T> sharedBuffer; - // Set of all NFA states + /** + * A set of all the valid NFA states, as returned by the + * {@link org.apache.flink.cep.nfa.compiler.NFACompiler NFACompiler}. + * These are directly derived from the user-specified pattern. + */ private final Set> states; - // Length of the window + /** + * The length of a windowed pattern, as specified using the + * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)} + * method. + */ private final long windowTime; + /** + * A flag indicating if we want timed-out patterns (in case of windowed patterns) + * to be emitted ({@code true}), or silently discarded ({@code false}). + */ private final boolean handleTimeout; // Current starting index for the next dewey version number private int startEventCounter; - // Current set of computation states within the state machine + /** + * Current set of {@link ComputationState computation states} within the state machine. + * These are the "active" intermediate states that are waiting for new matching + * events to transition to new valid states. + */ private transient Queue> computationStates; public NFA( - final TypeSerializer eventSerializer, - final long windowTime, - final boolean handleTimeout) { + final TypeSerializer eventSerializer, + final long windowTime, + final boolean handleTimeout) { this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); - computationStates = new LinkedList<>(); - - states = new HashSet<>(); - startEventCounter = 1; + this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + this.computationStates = new LinkedList<>(); + this.states = new HashSet<>(); + this.startEventCounter = 1; } public Set> getStates() { @@ -120,6 +151,17 @@ public void addState(final State state) { } } + /** + * Check if the NFA has finished processing all incoming data so far. That is + * when the buffer keeping the matches is empty. + * + * @return {@code true} if there are no elements in the {@link SharedBuffer}, + * {@code false} otherwise. + */ + public boolean isEmpty() { + return sharedBuffer.isEmpty(); + } + /** * Processes the next input event. If some of the computations reach a final state then the * resulting event sequences are returned. If computations time out and timeout handling is @@ -186,15 +228,13 @@ public Tuple2>, Collection, Long if(windowTime > 0L) { long pruningTimestamp = timestamp - windowTime; - // sanity check to guard against underflows - if (pruningTimestamp >= timestamp) { - throw new IllegalStateException("Detected an underflow in the pruning timestamp. 
This indicates that" + - " either the window length is too long (" + windowTime + ") or that the timestamp has not been" + - " set correctly (e.g. Long.MIN_VALUE)."); - } + if (pruningTimestamp < timestamp) { + // the check is to guard against underflows - // remove all elements which are expired with respect to the window length - sharedBuffer.prune(pruningTimestamp); + // remove all elements which are expired + // with respect to the window length + sharedBuffer.prune(pruningTimestamp); + } } return Tuple2.of(result, timeoutResult); @@ -251,7 +291,7 @@ private Collection> computeNextStates( final T event, final long timestamp) { Stack> states = new Stack<>(); - ArrayList> resultingComputationStates = new ArrayList<>(); + List> resultingComputationStates = new ArrayList<>(); State state = computationState.getState(); states.push(state); @@ -381,7 +421,7 @@ private Collection> extractPatternMatches(final ComputationState< computationState.getTimestamp(), computationState.getVersion()); - ArrayList> result = new ArrayList<>(); + List> result = new ArrayList<>(); TypeSerializer serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); @@ -497,6 +537,7 @@ static String generateStateName(final String name, final int index) { * {@link TypeSerializer} for {@link NFA} that uses Java Serialization. */ public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; @Override diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index f534becb7948d..de7daeaa8bc2d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.operator; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -140,77 +141,58 @@ private void initializeState() { public void open() throws Exception { super.open(); - InternalWatermarkCallbackService watermarkCallbackService = getInternalWatermarkCallbackService(); + final InternalWatermarkCallbackService watermarkCallbackService = getInternalWatermarkCallbackService(); watermarkCallbackService.setWatermarkCallback( new OnWatermarkCallback() { @Override public void onWatermark(KEY key, Watermark watermark) throws IOException { - setCurrentKey(key); + // 1) get the queue of pending elements for the key and the corresponding NFA, + // 2) process the pending elements in event time order by feeding them in the NFA + // 3) advance the time to the current watermark, so that expired patterns are discarded. + // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they + // have state to be used later. 
+ + // STEP 1 PriorityQueue> priorityQueue = getPriorityQueue(); NFA nfa = getNFA(); - if (priorityQueue.isEmpty()) { - advanceTime(nfa, watermark.getTimestamp()); - } else { - while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) { - StreamRecord streamRecord = priorityQueue.poll(); - processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); - } + // STEP 2 + while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) { + StreamRecord streamRecord = priorityQueue.poll(); + processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); } - updateNFA(nfa); + // STEP 3 + advanceTime(nfa, watermark.getTimestamp()); + + // STEP 4 updatePriorityQueue(priorityQueue); + updateNFA(nfa); + + if (priorityQueue.isEmpty() && nfa.isEmpty()) { + watermarkCallbackService.unregisterKeyFromWatermarkCallback(key); + } } }, keySerializer ); } - private NFA getNFA() throws IOException { - NFA nfa = nfaOperatorState.value(); - - if (nfa == null) { - nfa = nfaFactory.createNFA(); - - nfaOperatorState.update(nfa); - } - - return nfa; - } - - private void updateNFA(NFA nfa) throws IOException { - nfaOperatorState.update(nfa); - } - - private PriorityQueue> getPriorityQueue() throws IOException { - PriorityQueue> priorityQueue = priorityQueueOperatorState.value(); - - if (priorityQueue == null) { - priorityQueue = priorityQueueFactory.createPriorityQueue(); - - priorityQueueOperatorState.update(priorityQueue); - } - - return priorityQueue; - } - - private void updatePriorityQueue(PriorityQueue> queue) throws IOException { - priorityQueueOperatorState.update(queue); - } - @Override public void processElement(StreamRecord element) throws Exception { - getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); if (isProcessingTime) { // there can be no out of order elements in processing time NFA nfa = getNFA(); - processEvent(nfa, element.getValue(), System.currentTimeMillis()); + processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime()); updateNFA(nfa); + } else { + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); + PriorityQueue> priorityQueue = getPriorityQueue(); // event time processing @@ -225,6 +207,32 @@ public void processElement(StreamRecord element) throws Exception { } } + private NFA getNFA() throws IOException { + NFA nfa = nfaOperatorState.value(); + return nfa != null ? nfa : nfaFactory.createNFA(); + } + + private void updateNFA(NFA nfa) throws IOException { + if (nfa.isEmpty()) { + nfaOperatorState.clear(); + } else { + nfaOperatorState.update(nfa); + } + } + + private PriorityQueue> getPriorityQueue() throws IOException { + PriorityQueue> priorityQueue = priorityQueueOperatorState.value(); + return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue(); + } + + private void updatePriorityQueue(PriorityQueue> queue) throws IOException { + if (queue.isEmpty()) { + priorityQueueOperatorState.clear(); + } else { + priorityQueueOperatorState.update(queue); + } + } + /** * Process the given event by giving it to the NFA and outputting the produced set of matched * event sequences. 
@@ -385,4 +393,25 @@ public int hashCode() { return getClass().hashCode(); } } + + ////////////////////// Testing Methods ////////////////////// + + @VisibleForTesting + public boolean hasNonEmptyNFA(KEY key) throws IOException { + setCurrentKey(key); + return nfaOperatorState.value() != null; + } + + @VisibleForTesting + public boolean hasNonEmptyPQ(KEY key) throws IOException { + setCurrentKey(key); + return priorityQueueOperatorState.value() != null; + } + + @VisibleForTesting + public int getPQSize(KEY key) throws IOException { + setCurrentKey(key); + PriorityQueue> pq = getPriorityQueue(); + return pq == null ? -1 : pq.size(); + } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 0f1f84564332e..58870174195d1 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -501,7 +501,7 @@ public String select(Map pattern) { result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); // the expected sequences of matching event ids - expected = "Left(1.0)\nRight(2.0,2.0,2.0)"; + expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)"; env.execute(); } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java index 31eff28b441b9..effb382de1086 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java @@ -30,6 +30,18 @@ public double getVolume() { return volume; } + @Override + public boolean equals(Object obj) { + return obj instanceof SubEvent && + super.equals(obj) && + ((SubEvent) obj).volume == volume; + } + + @Override + public int hashCode() { + return super.hashCode() + (int) volume; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 1899cb4aae0de..4ae74b9286099 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -49,7 +49,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; public class CEPOperatorTest extends TestLogger { @@ -58,24 +57,8 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { - KeySelector keySelector = new KeySelector() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + + OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); harness.open(); @@ -83,10 +66,7 @@ public Integer getKey(Event value) throws Exception { harness.processWatermark(expectedWatermark); - 
Object watermark = harness.getOutput().poll(); - - assertTrue(watermark instanceof Watermark); - assertEquals(expectedWatermark, watermark); + verifyWatermark(harness.getOutput().poll(), 42L); harness.close(); } @@ -94,24 +74,7 @@ public Integer getKey(Event value) throws Exception { @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { - KeySelector keySelector = new KeySelector() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); harness.open(); @@ -119,22 +82,14 @@ public Integer getKey(Event value) throws Exception { SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent= new Event(42, "end", 1.0); - harness.processElement(new StreamRecord(startEvent, 1)); - harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0, 0); + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); harness.setup(); harness.initializeState(snapshot); @@ -142,29 +97,21 @@ public Integer getKey(Event value) throws Exception { harness.processWatermark(new Watermark(Long.MIN_VALUE)); - harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); // if element timestamps are not correctly checkpointed/restored this will lead to // a pruning time underflow exception in NFA - harness.processWatermark(new Watermark(2)); + harness.processWatermark(new Watermark(2L)); - harness.processElement(new StreamRecord(middleEvent, 3)); - harness.processElement(new StreamRecord(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord(endEvent, 5)); + harness.processElement(new StreamRecord(middleEvent, 3L)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L)); + harness.processElement(new StreamRecord<>(endEvent, 5L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1, 1); + OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); harness.setup(); harness.initializeState(snapshot2); @@ -172,22 +119,14 @@ public Integer getKey(Event value) throws Exception { harness.processWatermark(new Watermark(Long.MAX_VALUE)); - ConcurrentLinkedQueue 
result = harness.getOutput(); + // get and verify the output - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord resultRecord = (StreamRecord) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); + Queue result = harness.getOutput(); - @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); + assertEquals(2, result.size()); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + verifyWatermark(result.poll(), Long.MAX_VALUE); harness.close(); } @@ -199,24 +138,7 @@ public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - KeySelector keySelector = new KeySelector() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + OneInputStreamOperatorTestHarness> harness = getCepTestHarness(false); harness.setStateBackend(rocksDBStateBackend); @@ -226,22 +148,14 @@ public Integer getKey(Event value) throws Exception { SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent= new Event(42, "end", 1.0); - harness.processElement(new StreamRecord(startEvent, 1)); - harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0, 0); + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -253,25 +167,17 @@ public Integer getKey(Event value) throws Exception { harness.processWatermark(new Watermark(Long.MIN_VALUE)); - harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); // if element timestamps are not correctly checkpointed/restored this will lead to // a pruning time underflow exception in NFA - harness.processWatermark(new Watermark(2)); + harness.processWatermark(new Watermark(2L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1, 1); + OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - 
Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -280,28 +186,20 @@ public Integer getKey(Event value) throws Exception { harness.initializeState(snapshot2); harness.open(); - harness.processElement(new StreamRecord(middleEvent, 3)); - harness.processElement(new StreamRecord(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord(endEvent, 5)); + harness.processElement(new StreamRecord(middleEvent, 3L)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L)); + harness.processElement(new StreamRecord<>(endEvent, 5L)); harness.processWatermark(new Watermark(Long.MAX_VALUE)); - ConcurrentLinkedQueue result = harness.getOutput(); + // get and verify the output - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord resultRecord = (StreamRecord) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); + Queue result = harness.getOutput(); - @SuppressWarnings("unchecked") - Map patternMap = (Map) resultRecord.getValue(); + assertEquals(2, result.size()); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + verifyWatermark(result.poll(), Long.MAX_VALUE); harness.close(); } @@ -311,14 +209,8 @@ public Integer getKey(Event value) throws Exception { */ @Test public void testKeyedAdvancingTimeWithoutElements() throws Exception { - final KeySelector keySelector = new KeySelector() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; + final KeySelector keySelector = new TestKeySelector(); + final Event startEvent = new Event(42, "start", 1.0); final long watermarkTimestamp1 = 5L; final long watermarkTimestamp2 = 13L; @@ -349,7 +241,7 @@ public Integer getKey(Event value) throws Exception { Queue result = harness.getOutput(); - assertEquals(3, result.size()); + assertEquals(3L, result.size()); Object watermark1 = result.poll(); @@ -382,6 +274,225 @@ public Integer getKey(Event value) throws Exception { } } + @Test + public void testCEPOperatorCleanupEventTime() throws Exception { + + Event startEvent1 = new Event(42, "start", 1.0); + Event startEvent2 = new Event(42, "start", 2.0); + SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); + SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0); + Event endEvent1 = new Event(42, "end", 1.0); + Event endEvent2 = new Event(42, "end", 2.0); + + Event startEventK2 = new Event(43, "start", 1.0); + + TestKeySelector keySelector = new TestKeySelector(); + KeyedCEPPatternOperator operator = getKeyedCepOpearator(false, keySelector); + OneInputStreamOperatorTestHarness> harness = getCepTestHarness(operator); + + harness.open(); + + harness.processWatermark(new Watermark(Long.MIN_VALUE)); + + harness.processElement(new StreamRecord<>(startEvent1, 1L)); + harness.processElement(new StreamRecord<>(startEventK2, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L)); + harness.processElement(new StreamRecord(middleEvent1, 2L)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); + + // there must be 2 keys 42, 43 registered for the watermark callback + // all the seen elements must be in the priority queues but no NFA yet. + + assertEquals(2L, harness.numKeysForWatermarkCallback()); + assertEquals(4L, operator.getPQSize(42)); + assertEquals(1L, operator.getPQSize(43)); + assertTrue(!operator.hasNonEmptyNFA(42)); + assertTrue(!operator.hasNonEmptyNFA(43)); + + harness.processWatermark(new Watermark(2L)); + + verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE); + verifyWatermark(harness.getOutput().poll(), 2L); + + // still the 2 keys + // one element in PQ for 42 (the barfoo) as it arrived early + // for 43 the element entered the NFA and the PQ is empty + + assertEquals(2L, harness.numKeysForWatermarkCallback()); + assertTrue(operator.hasNonEmptyNFA(42)); + assertEquals(1L, operator.getPQSize(42)); + assertTrue(operator.hasNonEmptyNFA(43)); + assertTrue(!operator.hasNonEmptyPQ(43)); + + harness.processElement(new StreamRecord<>(startEvent2, 4L)); + harness.processElement(new StreamRecord(middleEvent2, 5L)); + harness.processElement(new StreamRecord<>(endEvent1, 6L)); + harness.processWatermark(11L); + harness.processWatermark(12L); + + // now we have 1 key because the 43 expired and was removed. + // 42 is still there due to startEvent2 + assertEquals(1L, harness.numKeysForWatermarkCallback()); + assertTrue(operator.hasNonEmptyNFA(42)); + assertTrue(!operator.hasNonEmptyPQ(42)); + assertTrue(!operator.hasNonEmptyNFA(43)); + assertTrue(!operator.hasNonEmptyPQ(43)); + + verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1); + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1); + verifyWatermark(harness.getOutput().poll(), 11L); + verifyWatermark(harness.getOutput().poll(), 12L); + + harness.processElement(new StreamRecord(middleEvent3, 12L)); + harness.processElement(new StreamRecord<>(endEvent2, 13L)); + harness.processWatermark(20L); + harness.processWatermark(21L); + + assertTrue(!operator.hasNonEmptyNFA(42)); + assertTrue(!operator.hasNonEmptyPQ(42)); + assertEquals(0L, harness.numKeysForWatermarkCallback()); + + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2); + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2); + verifyWatermark(harness.getOutput().poll(), 20L); + verifyWatermark(harness.getOutput().poll(), 21L); + + harness.close(); + } + + @Test + public void testCEPOperatorCleanupProcessingTime() throws Exception { + + Event startEvent1 = new Event(42, "start", 1.0); + Event startEvent2 = new Event(42, "start", 2.0); + SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); + SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0); + Event endEvent1 = new Event(42, "end", 1.0); + Event endEvent2 = new Event(42, "end", 2.0); + + Event startEventK2 = new Event(43, "start", 1.0); + + TestKeySelector keySelector = new TestKeySelector(); + KeyedCEPPatternOperator operator = getKeyedCepOpearator(true, keySelector); + OneInputStreamOperatorTestHarness> harness = getCepTestHarness(operator); + + harness.open(); + + harness.setProcessingTime(0L); + + harness.processElement(new StreamRecord<>(startEvent1, 1L)); + 
harness.processElement(new StreamRecord<>(startEventK2, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); + harness.processElement(new StreamRecord(middleEvent1, 2L)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); + + assertTrue(!operator.hasNonEmptyPQ(42)); + assertTrue(!operator.hasNonEmptyPQ(43)); + assertTrue(operator.hasNonEmptyNFA(42)); + assertTrue(operator.hasNonEmptyNFA(43)); + + harness.setProcessingTime(3L); + + harness.processElement(new StreamRecord<>(startEvent2, 3L)); + harness.processElement(new StreamRecord(middleEvent2, 4L)); + harness.processElement(new StreamRecord<>(endEvent1, 5L)); + + verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1); + verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1); + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1); + + harness.setProcessingTime(11L); + + harness.processElement(new StreamRecord(middleEvent3, 11L)); + harness.processElement(new StreamRecord<>(endEvent2, 12L)); + + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2); + verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2); + + harness.setProcessingTime(21L); + + assertTrue(operator.hasNonEmptyNFA(42)); + + harness.processElement(new StreamRecord<>(startEvent1, 21L)); + assertTrue(operator.hasNonEmptyNFA(42)); + + harness.setProcessingTime(49L); + + // TODO: 3/13/17 we have to have another event in order to clean up + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); + + // the pattern expired + assertTrue(!operator.hasNonEmptyNFA(42)); + + assertEquals(0L, harness.numKeysForWatermarkCallback()); + assertTrue(!operator.hasNonEmptyPQ(42)); + assertTrue(!operator.hasNonEmptyPQ(43)); + + harness.close(); + } + + private void verifyWatermark(Object outputObject, long timestamp) { + assertTrue(outputObject instanceof Watermark); + assertEquals(timestamp, ((Watermark) outputObject).getTimestamp()); + } + + private void verifyPattern(Object outputObject, Event start, SubEvent middle, Event end) { + assertTrue(outputObject instanceof StreamRecord); + + StreamRecord resultRecord = (StreamRecord) outputObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap = (Map) resultRecord.getValue(); + assertEquals(start, patternMap.get("start")); + assertEquals(middle, patternMap.get("middle")); + assertEquals(end, patternMap.get("end")); + } + + private OneInputStreamOperatorTestHarness> getCepTestHarness(boolean isProcessingTime) throws Exception { + KeySelector keySelector = new TestKeySelector(); + + return new KeyedOneInputStreamOperatorTestHarness<>( + getKeyedCepOpearator(isProcessingTime, keySelector), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + private OneInputStreamOperatorTestHarness> getCepTestHarness( + KeyedCEPPatternOperator cepOperator) throws Exception { + KeySelector keySelector = new TestKeySelector(); + + return new KeyedOneInputStreamOperatorTestHarness<>( + cepOperator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + private KeyedCEPPatternOperator getKeyedCepOpearator( + boolean isProcessingTime, + KeySelector keySelector) { + + return new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + isProcessingTime, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory()); + } + + private static class TestKeySelector implements KeySelector { + + private 
static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + } + private static class NFAFactory implements NFACompiler.NFAFactory { private static final long serialVersionUID = 1173020762472766713L;
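The cleanup rule described in the commit message boils down to three checks: drop the priority-queue state when the queue is empty, drop the NFA state when its shared buffer is empty, and unregister the key from the watermark callback service once both are gone. The following is a minimal, framework-free sketch of that rule; InMemoryValueState, CleanupSketch and the methods updateQueue, updateNfaBuffer and maybeUnregisterKey are illustrative stand-ins invented for the example, not Flink APIs, while updatePriorityQueue, updateNFA and onWatermark refer to the methods added by this patch.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Minimal stand-in for per-key value state; not Flink's ValueState. */
class InMemoryValueState<K, V> {
    private final Map<K, V> store = new HashMap<>();
    V value(K key)              { return store.get(key); }
    void update(K key, V value) { store.put(key, value); }
    void clear(K key)           { store.remove(key); }
}

/** Sketch of the "clear state when empty" rule introduced by this patch. */
public class CleanupSketch {

    private final InMemoryValueState<Integer, PriorityQueue<Long>> queueState = new InMemoryValueState<>();
    private final InMemoryValueState<Integer, Set<String>> nfaBufferState = new InMemoryValueState<>();
    private final Set<Integer> keysWithWatermarkCallback = new HashSet<>();

    /** Mirrors updatePriorityQueue(): keep the queue only while it still has pending elements. */
    void updateQueue(Integer key, PriorityQueue<Long> queue) {
        if (queue.isEmpty()) {
            queueState.clear(key);
        } else {
            queueState.update(key, queue);
        }
    }

    /** Mirrors updateNFA(): keep the NFA only while its shared buffer still holds partial matches. */
    void updateNfaBuffer(Integer key, Set<String> sharedBuffer) {
        if (sharedBuffer.isEmpty()) {
            nfaBufferState.clear(key);
        } else {
            nfaBufferState.update(key, sharedBuffer);
        }
    }

    /** Mirrors the end of onWatermark(): forget the key once neither queue nor buffer holds data. */
    void maybeUnregisterKey(Integer key, PriorityQueue<Long> queue, Set<String> sharedBuffer) {
        if (queue.isEmpty() && sharedBuffer.isEmpty()) {
            keysWithWatermarkCallback.remove(key);
        }
    }
}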