Add ingest/input/bytes metric and Kafka consumer metrics. (apache#14582)
* Add ingest/input/bytes metric and Kafka consumer metrics.

New metrics:

1) ingest/input/bytes. Equivalent to processedBytes in the task reports.

2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
   metric "bytes-consumed-total". Only emitted for Kafka tasks.

3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
   metric "records-consumed-total". Only emitted for Kafka tasks.

* Fix anchor.

* Fix KafkaConsumerMonitor.

* Interface updates.

* Doc changes.

* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java

Co-authored-by: Benedict Jin <[email protected]>

---------

Co-authored-by: Benedict Jin <[email protected]>
gianm and asdf2014 authored Jul 20, 2023
1 parent f7348d7 commit bac5ef3
Showing 14 changed files with 270 additions and 102 deletions.
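The commit message ties ingest/input/bytes to the processedBytes counter in the task reports. A rough way to cross-check the two is to pull the task report from the Overlord; the sketch below is illustrative only — the Overlord address (8090 is the default Overlord port) and the task id are made-up placeholders, and the /druid/indexer/v1/task/{taskId}/reports endpoint is Druid's standard task-report API rather than anything added by this commit.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TaskReportSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder Overlord location and task id; substitute real values.
    final String overlord = "http://localhost:8090";
    final String taskId = "index_kafka_wikipedia_example";

    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(overlord + "/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();

    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // The returned JSON includes the task's row statistics; processedBytes there is the
    // task-report counterpart of the emitted ingest/input/bytes metric.
    System.out.println(response.body());
  }
}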
6 changes: 4 additions & 2 deletions docs/operations/metrics.md
@@ -210,10 +210,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each emission period.

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`, or `windowPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
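For context on the table above, here is a minimal sketch of what an ingest/input/bytes event looks like when built with Druid's emitter API. The dimension values are invented for illustration, and a NoopEmitter stands in for the process's real emitter; the actual events are produced inside the task runner, not by user code like this.

import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

public class IngestInputBytesEventSketch
{
  public static void main(String[] args)
  {
    final ServiceEmitter emitter = new ServiceEmitter("sketch", "localhost", new NoopEmitter());

    // Dimensions follow the table above; values here are placeholders.
    emitter.emit(
        new ServiceMetricEvent.Builder()
            .setDimension("dataSource", "wikipedia")
            .setDimension("taskId", "index_kafka_wikipedia_example")
            .setDimension("taskType", "index_kafka")
            .setDimension("groupId", "index_kafka_wikipedia")
            .build("ingest/input/bytes", 1_048_576L)
    );
  }
}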
org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java (new file)
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.kafka;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class KafkaConsumerMonitor extends AbstractMonitor
{
  private volatile boolean stopAfterNext = false;

  // Kafka metric name -> Druid metric name
  private static final Map<String, String> METRICS =
      ImmutableMap.<String, String>builder()
                  .put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
                  .put("records-consumed-total", "kafka/consumer/recordsConsumed")
                  .build();
  private static final String TOPIC_TAG = "topic";
  private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);

  private final KafkaConsumer<?, ?> consumer;
  private final Map<String, AtomicLong> counters = new HashMap<>();

  public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
  {
    this.consumer = consumer;
  }

  @Override
  public boolean doMonitor(final ServiceEmitter emitter)
  {
    for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
      final MetricName metricName = entry.getKey();

      if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
        final String topic = metricName.tags().get(TOPIC_TAG);
        final long newValue = ((Number) entry.getValue().metricValue()).longValue();
        final long priorValue =
            counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
                    .getAndSet(newValue);

        if (newValue != priorValue) {
          final ServiceMetricEvent.Builder builder =
              new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
          emitter.emit(builder.build(METRICS.get(metricName.name()), newValue - priorValue));
        }
      }
    }

    return !stopAfterNext;
  }

  public void stopAfterNextEmit()
  {
    stopAfterNext = true;
  }

  private static boolean isTopicMetric(final MetricName metricName)
  {
    // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
    // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
    return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
           && !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
  }
}
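In the task itself the monitor is handed to the MonitorScheduler (see the KafkaIndexTask change below). As a standalone sketch of its lifecycle — the broker address is a placeholder, no broker is contacted just by constructing the consumer, and without any subscription or polling there are no per-topic metrics yet, so these calls emit nothing — it can be driven directly:

import java.util.Properties;

import org.apache.druid.indexing.kafka.KafkaConsumerMonitor;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaConsumerMonitorSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
    props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      final KafkaConsumerMonitor monitor = new KafkaConsumerMonitor(consumer);
      final ServiceEmitter emitter = new ServiceEmitter("sketch", "localhost", new NoopEmitter());

      // Each doMonitor() call emits only the per-topic delta since the previous call and
      // returns true while the monitor should keep running.
      monitor.doMonitor(emitter);

      // KafkaRecordSupplier.close() (see the diff further down) calls this so one final
      // delta can still be emitted; the next doMonitor() then returns false and the
      // scheduler drops the monitor.
      monitor.stopAfterNextEmit();
      monitor.doMonitor(emitter);
    }
  }
}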
org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -97,7 +98,7 @@ protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> create
}

@Override
protected KafkaRecordSupplier newTaskRecordSupplier()
protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -107,7 +108,14 @@ protected KafkaRecordSupplier newTaskRecordSupplier()

props.put("auto.offset.reset", "none");

return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());

if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
}

return recordSupplier;
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -33,6 +33,7 @@
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,6 +61,7 @@
public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private final KafkaConsumerMonitor monitor;
private boolean closed;

public KafkaRecordSupplier(
@@ -77,6 +79,7 @@ public KafkaRecordSupplier(
)
{
this.consumer = consumer;
this.monitor = new KafkaConsumerMonitor(consumer);
}

@Override
@@ -190,13 +193,23 @@ public Set<Integer> getPartitionIds(String stream)
});
}

/**
* Returns a Monitor that emits Kafka consumer metrics.
*/
public Monitor monitor()
{
return monitor;
}

@Override
public void close()
{
if (closed) {
return;
}
closed = true;

monitor.stopAfterNextEmit();
consumer.close();
}

org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -76,7 +76,6 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -152,7 +151,6 @@
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
private static final long POLL_RETRY_MS = 100;
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
{