Commit 27c79c2

KYLO-737 Kylo NiFi Reporting Task doesn't suppress rapid fire events correctly if generated downstream

scottreisdorf committed May 26, 2017
1 parent 818fd19 commit 27c79c2

Showing 3 changed files with 68 additions and 21 deletions.
File 1 of 3:

@@ -65,23 +65,25 @@ public void logStats() {
      */
     public void cacheAndBuildFlowFileGraph(ProvenanceEventRecordDTO event) {
 
-        // Get the FlowFile from the Cache. It is a LoadingCache, so if the file is new the Cache will create it
+        // Get the FlowFile from the Cache.
         FeedFlowFileGuavaCache flowFileCache = flowFileGuavaCache;
 
         //An event is the very first in the flow if it is a CREATE or RECEIVE event and there are no parent flow files.
         //This indicates the start of a Job.
-        if (ProvenanceEventUtil.isFirstEvent(event) && (event.getParentUuids() == null || (event.getParentUuids() != null && event.getParentUuids().isEmpty()))) {
+        if (ProvenanceEventUtil.isFirstEvent(event) && (event.getParentUuids() == null || event.getParentUuids().isEmpty())) {
             //we only need to store references to the root feed flow file.
             FeedFlowFile flowFile = null;
             if (flowFileCache.isCached(event.getFlowFileUuid())) {
                 flowFile = flowFileCache.getEntry(event.getFlowFileUuid());
             } else {
                 flowFile = new FeedFlowFile(event.getFlowFileUuid());
                 flowFileCache.add(event.getFlowFileUuid(), flowFile);
+                flowFile.setFirstEvent(event);
+                event.setIsStartOfJob(true);
             }
-            flowFile.setFirstEvent(event);
             event.setFeedFlowFile(flowFile);
-            event.setIsStartOfJob(true);
         }
 
         FeedFlowFile feedFlowFile = null;

@@ -125,7 +127,9 @@ public void cacheAndBuildFlowFileGraph(ProvenanceEventRecordDTO event) {
         }
 
         if (feedFlowFile == null) {
-            log.error("Unable to find feed flow file in cache!!!! for {} ", event);
+            //This is sometimes ok: a CONTENT_MODIFIED event can arrive before the CREATE/RECEIVE provenance event,
+            //in which case the CONTENT_MODIFIED event cannot find the feed flow file yet. That is fine, since the
+            //CREATE event will arrive and pick up the feed flow file.
             throw new FeedFlowFileNotFoundException("Unable to find Feed Flow File for event " + event.getEventId() + ", Processor: " + event.getComponentId());
         }
         event.setFeedFlowFile(feedFlowFile);

@@ -170,7 +174,7 @@ public void cacheAndBuildFlowFileGraph(ProvenanceEventRecordDTO event) {
         if (event.isEndingFlowFileEvent() && feedFlowFile.isFeedComplete()) {
             event.setIsEndOfJob(true);
             event.setIsFinalJobEvent(true);
-            log.info("Ending the Job for Feed {} and flowfile: {}. Event: {} ", event.getFeedName(), event.getFlowFileUuid(), event);
+            // log.info("Ending the Job for Feed {} and flowfile: {}. Event: {} ", event.getFeedName(), event.getFlowFileUuid(), event);
         }
 
         eventCounter.incrementAndGet();
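Since the missing flow file is now surfaced as an exception rather than an error log, a caller can treat it as transient. A minimal hypothetical sketch of that handling (cacheUtil and retryQueue are illustrative names, not part of this commit):

    try {
        cacheUtil.cacheAndBuildFlowFileGraph(event);
    } catch (FeedFlowFileNotFoundException e) {
        // The CREATE/RECEIVE event that registers the root feed flow file may not
        // have arrived yet; park this event and re-process it once it has.
        retryQueue.add(event);
    }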
File 2 of 3:

@@ -93,7 +93,6 @@ private void notifyError(String destination, Object payload, String errorMessge) {
     public void writeStats(AggregatedFeedProcessorStatisticsHolder stats) {
         try {
             if (stats.getEventCount().get() > 0) {
-                logger.info("SENDING AGGREGATED STAT to JMS {} ", stats);
                 sendJmsMessage.sendSerializedObjectToQueue(Queues.PROVENANCE_EVENT_STATS_QUEUE, stats);
                 AggregationEventProcessingStats.addStreamingEvents(stats.getEventCount().intValue());
                 notifySuccess(Queues.PROVENANCE_EVENT_STATS_QUEUE, stats);

@@ -111,12 +110,12 @@ public void writeStats(AggregatedFeedProcessorStatisticsHolder stats) {
      */
     public void writeBatchEvents(ProvenanceEventRecordDTOHolder events) {
         try {
-            logger.info("SENDING Events to JMS {} ", events);
+            logger.info("SENDING Batch Events to JMS {} ", events);
             sendJmsMessage.sendSerializedObjectToQueue(Queues.FEED_MANAGER_QUEUE, events);
             AggregationEventProcessingStats.addBatchEvents(events.getEvents().size());
             notifySuccess(Queues.FEED_MANAGER_QUEUE, events);
         } catch (Exception e) {
-            logger.error("Error writing sending JMS ", e);
+            logger.error("Error sending Batch Events to JMS ", e);
             notifyError(Queues.FEED_MANAGER_QUEUE, events, e.getMessage());
 
         }
File 3 of 3 (BatchFeedProcessorEvents.java):

@@ -34,22 +34,33 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Provenance events for Feeds not marked as "streaming" will be processed by this class in the KyloReportingTask of NiFi
  */
 public class BatchFeedProcessorEvents implements Serializable {
 
     private static final Logger log = LoggerFactory.getLogger(BatchFeedProcessorEvents.class);
 
     /**
-     * Map to determine if the events coming in are rapid fire. if so they wil be suppressed based upon the supplied {@code maxEventsPerSecond} allowed
+     * Map of Time -> (FeedFlowFileId -> # of events received that second)
      */
-    Map<DateTime, Set<ProvenanceEventRecordDTO>> startingJobEventsBySecond = new HashMap<>();
+    Map<DateTime, Map<String, AtomicInteger>> jobEventsBySecond = new HashMap<>();
+
+    AtomicInteger suppressedEventCount = new AtomicInteger(0);
 
     /**
      * The name of the feed. Derived from the process group {category}.{feed}
      */
     private String feedName;
     private String processorName;
 
+    /**
+     * The last DateTime stored in the jobEventsBySecond map
+     */
+    private DateTime lastEventTimeBySecond;
+
     /**
      * The Processor Id
      */
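This field change is the core of the KYLO-737 fix: the old map kept whole ProvenanceEventRecordDTO objects, and only for starting job events, so rapid fire events generated downstream in a flow were never counted. The new map keeps one lightweight AtomicInteger per one-second bucket and per feed flow file (event times are truncated to the whole second with withMillisOfSecond(0)), and suppressedEventCount tallies what was dropped so it can be reported in a single debug line instead of one warning per event.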
@@ -66,6 +77,8 @@ public class BatchFeedProcessorEvents implements Serializable {
      * Collection of events that will be sent to jms
      */
     private Set<ProvenanceEventRecordDTO> jmsEvents = new LinkedHashSet<>();
+
+    private Set<String> uniqueBatchEvents = new HashSet<>();
     /**
      * The max number of starting job events for the given feed and processor allowed to pass through per second. This parameter is passed in via the constructor.
      */
@@ -103,6 +116,16 @@ public boolean add(ProvenanceEventRecordDTO event) {
 
     }
 
+    /**
+     * Unique key describing the event for ops manager
+     *
+     * @param event the provenance event
+     * @return the unique key
+     */
+    private String batchEventKey(ProvenanceEventRecordDTO event) {
+        return feedName + "-" + processorId + "-" + processorName + "-" + event.getJobFlowFileId() + event.isStartOfJob() + event.isEndOfJob();
+    }
+
     /**
      * Check to see if we are getting events too fast to be considered a batch. If so, suppress the events so just a few go through and the rest generate statistics.
      *

@@ -112,19 +135,26 @@ public boolean add(ProvenanceEventRecordDTO event) {
     private boolean isSuppressEvent(ProvenanceEventRecordDTO event) {
         if (event.isStream() || event.getFeedFlowFile().isStream()) {
             event.setStream(true);
-            log.warn(" Event {} has been suppressed from Kylo Ops Manager. Its parent starting event was detected as a stream for feed {} and processor: {} ", event, maxEventsPerSecond, feedName,
-                processorName);
             return true;
-        } else if (event.isStartOfJob()) {
+        } else {
+            String jobEventsBySecondKey = event.getJobFlowFileId();
+            //if we are the starting event of the job, change the key to the feed name for detection of streaming events
+            if (event.isStartOfJob()) {
+                jobEventsBySecondKey = feedName;
+            }
             DateTime time = event.getEventTime().withMillisOfSecond(0);
-            startingJobEventsBySecond.computeIfAbsent(time, key -> new HashSet<ProvenanceEventRecordDTO>()).add(event);
-            if (startingJobEventsBySecond.get(time).size() > maxEventsPerSecond) {
-                event.getFeedFlowFile().setStream(true);
+            lastEventTimeBySecond = time;
+            jobEventsBySecond.computeIfAbsent(time, key -> new HashMap<String, AtomicInteger>()).computeIfAbsent(jobEventsBySecondKey, flowFileId -> new AtomicInteger(0)).incrementAndGet();
+            //suppress if it is not the ending event
+            if (!event.isFinalJobEvent() && jobEventsBySecond.get(time).get(jobEventsBySecondKey).get() > maxEventsPerSecond) {
+                if (event.isStartOfJob()) {
+                    event.getFeedFlowFile().setStream(true);
+                }
                 event.setStream(true);
-                log.warn(" Event {} has been suppressed from Kylo Ops Manager. more than {} events per second were detected for feed {} and processor: {} ", event, maxEventsPerSecond, feedName,
-                    processorName);
+                suppressedEventCount.incrementAndGet();
                 return true;
             }
+
         }
         return false;
     }
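To make the new rate check concrete, here is a minimal, self-contained sketch of the same counting-and-threshold idiom. It is an illustration only, not code from this commit; the class name and threshold value are hypothetical. In the real class the key is the job flow file id, or the feed name for start-of-job events, so a burst of downstream events for one job is throttled independently of new jobs starting:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.joda.time.DateTime;

    public class SuppressionSketch {

        static final int maxEventsPerSecond = 5; // hypothetical threshold
        static final Map<DateTime, Map<String, AtomicInteger>> jobEventsBySecond = new HashMap<>();

        static boolean isSuppressed(String key, DateTime eventTime, boolean finalJobEvent) {
            DateTime second = eventTime.withMillisOfSecond(0); // one bucket per whole second
            int seen = jobEventsBySecond
                .computeIfAbsent(second, t -> new HashMap<>())
                .computeIfAbsent(key, k -> new AtomicInteger(0))
                .incrementAndGet(); // events seen for this key in this second, including this one
            return !finalJobEvent && seen > maxEventsPerSecond;
        }

        public static void main(String[] args) {
            DateTime now = DateTime.now();
            for (int i = 1; i <= 7; i++) {
                // events 1-5 pass, events 6 and 7 are suppressed
                System.out.println("event " + i + " suppressed: " + isSuppressed("feedA", now, false));
            }
            // an ending event is never suppressed, even past the threshold
            System.out.println("final event suppressed: " + isSuppressed("feedA", now, true));
        }
    }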
@@ -145,8 +175,12 @@ public boolean addEvent(ProvenanceEventRecordDTO event) {
         }
 
         event.setIsBatchJob(true);
-        jmsEvents.add(event);
 
+        String batchKey = batchEventKey(event);
+        if (!uniqueBatchEvents.contains(batchKey)) {
+            uniqueBatchEvents.add(batchKey);
+            jmsEvents.add(event);
+        }
         lastEventTime = event.getEventTime();
         return true;
     }
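The uniqueBatchEvents guard means repeated events that map to the same key are sent to JMS only once per collection interval. For illustration (hypothetical values): with feedName category.feed, processorId p-123, processorName Validate and job flow file ff-9, a start-of-job event yields the key category.feed-p-123-Validate-ff-9truefalse, so a re-fired duplicate of that same start event is dropped rather than queued twice.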
@@ -164,11 +198,21 @@ public List<ProvenanceEventRecordDTO> collectEventsToBeSentToJmsQueue() {
         try {
             events = new ArrayList<>(jmsEvents);
             jmsEvents.clear();
+            uniqueBatchEvents.clear();
         } finally {
 
         }
         lastCollectionTime = DateTime.now();
-        startingJobEventsBySecond.clear();
+        if (suppressedEventCount.get() > 0) {
+            log.debug(" {} events have been suppressed from Kylo Ops Manager. more than {} events per second were detected for feed {} and processor: {} ", suppressedEventCount.get(),
+                maxEventsPerSecond, feedName,
+                processorName);
+        }
+        suppressedEventCount.set(0);
+        //safely remove everything in map before this time
+        if (lastEventTimeBySecond != null) {
+            jobEventsBySecond.entrySet().removeIf(entry -> entry.getKey().isBefore(lastEventTimeBySecond));
+        }
         return events == null ? Collections.emptyList() : events;
     }
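A note on the new cleanup: lastEventTimeBySecond always holds the second of the most recently processed event, so, assuming event times arrive roughly in order, buckets strictly before it can no longer grow, and removeIf prunes them on every collection pass. This keeps jobEventsBySecond bounded without clearing it wholesale (the old startingJobEventsBySecond.clear()), which could have wiped out the count for the second currently being rate-limited.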
