Skip to content

Commit

Permalink
Make MetricsRegistry as an interface (airbytehq#12487)
Browse files Browse the repository at this point in the history
Make MetricsRegistry as an interface
  • Loading branch information
xiaohansong authored May 4, 2022
1 parent f18f046 commit 341ef12
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void count(final MetricsRegistry metric, final double amt, final S
}

log.info("publishing count, name: {}, value: {}, tags: {}", metric, amt, tags);
statsDClient.count(metric.metricName, amt, tags);
statsDClient.count(metric.getMetricName(), amt, tags);
}
}

Expand All @@ -91,7 +91,7 @@ public static void gauge(final MetricsRegistry metric, final double val, final S
}

log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.metricName, val, tags);
statsDClient.gauge(metric.getMetricName(), val, tags);
}
}

Expand All @@ -118,7 +118,7 @@ public static void recordTimeLocal(final MetricsRegistry metric, final double va
}

log.info("recording histogram, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.histogram(metric.metricName, val, tags);
statsDClient.histogram(metric.getMetricName(), val, tags);
}
}

Expand All @@ -139,7 +139,7 @@ public static void recordTimeGlobal(final MetricsRegistry metric, final double v
}

log.info("recording distribution, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.distribution(metric.metricName, val, tags);
statsDClient.distribution(metric.getMetricName(), val, tags);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,106 +4,16 @@

package io.airbyte.metrics.lib;

import com.google.api.client.util.Preconditions;

/**
* Enum source of truth of all Airbyte metrics. Each enum value represent a metric and is linked to
* an application and contains a description to make it easier to understand.
*
* Each object of the enum actually represent a metric, so the Registry name is misleading. The
* reason 'Registry' is in the name is to emphasize this enum's purpose as a source of truth for all
* metrics. This also helps code readability i.e. AirbyteMetricsRegistry.metricA.
*
* Metric Name Convention (adapted from
* https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/):
* <p>
* - Use lowercase. Metric names are case sensitive.
* <p>
* - Use underscore to delimit names with multiple words.
* <p>
* - No spaces. This makes the metric confusing to read.
* <p>
* - Avoid numbers. This makes the metric confusing to read.
* <p>
* - Add units at name end if applicable. This is especially relevant for time units.
* <p>
* - Include the time period in the name if the metric is meant to be run at a certain interval.
* Interface representing the metrics emitted by an Airbyte application. This interface exists
* because Java does not support enum inheritance as of Java 17.
*/
public enum MetricsRegistry {

ATTEMPT_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_created_by_release_stage",
"increments when a new attempt is created. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_failed_by_release_stage",
"increments when an attempt fails. attempts are double counted as this is tagged by release stage."),
ATTEMPT_FAILED_BY_FAILURE_ORIGIN(
MetricEmittingApps.WORKER,
"attempt_failed_by_failure_origin",
"increments for every failure origin a failed attempt has. since a failure can have multiple origins, a single failure can be counted more than once. tagged by failure origin."),
ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"attempt_succeeded_by_release_stage",
"increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
EST_NUM_METRICS_EMITTED_BY_REPORTER(
MetricEmittingApps.METRICS_REPORTER,
"est_num_metrics_emitted_by_reporter",
"estimated metrics emitted by the reporter in the last interval. this is estimated since the count is not precise."),
JOB_CANCELLED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_cancelled_by_release_stage",
"increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
JOB_CREATED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_created_by_release_stage",
"increments when a new job is created. jobs are double counted as this is tagged by release stage."),
JOB_FAILED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_failed_by_release_stage",
"increments when a job fails. jobs are double counted as this is tagged by release stage."),
JOB_SUCCEEDED_BY_RELEASE_STAGE(
MetricEmittingApps.WORKER,
"job_succeeded_by_release_stage",
"increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
NUM_ACTIVE_CONN_PER_WORKSPACE(
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
OLDEST_RUNNING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_running_job_age_secs",
"oldest running job in seconds"),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");
public interface MetricsRegistry {

public final MetricEmittingApp application;
public final String metricName;
public final String metricDescription;
MetricEmittingApp getApplication();

MetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
Preconditions.checkNotNull(metricDescription);
Preconditions.checkNotNull(application);
String getMetricName();

this.application = application;
this.metricName = metricName;
this.metricDescription = metricDescription;
}
String getMetricDescription();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import com.google.api.client.util.Preconditions;

/**
 * Enum source of truth for all Airbyte OSS metrics. Each enum value represents a single metric, is
 * linked to the application that emits it, and carries a description to make it easier to
 * understand.
 *
 * Each object of the enum actually represents a metric, so the Registry name is misleading. The
 * reason 'Registry' is in the name is to emphasize this enum's purpose as a source of truth for all
 * metrics. This also helps code readability, i.e. OssMetricsRegistry.metricA.
 *
 * Metric Name Convention (adapted from
 * https://docs.datadoghq.com/developers/guide/what-best-practices-are-recommended-for-naming-metrics-and-tags/):
 * <p>
 * - Use lowercase. Metric names are case sensitive.
 * <p>
 * - Use underscore to delimit names with multiple words.
 * <p>
 * - No spaces. This makes the metric confusing to read.
 * <p>
 * - Avoid numbers. This makes the metric confusing to read.
 * <p>
 * - Add units at name end if applicable. This is especially relevant for time units.
 * <p>
 * - Include the time period in the name if the metric is meant to be run at a certain interval.
 */
public enum OssMetricsRegistry implements MetricsRegistry {

  ATTEMPT_CREATED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "attempt_created_by_release_stage",
      "increments when a new attempt is created. attempts are double counted as this is tagged by release stage."),
  ATTEMPT_FAILED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "attempt_failed_by_release_stage",
      "increments when an attempt fails. attempts are double counted as this is tagged by release stage."),
  ATTEMPT_FAILED_BY_FAILURE_ORIGIN(
      MetricEmittingApps.WORKER,
      "attempt_failed_by_failure_origin",
      "increments for every failure origin a failed attempt has. since a failure can have multiple origins, a single failure can be counted more than once. tagged by failure origin."),
  ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "attempt_succeeded_by_release_stage",
      "increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
  EST_NUM_METRICS_EMITTED_BY_REPORTER(
      MetricEmittingApps.METRICS_REPORTER,
      "est_num_metrics_emitted_by_reporter",
      "estimated metrics emitted by the reporter in the last interval. this is estimated since the count is not precise."),
  JOB_CANCELLED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "job_cancelled_by_release_stage",
      "increments when a job is cancelled. jobs are double counted as this is tagged by release stage."),
  JOB_CREATED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "job_created_by_release_stage",
      "increments when a new job is created. jobs are double counted as this is tagged by release stage."),
  JOB_FAILED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "job_failed_by_release_stage",
      "increments when a job fails. jobs are double counted as this is tagged by release stage."),
  JOB_SUCCEEDED_BY_RELEASE_STAGE(
      MetricEmittingApps.WORKER,
      "job_succeeded_by_release_stage",
      "increments when a job succeeds. jobs are double counted as this is tagged by release stage."),
  KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
      MetricEmittingApps.WORKER,
      "kube_pod_process_create_time_millisecs",
      "time taken to create a new kube pod process"),
  NUM_PENDING_JOBS(
      MetricEmittingApps.METRICS_REPORTER,
      "num_pending_jobs",
      "number of pending jobs"),
  NUM_RUNNING_JOBS(
      MetricEmittingApps.METRICS_REPORTER,
      "num_running_jobs",
      "number of running jobs"),
  NUM_ACTIVE_CONN_PER_WORKSPACE(
      MetricEmittingApps.METRICS_REPORTER,
      "num_active_conn_per_workspace",
      "number of active connections per workspace"),
  OLDEST_PENDING_JOB_AGE_SECS(
      MetricEmittingApps.METRICS_REPORTER,
      "oldest_pending_job_age_secs",
      "oldest pending job in seconds"),
  OLDEST_RUNNING_JOB_AGE_SECS(
      MetricEmittingApps.METRICS_REPORTER,
      "oldest_running_job_age_secs",
      "oldest running job in seconds"),
  OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(
      MetricEmittingApps.METRICS_REPORTER,
      "overall_job_runtime_in_last_hour_by_terminal_state_secs",
      "overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");

  private final MetricEmittingApp application;
  private final String metricName;
  private final String metricDescription;

  /**
   * Creates a metric definition.
   *
   * @param application the application this metric is linked to; must not be null
   * @param metricName the name returned by {@link #getMetricName()}; must not be null
   * @param metricDescription human-readable explanation of the metric; must not be null
   */
  OssMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) {
    Preconditions.checkNotNull(metricDescription);
    Preconditions.checkNotNull(application);
    // The name is what callers hand to the metrics client, so reject null here (at class
    // initialization) rather than letting it surface later at emission time.
    Preconditions.checkNotNull(metricName);

    this.application = application;
    this.metricName = metricName;
    this.metricDescription = metricDescription;
  }

  /**
   * @return the application this metric is linked to
   */
  @Override
  public MetricEmittingApp getApplication() {
    return application;
  }

  /**
   * @return the metric's name
   */
  @Override
  public String getMetricName() {
    return metricName;
  }

  /**
   * @return the human-readable description of the metric
   */
  @Override
  public String getMetricDescription() {
    return metricDescription;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void tearDown() {
public void testPublishTrueNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", false));
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand All @@ -30,15 +30,15 @@ public void testPublishTrueNoEmitError() {
public void testPublishFalseNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration("localhost", "1000", true));
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

@Test
@DisplayName("there should be no exception if we attempt to emit metrics without initializing")
public void testNoInitializeNoEmitError() {
Assertions.assertDoesNotThrow(() -> {
DogStatsDMetricSingleton.gauge(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.MetricsRegistry;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,31 +24,31 @@ public enum ToEmit {

NUM_PENDING_JOBS(countMetricEmission(() -> {
final var pendingJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfPendingJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
})),
NUM_RUNNING_JOBS(countMetricEmission(() -> {
final var runningJobs = ReporterApp.configDatabase.query(MetricQueries::numberOfRunningJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
})),
OLDEST_RUNNING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestRunningJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
})),
OLDEST_PENDING_JOB_AGE_SECS(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::oldestPendingJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
DogStatsDMetricSingleton.gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
})),
NUM_ACTIVE_CONN_PER_WORKSPACE(countMetricEmission(() -> {
final var age = ReporterApp.configDatabase.query(MetricQueries::numberOfActiveConnPerWorkspace);
for (long count : age) {
DogStatsDMetricSingleton.percentile(MetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
DogStatsDMetricSingleton.percentile(OssMetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
}
})),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(countMetricEmission(() -> {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
DogStatsDMetricSingleton.recordTimeGlobal(
MetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
}
}), 1, TimeUnit.HOURS);

Expand All @@ -72,7 +72,7 @@ private static Runnable countMetricEmission(Procedure metricQuery) {
return () -> {
try {
metricQuery.call();
DogStatsDMetricSingleton.count(MetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
DogStatsDMetricSingleton.count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
} catch (Exception e) {
log.error("Exception querying database for metric: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricsRegistry;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
Expand Down Expand Up @@ -555,7 +555,7 @@ public KubePodProcess(final boolean isOrchestrator,
final boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p);
return isReady || KubePodResourceHelper.isTerminal(p);
}, 20, TimeUnit.MINUTES);
DogStatsDMetricSingleton.recordTimeGlobal(MetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start);
DogStatsDMetricSingleton.recordTimeGlobal(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, System.currentTimeMillis() - start);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
Expand Down
Loading

0 comments on commit 341ef12

Please sign in to comment.