Skip to content

Commit

Permalink
KAFKA-7560; PushHttpMetricsReporter should not convert metric value t…
Browse files Browse the repository at this point in the history
…o double

Currently PushHttpMetricsReporter will convert value from KafkaMetric.metricValue() to double. This will not work for non-numerical metrics such as version in AppInfoParser whose value can be string. This has caused issue for PushHttpMetricsReporter which in turn caused system test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail.

Since we allow metric value to be object, PushHttpMetricsReporter should also read metric value as object and pass it to the http server.

Author: Dong Lin <[email protected]>

Reviewers: Manikumar Reddy O <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#5886 from lindong28/KAFKA-7560
  • Loading branch information
lindong28 authored and ewencp committed Nov 7, 2018
1 parent 7b5ffa0 commit df0faee
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ public void run() {
samples = new ArrayList<>(metrics.size());
for (KafkaMetric metric : metrics.values()) {
MetricName name = metric.metricName();
double value = (Double) metric.metricValue();
samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
samples.add(new MetricValue(name.name(), name.group(), name.tags(), metric.metricValue()));
}
}

Expand Down Expand Up @@ -212,9 +211,9 @@ public void run() {
} else {
log.info("Finished reporting metrics with response code {}", responseCode);
}
} catch (Exception e) {
log.error("Error reporting metrics", e);
throw new KafkaException("Failed to report current metrics", e);
} catch (Throwable t) {
log.error("Error reporting metrics", t);
throw new KafkaException("Failed to report current metrics", t);
} finally {
if (connection != null) {
connection.disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -184,32 +185,40 @@ public void testMetricValues() throws Exception {
KafkaMetric metric1 = new KafkaMetric(
new Object(),
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
new ImmutableValue(1.0),
new ImmutableValue<>(1.0),
null,
time
);
KafkaMetric newMetric1 = new KafkaMetric(
new Object(),
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
new ImmutableValue(-1.0),
new ImmutableValue<>(-1.0),
null,
time
);
KafkaMetric metric2 = new KafkaMetric(
new Object(),
new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
new ImmutableValue(2.0),
new ImmutableValue<>(2.0),
null,
time
);
KafkaMetric metric3 = new KafkaMetric(
new Object(),
new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
new ImmutableValue(3.0),
new ImmutableValue<>(3.0),
null,
time
);
reporter.init(Arrays.asList(metric1, metric2));
KafkaMetric metric4 = new KafkaMetric(
new Object(),
new MetricName("name4", "group4", "desc4", Collections.singletonMap("key4", "value4")),
new ImmutableValue<>("value4"),
null,
time
);

reporter.init(Arrays.asList(metric1, metric2, metric4));
reporter.metricChange(newMetric1); // added in init, modified
reporter.metricChange(metric3); // added by change
reporter.metricRemoval(metric2); // added in init, deleted by removal
Expand All @@ -222,9 +231,12 @@ public void testMetricValues() throws Exception {
// We should be left with the modified version of metric1 and metric3
JsonNode metrics = payload.get("metrics");
assertTrue(metrics.isArray());
assertEquals(2, metrics.size());
assertEquals(3, metrics.size());
List<JsonNode> metricsList = Arrays.asList(metrics.get(0), metrics.get(1), metrics.get(2));
// Sort metrics based on name so that we can verify the value for each metric below
metricsList.sort((m1, m2) -> m1.get("name").textValue().compareTo(m2.get("name").textValue()));

JsonNode m1 = metrics.get(0);
JsonNode m1 = metricsList.get(0);
assertEquals("name1", m1.get("name").textValue());
assertEquals("group1", m1.get("group").textValue());
JsonNode m1Tags = m1.get("tags");
Expand All @@ -233,7 +245,7 @@ public void testMetricValues() throws Exception {
assertEquals("value1", m1Tags.get("key1").textValue());
assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);

JsonNode m3 = metrics.get(1);
JsonNode m3 = metricsList.get(1);
assertEquals("name3", m3.get("name").textValue());
assertEquals("group3", m3.get("group").textValue());
JsonNode m3Tags = m3.get("tags");
Expand All @@ -242,6 +254,15 @@ public void testMetricValues() throws Exception {
assertEquals("value3", m3Tags.get("key3").textValue());
assertEquals(3.0, m3.get("value").doubleValue(), 0.0);

JsonNode m4 = metricsList.get(2);
assertEquals("name4", m4.get("name").textValue());
assertEquals("group4", m4.get("group").textValue());
JsonNode m4Tags = m4.get("tags");
assertTrue(m4Tags.isObject());
assertEquals(1, m4Tags.size());
assertEquals("value4", m4Tags.get("key4").textValue());
assertEquals("value4", m4.get("value").textValue());

reporter.close();

verifyAll();
Expand Down Expand Up @@ -318,15 +339,15 @@ private void expectClose() throws Exception {
EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
}

private static class ImmutableValue implements Measurable {
private final double value;
static class ImmutableValue<T> implements Gauge<T> {
private final T value;

public ImmutableValue(double value) {
public ImmutableValue(T value) {
this.value = value;
}

@Override
public double measure(MetricConfig config, long now) {
public T value(MetricConfig config, long now) {
return value;
}
}
Expand Down

0 comments on commit df0faee

Please sign in to comment.