Skip to content

Commit

Permalink
[FLINK-20533][datadog] Add Histogram support
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Dec 15, 2020
1 parent 38e424f commit 313e20e
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/deployment/metric_reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ metrics.reporter.stsd.interval: 60 SECONDS
Note any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`.

<span class="label label-info">Note</span> Histograms are exposed as a series of gauges following the naming convention of Datadog histograms (`<metric_name>.<aggregation>`).
The `min` aggregation is reported by default, whereas `sum` is not available.
In contrast to Datadog-provided Histograms, the reported aggregations are not computed for a specific reporting interval.

Parameters:

- `apikey` - the Datadog API key
Expand Down
4 changes: 4 additions & 0 deletions docs/deployment/metric_reporters.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ metrics.reporter.stsd.interval: 60 SECONDS
Note any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`.

<span class="label label-info">Note</span> Histograms are exposed as a series of gauges following the naming convention of Datadog histograms (`<metric_name>.<aggregation>`).
The `min` aggregation is reported by default, whereas `sum` is not available.
In contrast to Datadog-provided Histograms, the reported aggregations are not computed for a specific reporting interval.

Parameters:

- `apikey` - the Datadog API key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.datadog;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;

import java.util.List;

/**
 * Exposes a Flink {@link Histogram} to Datadog as a group of gauges, one gauge per aggregation.
 *
 * <p>Each gauge is named {@code <metricName><suffix>}, using the {@code SUFFIX_*} constants below.
 *
 * <p>Note: Datadog's native histogram type is not used because the HTTP API does not support it.
 */
public class DHistogram {
    @VisibleForTesting
    static final String SUFFIX_AVG = ".avg";
    @VisibleForTesting
    static final String SUFFIX_COUNT = ".count";
    @VisibleForTesting
    static final String SUFFIX_MEDIAN = ".median";
    @VisibleForTesting
    static final String SUFFIX_95_PERCENTILE = ".95percentile";
    @VisibleForTesting
    static final String SUFFIX_MIN = ".min";
    @VisibleForTesting
    static final String SUFFIX_MAX = ".max";

    /** The wrapped histogram; its statistics are read anew on every {@link #addTo(DSeries)} call. */
    private final Histogram histogram;

    // One pre-built meta data object per reported aggregation.
    private final MetricMetaData metaDataAvg;
    private final MetricMetaData metaDataCount;
    private final MetricMetaData metaDataMedian;
    private final MetricMetaData metaData95Percentile;
    private final MetricMetaData metaDataMin;
    private final MetricMetaData metaDataMax;

    /**
     * Creates the gauge meta data for every aggregation of the given histogram.
     *
     * @param histogram histogram whose statistics are reported
     * @param metricName base metric name; the aggregation suffix is appended to it
     * @param host host passed on to every gauge's meta data
     * @param tags tags passed on to every gauge's meta data
     * @param clock clock passed on to every gauge's meta data
     */
    public DHistogram(Histogram histogram, String metricName, String host, List<String> tags, Clock clock) {
        this.histogram = histogram;
        this.metaDataAvg = gaugeMetaData(metricName, SUFFIX_AVG, host, tags, clock);
        this.metaDataCount = gaugeMetaData(metricName, SUFFIX_COUNT, host, tags, clock);
        this.metaDataMedian = gaugeMetaData(metricName, SUFFIX_MEDIAN, host, tags, clock);
        this.metaData95Percentile = gaugeMetaData(metricName, SUFFIX_95_PERCENTILE, host, tags, clock);
        this.metaDataMin = gaugeMetaData(metricName, SUFFIX_MIN, host, tags, clock);
        this.metaDataMax = gaugeMetaData(metricName, SUFFIX_MAX, host, tags, clock);
    }

    /** Builds gauge-typed meta data for the metric named {@code metricName + suffix}. */
    private static MetricMetaData gaugeMetaData(
            String metricName, String suffix, String host, List<String> tags, Clock clock) {
        return new MetricMetaData(MetricType.gauge, metricName + suffix, host, tags, clock);
    }

    /**
     * Appends one fixed-value gauge per aggregation to the given series.
     *
     * <p>The aggregations are added in the order avg, count, median, 95percentile, min, max.
     *
     * @param series series that receives the gauges
     */
    public void addTo(DSeries series) {
        final HistogramStatistics snapshot = histogram.getStatistics();

        // The set of aggregations follows
        // https://docs.datadoghq.com/developers/metrics/types/?tab=histogram
        // 'sum' (which is optional there) is the only one left out, since it cannot be computed.
        // 'count' also differs slightly in meaning, as it is not reset after a report.
        addGauge(series, snapshot.getMean(), metaDataAvg);
        addGauge(series, histogram.getCount(), metaDataCount);
        addGauge(series, snapshot.getQuantile(.5), metaDataMedian);
        addGauge(series, snapshot.getQuantile(.95), metaData95Percentile);
        addGauge(series, snapshot.getMin(), metaDataMin);
        addGauge(series, snapshot.getMax(), metaDataMax);
    }

    /** Wraps the value in a {@link StaticDMetric} and adds it to the series. */
    private static void addGauge(DSeries series, Number value, MetricMetaData metaData) {
        series.add(new StaticDMetric(value, metaData));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
private final Map<Histogram, DHistogram> histograms = new ConcurrentHashMap<>();

private DatadogHttpClient client;
private List<String> configTags;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
// Only consider rate
meters.put(m, new DMeter(m, name, host, tags, clock));
} else if (metric instanceof Histogram) {
LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
Histogram h = (Histogram) metric;
histograms.put(h, new DHistogram(h, name, host, tags, clock));
} else {
LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
Expand All @@ -102,7 +104,7 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup
} else if (metric instanceof Meter) {
meters.remove(metric);
} else if (metric instanceof Histogram) {
// No Histogram is registered
histograms.remove(metric);
} else {
LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
"does not support this metric type.", metric.getClass().getName());
Expand Down Expand Up @@ -139,6 +141,7 @@ public void report() {
addGaugesAndUnregisterOnException(request);
counters.values().forEach(request::add);
meters.values().forEach(request::add);
histograms.values().forEach(histogram -> histogram.addTo(request));

int totalMetrics = request.getSeries().size();
int fromIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.datadog;

/**
 * A {@link DMetric} whose value is fixed at construction time.
 */
public class StaticDMetric extends DMetric {
    /** The value handed out by {@link #getMetricValue()}; never changes after construction. */
    private final Number fixedValue;

    /**
     * Creates a metric that always reports the given value.
     *
     * @param value value to report
     * @param metaData meta data forwarded to the {@link DMetric} constructor
     */
    public StaticDMetric(Number value, MetricMetaData metaData) {
        super(metaData);
        fixedValue = value;
    }

    /**
     * Returns the value supplied to the constructor.
     *
     * @return the fixed metric value
     */
    public Number getMetricValue() {
        return fixedValue;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.metrics.datadog;

import org.apache.flink.metrics.util.TestCounter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -135,6 +136,24 @@ public void serializeMeterWithoutHost() throws JsonProcessingException {
assertSerialization(DatadogHttpClient.serialize(series), new MetricAssertion(MetricType.gauge, false, "1.0"));
}

/**
 * Verifies that a histogram is serialized as six gauges whose names carry the aggregation suffixes.
 */
@Test
public void serializeHistogram() throws JsonProcessingException {
    final DSeries request = new DSeries();
    new DHistogram(new TestHistogram(), METRIC, HOST, tags, () -> MOCKED_SYSTEM_MILLIS).addTo(request);

    final String json = DatadogHttpClient.serialize(request);

    // Expectations are listed in the order DHistogram#addTo adds the gauges.
    final MetricAssertion avg = new MetricAssertion(MetricType.gauge, true, "4.0", DHistogram.SUFFIX_AVG);
    final MetricAssertion count = new MetricAssertion(MetricType.gauge, true, "1", DHistogram.SUFFIX_COUNT);
    final MetricAssertion median = new MetricAssertion(MetricType.gauge, true, "0.5", DHistogram.SUFFIX_MEDIAN);
    final MetricAssertion percentile95 =
        new MetricAssertion(MetricType.gauge, true, "0.95", DHistogram.SUFFIX_95_PERCENTILE);
    final MetricAssertion min = new MetricAssertion(MetricType.gauge, true, "7", DHistogram.SUFFIX_MIN);
    final MetricAssertion max = new MetricAssertion(MetricType.gauge, true, "6", DHistogram.SUFFIX_MAX);

    assertSerialization(json, avg, count, median, percentile95, min, max);
}

private static void assertSerialization(String json, MetricAssertion... metricAssertions) throws JsonProcessingException {
final JsonNode series = MAPPER.readTree(json).get(DSeries.FIELD_NAME_SERIES);

Expand All @@ -147,7 +166,7 @@ private static void assertSerialization(String json, MetricAssertion... metricAs
} else {
assertThat(parsedJson.get(DMetric.FIELD_NAME_HOST), nullValue());
}
assertThat(parsedJson.get(DMetric.FIELD_NAME_METRIC).asText(), is(METRIC));
assertThat(parsedJson.get(DMetric.FIELD_NAME_METRIC).asText(), is(METRIC + metricAssertion.metricNameSuffix));
assertThat(parsedJson.get(DMetric.FIELD_NAME_TYPE).asText(), is(metricAssertion.expectedType.name()));
assertThat(parsedJson.get(DMetric.FIELD_NAME_POINTS).toString(), is(String.format("[[123,%s]]", metricAssertion.expectedValue)));
assertThat(parsedJson.get(DMetric.FIELD_NAME_TAGS).toString(), is(TAGS_AS_JSON));
Expand All @@ -158,11 +177,17 @@ private static final class MetricAssertion {
final MetricType expectedType;
final boolean expectHost;
final String expectedValue;
final String metricNameSuffix;

/**
 * Creates an assertion for a metric reported under the plain metric name, i.e. with an empty
 * name suffix.
 *
 * @param expectedType metric type the serialized metric is expected to have
 * @param expectHost whether the serialized metric is expected to contain a host
 * @param expectedValue expected metric value, in its serialized textual form
 */
private MetricAssertion(MetricType expectedType, boolean expectHost, String expectedValue) {
this(expectedType, expectHost, expectedValue, "");
}

/**
 * Creates an assertion for a metric whose reported name is the base metric name followed by the
 * given suffix.
 *
 * @param expectedType metric type the serialized metric is expected to have
 * @param expectHost whether the serialized metric is expected to contain a host
 * @param expectedValue expected metric value, in its serialized textual form
 * @param metricNameSuffix suffix expected after the base metric name; may be empty
 */
private MetricAssertion(MetricType expectedType, boolean expectHost, String expectedValue, String metricNameSuffix) {
    // Name-related expectation first, then the remaining fields; all are independent finals.
    this.metricNameSuffix = metricNameSuffix;
    this.expectedValue = expectedValue;
    this.expectHost = expectHost;
    this.expectedType = expectedType;
}
}
}

0 comments on commit 313e20e

Please sign in to comment.