Skip to content

Commit

Permalink
Separate metric (rather than attribute) for version_conflict indexing failure
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Jan 7, 2025
1 parent 108645b commit b38a937
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -76,6 +77,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Metric names for indexing activity on "standard" indices, asserted in testIndicesMetrics.
static final String STANDARD_INDEXING_COUNT = "es.indices.standard.indexing.total";
static final String STANDARD_INDEXING_TIME = "es.indices.standard.indexing.time";
static final String STANDARD_INDEXING_FAILURE = "es.indices.standard.indexing.failure.total";
// Version-conflict failures have their own metric name rather than being an attribute of the failure metric.
static final String STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT = "es.indices.standard.indexing.failure.version_conflict.total";

static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total";
static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.size";
Expand All @@ -89,6 +91,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Metric names for indexing activity on "time_series" indices, asserted in testIndicesMetrics.
static final String TIME_SERIES_INDEXING_COUNT = "es.indices.time_series.indexing.total";
static final String TIME_SERIES_INDEXING_TIME = "es.indices.time_series.indexing.time";
static final String TIME_SERIES_INDEXING_FAILURE = "es.indices.time_series.indexing.failure.total";
// Version-conflict failures have their own metric name rather than being an attribute of the failure metric.
static final String TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT =
"es.indices.time_series.indexing.failure.version_conflict.total";

static final String LOGSDB_INDEX_COUNT = "es.indices.logsdb.total";
static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.size";
Expand All @@ -102,6 +106,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
// Metric names for indexing activity on "logsdb" indices, asserted in testIndicesMetrics.
static final String LOGSDB_INDEXING_COUNT = "es.indices.logsdb.indexing.total";
static final String LOGSDB_INDEXING_TIME = "es.indices.logsdb.indexing.time";
static final String LOGSDB_INDEXING_FAILURE = "es.indices.logsdb.indexing.failure.total";
// Version-conflict failures have their own metric name rather than being an attribute of the failure metric.
static final String LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT = "es.indices.logsdb.indexing.failure.version_conflict.total";

public void testIndicesMetrics() {
String indexNode = internalCluster().startNode();
Expand Down Expand Up @@ -130,16 +135,13 @@ public void testIndicesMetrics() {
STANDARD_INDEXING_COUNT,
equalTo(numStandardDocs),
STANDARD_INDEXING_TIME,
greaterThanOrEqualTo(0L)
greaterThanOrEqualTo(0L),
STANDARD_INDEXING_FAILURE,
equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexFailedCount()),
STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(indexing1.getIndexFailedDueToVersionConflictCount() - indexing0.getIndexFailedDueToVersionConflictCount())
)
);
assertIndexingFailureMetrics(
telemetry,
1,
STANDARD_INDEXING_FAILURE,
equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexFailedCount()),
equalTo(0L)
);

long numTimeSeriesIndices = randomIntBetween(1, 5);
long numTimeSeriesDocs = populateTimeSeriesIndices(numTimeSeriesIndices);
Expand All @@ -158,16 +160,13 @@ public void testIndicesMetrics() {
TIME_SERIES_INDEXING_COUNT,
equalTo(numTimeSeriesDocs),
TIME_SERIES_INDEXING_TIME,
greaterThanOrEqualTo(0L)
greaterThanOrEqualTo(0L),
TIME_SERIES_INDEXING_FAILURE,
equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexFailedCount()),
TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(indexing1.getIndexFailedDueToVersionConflictCount() - indexing0.getIndexFailedDueToVersionConflictCount())
)
);
assertIndexingFailureMetrics(
telemetry,
2,
TIME_SERIES_INDEXING_FAILURE,
equalTo(indexing2.getIndexFailedCount() - indexing1.getIndexFailedCount()),
equalTo(0L)
);

long numLogsdbIndices = randomIntBetween(1, 5);
long numLogsdbDocs = populateLogsdbIndices(numLogsdbIndices);
Expand All @@ -185,40 +184,52 @@ public void testIndicesMetrics() {
LOGSDB_INDEXING_COUNT,
equalTo(numLogsdbDocs),
LOGSDB_INDEXING_TIME,
greaterThanOrEqualTo(0L)
greaterThanOrEqualTo(0L),
LOGSDB_INDEXING_FAILURE,
equalTo(indexing3.getIndexFailedCount() - indexing2.getIndexFailedCount()),
LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(indexing3.getIndexFailedDueToVersionConflictCount() - indexing2.getIndexFailedDueToVersionConflictCount())
)
);
assertIndexingFailureMetrics(
telemetry,
3,
LOGSDB_INDEXING_FAILURE,
equalTo(indexing3.getIndexFailedCount() - indexing2.getIndexFailedCount()),
equalTo(0L)
);
// already collected indexing stats
collectThenAssertMetrics(
telemetry,
4,
Map<String, Matcher<Long>> zeroMatchers = new HashMap<>();
zeroMatchers.putAll(
Map.of(
STANDARD_INDEXING_COUNT,
equalTo(0L),
STANDARD_INDEXING_TIME,
equalTo(0L),

STANDARD_INDEXING_FAILURE,
equalTo(0L),
STANDARD_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(0L)
)
);
zeroMatchers.putAll(
Map.of(
TIME_SERIES_INDEXING_COUNT,
equalTo(0L),
TIME_SERIES_INDEXING_TIME,
equalTo(0L),

TIME_SERIES_INDEXING_FAILURE,
equalTo(0L),
TIME_SERIES_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(0L)
)
);
zeroMatchers.putAll(
Map.of(
LOGSDB_INDEXING_COUNT,
equalTo(0L),
LOGSDB_INDEXING_TIME,
equalTo(0L),
LOGSDB_INDEXING_FAILURE,
equalTo(0L),
LOGSDB_INDEXING_FAILURE_DUE_TO_VERSION_CONFLICT,
equalTo(0L)
)
);
assertIndexingFailureMetrics(telemetry, 4, STANDARD_INDEXING_FAILURE, equalTo(0L), equalTo(0L));
assertIndexingFailureMetrics(telemetry, 4, TIME_SERIES_INDEXING_FAILURE, equalTo(0L), equalTo(0L));
assertIndexingFailureMetrics(telemetry, 4, LOGSDB_INDEXING_FAILURE, equalTo(0L), equalTo(0L));
collectThenAssertMetrics(telemetry, 4, zeroMatchers);
String searchNode = internalCluster().startDataOnlyNode();
indicesService = internalCluster().getInstance(IndicesService.class, searchNode);
telemetry = internalCluster().getInstance(PluginsService.class, searchNode)
Expand Down Expand Up @@ -353,24 +364,6 @@ void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map<Stri
}
}

/**
 * Checks the latest pair of gauge measurements recorded under an indexing-failure metric.
 * <p>
 * The measurement list is expected to hold exactly two entries per collection: first the one
 * attributed with cause "any", then the one attributed with cause "version_conflict". Only the
 * last pair (belonging to the most recent collection) has its attributes and values verified.
 *
 * @param telemetry                     plugin the gauge measurements are read from
 * @param collectTimes                  number of collections performed so far
 * @param indexingFailureMetricsKey     name of the failure metric; the cause attribute name is derived
 *                                      from it by replacing "total" with "cause"
 * @param anyFailureMatcher             matcher applied to the value of the "any" measurement
 * @param versionConflictFailureMatcher matcher applied to the value of the "version_conflict" measurement
 */
void assertIndexingFailureMetrics(
    TestTelemetryPlugin telemetry,
    int collectTimes,
    String indexingFailureMetricsKey,
    Matcher<Long> anyFailureMatcher,
    Matcher<Long> versionConflictFailureMatcher
) {
    // every collection contributes two data points: "any" followed by "version_conflict"
    final int expectedCount = collectTimes * 2;
    final List<Measurement> recorded = telemetry.getLongGaugeMeasurement(indexingFailureMetricsKey);
    assertThat(recorded, hasSize(expectedCount));

    final String causeKey = indexingFailureMetricsKey.replace("total", "cause");

    // second-to-last entry: failures of any cause from the most recent collection
    final Measurement anyCause = recorded.get(expectedCount - 2);
    assertThat(anyCause.attributes(), equalTo(Map.of(causeKey, "any")));
    assertThat((long) anyCause.value(), anyFailureMatcher);

    // last entry: version-conflict failures from the most recent collection
    final Measurement versionConflict = recorded.get(expectedCount - 1);
    assertThat(versionConflict.attributes(), equalTo(Map.of(causeKey, "version_conflict")));
    assertThat((long) versionConflict.value(), versionConflictFailureMatcher);
}

int populateStandardIndices(long numIndices) {
int totalDocs = 0;
for (int i = 0; i < numIndices; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ public void testZeroMetricsForVersionConflictsForNonIndexingOperations() {
// simulate async apm `polling` call for metrics
plugin.collect();

List<Measurement> measurements = plugin.getLongAsyncCounterMeasurement("es.indexing.indexing.failed.total");
// there are no indexing (version conflict) failures reported because only gets/updates/deletes generated the conflicts
// and those are not indexing operations
assertThat(measurements, iterableWithSize(2));
assertThat(measurements.get(0).value(), is(0L));
assertThat(measurements.get(0).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "any")));
assertThat(measurements.get(1).value(), is(0L));
assertThat(measurements.get(1).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "version_conflict")));
// and those are not "indexing" operations
var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total");
assertThat(indexingFailedTotal.getLong(), equalTo(0L));
var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric(
plugin::getLongAsyncCounterMeasurement,
"es.indexing.indexing.failed.version_conflict.total"
);
assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(0L));
}

public void testMetricsForIndexingVersionConflicts() {
Expand Down Expand Up @@ -255,12 +256,13 @@ public void testMetricsForIndexingVersionConflicts() {

plugin.collect();

List<Measurement> measurements = plugin.getLongAsyncCounterMeasurement("es.indexing.indexing.failed.total");
assertThat(measurements, iterableWithSize(2));
assertThat(measurements.get(0).value(), is(4L));
assertThat(measurements.get(0).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "any")));
assertThat(measurements.get(1).value(), is(3L));
assertThat(measurements.get(1).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "version_conflict")));
var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total");
assertThat(indexingFailedTotal.getLong(), equalTo(4L));
var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric(
plugin::getLongAsyncCounterMeasurement,
"es.indexing.indexing.failed.version_conflict.total"
);
assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(3L));
}

public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin {
Expand Down Expand Up @@ -317,14 +319,13 @@ public void testNodeIndexingMetricsArePublishing() {
var indexingCurrent = getSingleRecordedMetric(plugin::getLongGaugeMeasurement, "es.indexing.docs.current.total");
assertThat(indexingCurrent.getLong(), equalTo(0L));

{
List<Measurement> measurements = plugin.getLongAsyncCounterMeasurement("es.indexing.indexing.failed.total");
assertThat(measurements, iterableWithSize(2));
assertThat(measurements.get(0).value(), is(0L));
assertThat(measurements.get(0).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "any")));
assertThat(measurements.get(1).value(), is(0L));
assertThat(measurements.get(1).attributes(), is(Map.of("es.indexing.indexing.failed.cause", "version_conflict")));
}
var indexingFailedTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.indexing.failed.total");
assertThat(indexingFailedTotal.getLong(), equalTo(0L));
var indexingFailedDueToVersionConflictTotal = getSingleRecordedMetric(
plugin::getLongAsyncCounterMeasurement,
"es.indexing.indexing.failed.version_conflict.total"
);
assertThat(indexingFailedDueToVersionConflictTotal.getLong(), equalTo(0L));

var deletionTotal = getSingleRecordedMetric(plugin::getLongAsyncCounterMeasurement, "es.indexing.deletion.docs.total");
assertThat(deletionTotal.getLong(), equalTo((long) deletesCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public IndicesMetrics(MeterRegistry meterRegistry, IndicesService indicesService
}

private static List<AutoCloseable> registerAsyncMetrics(MeterRegistry registry, IndicesStatsCache cache) {
final int TOTAL_METRICS = 48;
final int TOTAL_METRICS = 52;
List<AutoCloseable> metrics = new ArrayList<>(TOTAL_METRICS);
for (IndexMode indexMode : IndexMode.values()) {
String name = indexMode.getName();
Expand Down Expand Up @@ -149,20 +149,19 @@ private static List<AutoCloseable> registerAsyncMetrics(MeterRegistry registry,
)
);
metrics.add(
registry.registerLongsGauge(
registry.registerLongGauge(
"es.indices." + name + ".indexing.failure.total",
"current indexing failures of " + name + " indices",
"unit",
() -> List.of(
diffGauge(
() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedCount(),
Map.of("es.indices." + name + ".indexing.failure.cause", "any")
).get(),
diffGauge(
() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedDueToVersionConflictCount(),
Map.of("es.indices." + name + ".indexing.failure.cause", "version_conflict")
).get()
)
diffGauge(() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedCount())
)
);
metrics.add(
registry.registerLongGauge(
"es.indices." + name + ".indexing.failure.version_conflict.total",
"current indexing failures due to version conflict of " + name + " indices",
"unit",
diffGauge(() -> cache.getOrRefresh().get(indexMode).indexing.getIndexFailedDueToVersionConflictCount())
)
);
}
Expand All @@ -171,15 +170,11 @@ private static List<AutoCloseable> registerAsyncMetrics(MeterRegistry registry,
}

static Supplier<LongWithAttributes> diffGauge(Supplier<Long> currentValue) {
return diffGauge(currentValue, Map.of());
}

static Supplier<LongWithAttributes> diffGauge(Supplier<Long> currentValue, Map<String, Object> attributes) {
final AtomicLong counter = new AtomicLong();
return () -> {
var curr = currentValue.get();
long prev = counter.getAndUpdate(v -> Math.max(curr, v));
return new LongWithAttributes(Math.max(0, curr - prev), attributes);
return new LongWithAttributes(Math.max(0, curr - prev));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,29 +350,33 @@ private void registerAsyncMetrics(MeterRegistry registry) {
);

metrics.add(
registry.registerLongsAsyncCounter(
registry.registerLongAsyncCounter(
"es.indexing.indexing.failed.total",
"Total number of failed indexing operations",
"operations",
() -> List.of(
new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(o -> o.getIndices())
.map(o -> o.getIndexing())
.map(o -> o.getTotal())
.map(o -> o.getIndexFailedCount())
.orElse(0L),
Map.of("es.indexing.indexing.failed.cause", "any")
),
new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(o -> o.getIndices())
.map(o -> o.getIndexing())
.map(o -> o.getTotal())
.map(o -> o.getIndexFailedDueToVersionConflictCount())
.orElse(0L),
Map.of("es.indexing.indexing.failed.cause", "version_conflict")
)
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(o -> o.getIndices())
.map(o -> o.getIndexing())
.map(o -> o.getTotal())
.map(o -> o.getIndexFailedCount())
.orElse(0L)
)
)
);

metrics.add(
registry.registerLongAsyncCounter(
"es.indexing.indexing.failed.version_conflict.total",
"Total number of failed indexing operations due to version conflict",
"operations",
() -> new LongWithAttributes(
Optional.ofNullable(stats.getOrRefresh())
.map(o -> o.getIndices())
.map(o -> o.getIndexing())
.map(o -> o.getTotal())
.map(o -> o.getIndexFailedDueToVersionConflictCount())
.orElse(0L)
)
)
);
Expand Down

0 comments on commit b38a937

Please sign in to comment.