Skip to content

Commit

Permalink
[FLINK-6032] [cep] Clean-up operator state when not needed.
Browse files Browse the repository at this point in the history
The CEP operator now cleans the registered state for a
key. This happens:
1) for the priority queue, when the queue is empty.
2) for the NFA, when its shared buffer is empty.
3) finally the key is removed from the watermark
   callback service if both the above are empty.
  • Loading branch information
kl0u committed Mar 16, 2017
1 parent 97f1788 commit e5057b7
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -54,52 +55,82 @@
/**
* Non-deterministic finite automaton implementation.
* <p>
* The NFA processes input events which will change the internal state machine. Whenever a final
* state is reached, the matching sequence of events is emitted.
* The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator}
* keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
* When an event gets processed, it updates the NFA's internal state machine.
* <p>
* An event that belongs to a partially matched sequence is kept in an internal
* {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for
* this purpose. Events in the buffer are removed when all the matched sequences that
* contain them are:
* <ol>
* <li>emitted (success)</li>
* <li>discarded (patterns containing NOT)</li>
* <li>timed-out (windowed patterns)</li>
* </ol>
*
* The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
*
* @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
* @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
* https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
*
* @param <T> Type of the processed events
*/
public class NFA<T> implements Serializable {

private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$");
private static final long serialVersionUID = 2957674889294717265L;

private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$");

private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;

// Buffer used to store the matched events
/**
* Buffer used to store the matched events.
*/
private final SharedBuffer<State<T>, T> sharedBuffer;

// Set of all NFA states
/**
* A set of all the valid NFA states, as returned by the
* {@link org.apache.flink.cep.nfa.compiler.NFACompiler NFACompiler}.
* These are directly derived from the user-specified pattern.
*/
private final Set<State<T>> states;

// Length of the window
/**
* The length of a windowed pattern, as specified using the
* {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
* method.
*/
private final long windowTime;

/**
* A flag indicating if we want timed-out patterns (in case of windowed patterns)
* to be emitted ({@code true}), or silently discarded ({@code false}).
*/
private final boolean handleTimeout;

// Current starting index for the next dewey version number
private int startEventCounter;

// Current set of computation states within the state machine
/**
* Current set of {@link ComputationState computation states} within the state machine.
* These are the "active" intermediate states that are waiting for new matching
* events to transition to new valid states.
*/
private transient Queue<ComputationState<T>> computationStates;

public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
final boolean handleTimeout) {
final TypeSerializer<T> eventSerializer,
final long windowTime,
final boolean handleTimeout) {

this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
computationStates = new LinkedList<>();

states = new HashSet<>();
startEventCounter = 1;
this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
this.computationStates = new LinkedList<>();
this.states = new HashSet<>();
this.startEventCounter = 1;
}

public Set<State<T>> getStates() {
Expand All @@ -120,6 +151,17 @@ public void addState(final State<T> state) {
}
}

/**
 * Tells whether this NFA currently holds any buffered events.
 * <p>
 * The answer is delegated to the {@link SharedBuffer}: the NFA is considered
 * empty exactly when that buffer reports itself as empty.
 *
 * @return {@code true} if the {@link SharedBuffer} contains no elements,
 * {@code false} otherwise.
 */
public boolean isEmpty() {
    final boolean bufferDrained = sharedBuffer.isEmpty();
    return bufferDrained;
}

/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned. If computations time out and timeout handling is
Expand Down Expand Up @@ -186,15 +228,13 @@ public Tuple2<Collection<Map<String, T>>, Collection<Tuple2<Map<String, T>, Long
if(windowTime > 0L) {
long pruningTimestamp = timestamp - windowTime;

// sanity check to guard against underflows
if (pruningTimestamp >= timestamp) {
throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
" set correctly (e.g. Long.MIN_VALUE).");
}
if (pruningTimestamp < timestamp) {
// the check is to guard against underflows

// remove all elements which are expired with respect to the window length
sharedBuffer.prune(pruningTimestamp);
// remove all elements which are expired
// with respect to the window length
sharedBuffer.prune(pruningTimestamp);
}
}

return Tuple2.of(result, timeoutResult);
Expand Down Expand Up @@ -251,7 +291,7 @@ private Collection<ComputationState<T>> computeNextStates(
final T event,
final long timestamp) {
Stack<State<T>> states = new Stack<>();
ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
State<T> state = computationState.getState();

states.push(state);
Expand Down Expand Up @@ -381,7 +421,7 @@ private Collection<Map<String, T>> extractPatternMatches(final ComputationState<
computationState.getTimestamp(),
computationState.getVersion());

ArrayList<Map<String, T>> result = new ArrayList<>();
List<Map<String, T>> result = new ArrayList<>();

TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();

Expand Down Expand Up @@ -497,6 +537,7 @@ static String generateStateName(final String name, final int index) {
* {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
*/
public static class Serializer<T> extends TypeSerializer<NFA<T>> {

private static final long serialVersionUID = 1L;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.cep.operator;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -140,77 +141,58 @@ private void initializeState() {
public void open() throws Exception {
super.open();

InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService();
final InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService();

watermarkCallbackService.setWatermarkCallback(
new OnWatermarkCallback<KEY>() {

@Override
public void onWatermark(KEY key, Watermark watermark) throws IOException {
setCurrentKey(key);

// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in event time order by feeding them in the NFA
// 3) advance the time to the current watermark, so that expired patterns are discarded.
// 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
// have state to be used later.

// STEP 1
PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
NFA<IN> nfa = getNFA();

if (priorityQueue.isEmpty()) {
advanceTime(nfa, watermark.getTimestamp());
} else {
while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) {
StreamRecord<IN> streamRecord = priorityQueue.poll();
processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
}
// STEP 2
while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) {
StreamRecord<IN> streamRecord = priorityQueue.poll();
processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
}

updateNFA(nfa);
// STEP 3
advanceTime(nfa, watermark.getTimestamp());

// STEP 4
updatePriorityQueue(priorityQueue);
updateNFA(nfa);

if (priorityQueue.isEmpty() && nfa.isEmpty()) {
watermarkCallbackService.unregisterKeyFromWatermarkCallback(key);
}
}
},
keySerializer
);
}

/**
 * Returns the NFA held in {@code nfaOperatorState}. If the state holds no
 * value, a new NFA is obtained from {@code nfaFactory}, written to the state
 * and returned.
 *
 * @return the stored NFA, or a newly created (and stored) one
 * @throws IOException if reading or updating the state fails
 */
private NFA<IN> getNFA() throws IOException {
    final NFA<IN> stored = nfaOperatorState.value();
    if (stored != null) {
        return stored;
    }

    final NFA<IN> fresh = nfaFactory.createNFA();
    nfaOperatorState.update(fresh);
    return fresh;
}

/**
 * Writes the given NFA to {@code nfaOperatorState}, unconditionally.
 *
 * @param nfa the NFA to store
 * @throws IOException if updating the state fails
 */
private void updateNFA(NFA<IN> nfa) throws IOException {
nfaOperatorState.update(nfa);
}

/**
 * Returns the priority queue held in {@code priorityQueueOperatorState}. If
 * the state holds no value, a new queue is obtained from
 * {@code priorityQueueFactory}, written to the state and returned.
 *
 * @return the stored queue, or a newly created (and stored) one
 * @throws IOException if reading or updating the state fails
 */
private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
    final PriorityQueue<StreamRecord<IN>> stored = priorityQueueOperatorState.value();
    if (stored != null) {
        return stored;
    }

    final PriorityQueue<StreamRecord<IN>> fresh = priorityQueueFactory.createPriorityQueue();
    priorityQueueOperatorState.update(fresh);
    return fresh;
}

/**
 * Writes the given queue to {@code priorityQueueOperatorState}, unconditionally.
 *
 * @param queue the priority queue to store
 * @throws IOException if updating the state fails
 */
private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
priorityQueueOperatorState.update(queue);
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));

if (isProcessingTime) {
// there can be no out of order elements in processing time
NFA<IN> nfa = getNFA();
processEvent(nfa, element.getValue(), System.currentTimeMillis());
processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
updateNFA(nfa);

} else {
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));

PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();

// event time processing
Expand All @@ -225,6 +207,32 @@ public void processElement(StreamRecord<IN> element) throws Exception {
}
}

/**
 * Returns the NFA held in {@code nfaOperatorState}, or a new one from
 * {@code nfaFactory} when the state holds no value. A newly created NFA is
 * NOT written back to the state here.
 *
 * @return the stored NFA, or a freshly created one
 * @throws IOException if reading the state fails
 */
private NFA<IN> getNFA() throws IOException {
    final NFA<IN> stored = nfaOperatorState.value();
    if (stored == null) {
        return nfaFactory.createNFA();
    }
    return stored;
}

/**
 * Persists the given NFA, or drops the stored value when there is nothing
 * worth keeping: a non-empty NFA is written to {@code nfaOperatorState},
 * an empty one causes the state to be cleared.
 *
 * @param nfa the NFA whose content decides between update and clear
 * @throws IOException if updating the state fails
 */
private void updateNFA(NFA<IN> nfa) throws IOException {
    if (!nfa.isEmpty()) {
        nfaOperatorState.update(nfa);
        return;
    }
    nfaOperatorState.clear();
}

/**
 * Returns the queue held in {@code priorityQueueOperatorState}, or a new one
 * from {@code priorityQueueFactory} when the state holds no value. A newly
 * created queue is NOT written back to the state here.
 *
 * @return the stored queue, or a freshly created one
 * @throws IOException if reading the state fails
 */
private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
    final PriorityQueue<StreamRecord<IN>> stored = priorityQueueOperatorState.value();
    if (stored == null) {
        return priorityQueueFactory.createPriorityQueue();
    }
    return stored;
}

/**
 * Persists the given queue, or drops the stored value when the queue is
 * empty: a non-empty queue is written to {@code priorityQueueOperatorState},
 * an empty one causes the state to be cleared.
 *
 * @param queue the queue whose content decides between update and clear
 * @throws IOException if updating the state fails
 */
private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
    if (!queue.isEmpty()) {
        priorityQueueOperatorState.update(queue);
        return;
    }
    priorityQueueOperatorState.clear();
}

/**
* Process the given event by giving it to the NFA and outputting the produced set of matched
* event sequences.
Expand Down Expand Up @@ -385,4 +393,25 @@ public int hashCode() {
return getClass().hashCode();
}
}

////////////////////// Testing Methods //////////////////////

/**
 * Test hook: switches the current key to {@code key} and reports whether
 * {@code nfaOperatorState} holds a value for it.
 * <p>
 * Note that this changes the operator's current key as a side effect.
 *
 * @param key the key to inspect
 * @return {@code true} if a (non-null) NFA is stored, {@code false} otherwise
 * @throws IOException if reading the state fails
 */
@VisibleForTesting
public boolean hasNonEmptyNFA(KEY key) throws IOException {
    setCurrentKey(key);
    final NFA<IN> stored = nfaOperatorState.value();
    return stored != null;
}

/**
 * Test hook: switches the current key to {@code key} and reports whether
 * {@code priorityQueueOperatorState} holds a value for it.
 * <p>
 * Note that this changes the operator's current key as a side effect.
 *
 * @param key the key to inspect
 * @return {@code true} if a (non-null) queue is stored, {@code false} otherwise
 * @throws IOException if reading the state fails
 */
@VisibleForTesting
public boolean hasNonEmptyPQ(KEY key) throws IOException {
    setCurrentKey(key);
    final PriorityQueue<StreamRecord<IN>> stored = priorityQueueOperatorState.value();
    return stored != null;
}

/**
 * Test hook: switches the current key to {@code key} and returns the size of
 * the queue obtained from {@link #getPriorityQueue()}.
 * <p>
 * Note that this changes the operator's current key as a side effect.
 *
 * @param key the key to inspect
 * @return the number of queued elements, or {@code -1} if no queue was returned
 * @throws IOException if reading the state fails
 */
@VisibleForTesting
public int getPQSize(KEY key) throws IOException {
    setCurrentKey(key);
    final PriorityQueue<StreamRecord<IN>> queue = getPriorityQueue();
    if (queue == null) {
        return -1;
    }
    return queue.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public String select(Map<String, Event> pattern) {
result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);

// the expected sequences of matching event ids
expected = "Left(1.0)\nRight(2.0,2.0,2.0)";
expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";

env.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ public double getVolume() {
return volume;
}

/**
 * Compares this sub-event with another object.
 * <p>
 * Two sub-events are equal when the other object is a {@code SubEvent}, the
 * superclass considers them equal, and their volumes match.
 * <p>
 * Volumes are compared with {@link Double#compare(double, double)} rather than
 * {@code ==}: with {@code ==} an instance whose volume is {@code NaN} would not
 * be equal to itself, which violates the reflexivity requirement of
 * {@link Object#equals(Object)}.
 *
 * @param obj the object to compare with, may be {@code null}
 * @return {@code true} if {@code obj} is an equal {@code SubEvent}
 */
@Override
public boolean equals(Object obj) {
    if (!(obj instanceof SubEvent)) {
        return false;
    }
    final SubEvent other = (SubEvent) obj;
    return super.equals(obj) && Double.compare(other.volume, volume) == 0;
}

/**
 * Combines the superclass hash with the volume truncated to an {@code int}.
 *
 * @return the hash code of this sub-event
 */
@Override
public int hashCode() {
    final int truncatedVolume = (int) volume;
    return super.hashCode() + truncatedVolume;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down
Loading

0 comments on commit e5057b7

Please sign in to comment.