Skip to content

Commit

Permalink
alert when resetting offsets (apache#3931)
Browse files Browse the repository at this point in the history
* alert when resetting offsets

* add more data to alerts
  • Loading branch information
pjain1 authored and fjy committed Feb 13, 2017
1 parent c1eee9b commit 1f263fe
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1039,10 +1039,11 @@ private void possiblyResetOffsetsOrWait(
}
}

private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox) throws IOException
private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
throws IOException
{
Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
for (Map.Entry<TopicPartition, Long> outOfRangePartition: outOfRangePartitions.entrySet()) {
for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
}
boolean result = taskToolbox.getTaskActionClient()
Expand All @@ -1056,7 +1057,9 @@ private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartiti
));

if (result) {
log.warn("Successfully sent the reset request for partitions [%s], waiting to be killed", partitionOffsetMap.keySet());
log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
.addData("partitions", partitionOffsetMap.keySet())
.emit();
// wait for being killed by supervisor
try {
Thread.sleep(Long.MAX_VALUE);
Expand Down

0 comments on commit 1f263fe

Please sign in to comment.