Skip to content

Commit

Permalink
report message gap, source gap and sink count in RealtimePlumber (apa…
Browse files Browse the repository at this point in the history
…che#3744)

* report message gap, source gap and sink count in RealtimePlumber

* report message gap, sink count in Appenderator

* add ingest/events/sourceGap in metrics.md

* remove source gap
  • Loading branch information
kaijianding authored and himanshug committed Dec 13, 2016
1 parent 07384d6 commit 4be3eb0
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/content/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|CPU time in nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|`ingest/sink/count`|Number of sinks that have not yet been handed off.|dataSource.|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest event timestamp seen and the current system time.|dataSource.|Greater than 0, depends on the timestamps carried in the events.|

Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class FireDepartmentMetrics
private final AtomicLong mergeCpuTime = new AtomicLong(0);
private final AtomicLong persistCpuTime = new AtomicLong(0);
private final AtomicLong handOffCount = new AtomicLong(0);
private final AtomicLong sinkCount = new AtomicLong(0);
private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
private final AtomicLong messageGap = new AtomicLong(0);

public void incrementProcessed()
{
Expand Down Expand Up @@ -103,6 +106,14 @@ public void incrementHandOffCount(){
handOffCount.incrementAndGet();
}

/**
 * Overwrites the stored sink count with the supplied value.
 *
 * @param sinkCount the new sink count to record
 */
public void setSinkCount(long sinkCount)
{
  this.sinkCount.set(sinkCount);
}

/**
 * Records a message timestamp, retaining only the largest value reported so far.
 *
 * <p>The update is performed with a compare-and-set retry loop so that two
 * concurrent callers cannot overwrite a larger stored timestamp with a smaller
 * one, which a separate get() followed by set() would allow.
 *
 * @param messageMaxTimestamp the timestamp of the message being reported
 */
public void reportMessageMaxTimestamp(long messageMaxTimestamp)
{
  long current = this.messageMaxTimestamp.get();
  // Only attempt a write when the candidate is strictly larger; retry if another
  // thread changed the stored value between our read and our compare-and-set.
  while (messageMaxTimestamp > current
         && !this.messageMaxTimestamp.compareAndSet(current, messageMaxTimestamp)) {
    current = this.messageMaxTimestamp.get();
  }
}

public long processed()
{
return processedCount.get();
Expand Down Expand Up @@ -168,6 +179,20 @@ public long handOffCount()
return handOffCount.get();
}

/**
 * @return the most recently stored sink count
 */
public long sinkCount()
{
  return this.sinkCount.get();
}

/**
 * @return the largest message timestamp currently held by this metrics object
 */
public long messageMaxTimestamp()
{
  return this.messageMaxTimestamp.get();
}

/**
 * @return the current value of the message gap held by this metrics object
 */
public long messageGap()
{
  return this.messageGap.get();
}

public FireDepartmentMetrics snapshot()
{
Expand All @@ -185,6 +210,9 @@ public FireDepartmentMetrics snapshot()
retVal.mergeCpuTime.set(mergeCpuTime.get());
retVal.persistCpuTime.set(persistCpuTime.get());
retVal.handOffCount.set(handOffCount.get());
retVal.sinkCount.set(sinkCount.get());
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.messageGap.set(System.currentTimeMillis() - messageMaxTimestamp.get());
return retVal;
}

Expand All @@ -210,6 +238,9 @@ public FireDepartmentMetrics merge(FireDepartmentMetrics other)
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
handOffCount.addAndGet(otherSnapshot.handOffCount());
sinkCount.addAndGet(otherSnapshot.sinkCount());
messageMaxTimestamp.set(Math.max(messageMaxTimestamp(), otherSnapshot.messageMaxTimestamp()));
messageGap.set(Math.max(messageGap(), otherSnapshot.messageGap()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previous.handOffCount()));
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
previousValues.put(fireDepartment, metrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public int add(
}

final Sink sink = getOrCreateSink(identifier);
metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
final int sinkRowsInMemoryAfterAdd;

Expand Down Expand Up @@ -269,6 +270,7 @@ private Sink getOrCreateSink(final SegmentIdentifier identifier)
}

sinks.put(identifier, retVal);
metrics.setSinkCount(sinks.size());
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal));
}

Expand Down Expand Up @@ -905,6 +907,7 @@ public Object apply(@Nullable Object input)

log.info("Removing sink for segment[%s].", identifier);
sinks.remove(identifier);
metrics.setSinkCount(sinks.size());
droppingSinks.remove(identifier);
sinkTimeline.remove(
sink.getInterval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ public Object startJob()
@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
final Sink sink = getSink(row.getTimestampFromEpoch());
long messageTimestamp = row.getTimestampFromEpoch();
final Sink sink = getSink(messageTimestamp);
metrics.reportMessageMaxTimestamp(messageTimestamp);
if (sink == null) {
return -1;
}
Expand Down Expand Up @@ -716,6 +718,7 @@ public int compare(File o1, File o2)
private void addSink(final Sink sink)
{
sinks.put(sink.getInterval().getStartMillis(), sink);
metrics.setSinkCount(sinks.size());
sinkTimeline.add(
sink.getInterval(),
sink.getVersion(),
Expand Down Expand Up @@ -850,6 +853,7 @@ protected void abandonSegment(final long truncatedTime, final Sink sink)
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
metrics.setSinkCount(sinks.size());
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
Expand Down

0 comments on commit 4be3eb0

Please sign in to comment.