Skip to content

Commit

Permalink
chore(broker): add last exported metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisKujawa committed Aug 28, 2020
1 parent da6ea07 commit 5520b4e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ public ExporterDirector(final ExporterDirectorContext context) {

this.logStream = Objects.requireNonNull(context.getLogStream());
final int partitionId = logStream.getPartitionId();
this.recordExporter = new RecordExporter(containers, partitionId);
this.metrics = new ExporterMetrics(partitionId);
this.recordExporter = new RecordExporter(metrics, containers, partitionId);
this.exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
this.recordWrapStrategy = new EndlessRetryStrategy(actor);

this.zeebeDb = context.getZeebeDb();

this.metrics = new ExporterMetrics(partitionId);
}

public ActorFuture<Void> startAsync(final ActorScheduler actorScheduler) {
Expand Down Expand Up @@ -314,13 +312,18 @@ private static class RecordExporter {
private final RecordMetadata rawMetadata = new RecordMetadata();
private final List<ExporterContainer> containers;
private final TypedEventImpl typedEvent;
private final ExporterMetrics exporterMetrics;

private boolean shouldExport;
private int exporterIndex;

RecordExporter(final List<ExporterContainer> containers, final int partitionId) {
RecordExporter(
final ExporterMetrics exporterMetrics,
final List<ExporterContainer> containers,
final int partitionId) {
this.containers = containers;
typedEvent = new TypedEventImpl(partitionId);
this.exporterMetrics = exporterMetrics;
}

void wrap(final LoggedEvent rawEvent) {
Expand Down Expand Up @@ -355,6 +358,7 @@ public boolean export() {
}

exporterIndex++;
exporterMetrics.setLastExportedPosition(container.getId(), typedEvent.getPosition());
} catch (final Exception ex) {
container
.context
Expand Down Expand Up @@ -424,6 +428,7 @@ public void updateLastExportedRecordPosition(final long position) {
actor.run(
() -> {
state.setPosition(getId(), position);
metrics.setLastUpdatedExportedPosition(getId(), position);
this.position = position;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.zeebe.broker.exporter.stream;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.zeebe.protocol.record.ValueType;

public final class ExporterMetrics {
Expand All @@ -20,6 +21,22 @@ public final class ExporterMetrics {
.labelNames("action", "partition", "valueType")
.register();

private static final Gauge LAST_EXPORTED_POSITION =
Gauge.build()
.namespace("zeebe")
.name("exporter_last_exported_position")
.help("The last exported position by exporter and partition.")
.labelNames("exporter", "partition")
.register();

private static final Gauge LAST_UPDATED_EXPORTED_POSITION =
Gauge.build()
.namespace("zeebe")
.name("exporter_last_updated_exported_position")
.help("The last exported position which was also updated/commited by the exporter.")
.labelNames("exporter", "partition")
.register();

private final String partitionIdLabel;

public ExporterMetrics(final int partitionId) {
Expand All @@ -37,4 +54,12 @@ public void eventExported(final ValueType valueType) {
public void eventSkipped(final ValueType valueType) {
event("skipped", valueType);
}

public void setLastUpdatedExportedPosition(final String exporter, final long position) {
LAST_UPDATED_EXPORTED_POSITION.labels(exporter, partitionIdLabel).set(position);
}

public void setLastExportedPosition(final String exporter, final long position) {
LAST_EXPORTED_POSITION.labels(exporter, partitionIdLabel).set(position);
}
}

0 comments on commit 5520b4e

Please sign in to comment.