From 9415eb55cb929358960cc98e96f79863a7955fcf Mon Sep 17 00:00:00 2001 From: Charles Date: Tue, 17 Jan 2023 16:13:06 -0800 Subject: [PATCH] add attempt metrics (#21286) --- .../io/airbyte/commons/util/MoreLists.java | 10 ++ .../airbyte/commons/util/MoreListsTest.java | 11 ++ .../io/airbyte/metrics/lib/MetricTags.java | 7 ++ .../metrics/lib/OssMetricsRegistry.java | 33 +++++- .../job/DefaultJobPersistence.java | 2 + .../persistence/job/models/Attempt.java | 7 ++ .../airbyte/persistence/job/models/Job.java | 6 + .../job/DefaultJobPersistenceTest.java | 2 + .../persistence/job/models/AttemptTest.java | 2 +- .../persistence/job/models/JobTest.java | 9 +- .../job/tracker/TrackingMetadataTest.java | 2 +- .../handlers/JobHistoryHandlerTest.java | 6 +- ...obCreationAndStatusUpdateActivityImpl.java | 110 +++++++++++++++++- ...obCreationAndStatusUpdateActivityTest.java | 12 +- 14 files changed, 200 insertions(+), 19 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/MoreLists.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/MoreLists.java index 3ede6f63f83..a737c526f55 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/MoreLists.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/MoreLists.java @@ -4,6 +4,7 @@ package io.airbyte.commons.util; +import com.google.common.base.Preconditions; import java.util.List; import java.util.stream.Stream; @@ -21,4 +22,13 @@ public static List concat(final List... lists) { return Stream.of(lists).flatMap(List::stream).toList(); } + public static T getOrNull(final List list, final int index) { + Preconditions.checkNotNull(list); + if (list.size() > index) { + return list.get(index); + } else { + return null; + } + } + } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/util/MoreListsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/util/MoreListsTest.java index 542e0ee75a4..452af43e8c7 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/util/MoreListsTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/util/MoreListsTest.java @@ -5,6 +5,8 @@ package io.airbyte.commons.util; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.List; import org.junit.jupiter.api.Test; @@ -19,4 +21,13 @@ void testConcat() { assertEquals(expected, actual); } + @Test + void testGetOrNull() { + assertThrows(NullPointerException.class, () -> MoreLists.getOrNull(null, 0)); + assertEquals(1, MoreLists.getOrNull(List.of(1, 2, 3), 0)); + assertEquals(2, MoreLists.getOrNull(List.of(1, 2, 3), 1)); + assertEquals(3, MoreLists.getOrNull(List.of(1, 2, 3), 2)); + assertNull(MoreLists.getOrNull(List.of(1, 2, 3), 3)); + } + } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index 4ffd6e2081c..b4f3542e5c4 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -26,6 +26,13 @@ public class MetricTags { public static final String GEOGRAPHY = "geography"; public static final String UNKNOWN = "unknown"; + // the release stage of the highest release connector in the sync (GA > Beta > Alpha) + public static final String MAX_CONNECTOR_RELEASE_STATE = "max_connector_release_stage"; + // the release stage of the lowest release stage connector in the sync (GA > Beta > Alpha) + public static final String MIN_CONNECTOR_RELEASE_STATE = "min_connector_release_stage"; + public static final String ATTEMPT_OUTCOME = "attempt_outcome"; // succeeded|failed + public static final String ATTEMPT_NUMBER = "attempt_number"; // 0|1|2|3 + public static String getReleaseStage(final ReleaseStage stage) { return stage != null ? stage.getLiteral() : UNKNOWN; } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index 1fae89c965c..f59d32ec98a 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -5,6 +5,8 @@ package io.airbyte.metrics.lib; import com.google.api.client.util.Preconditions; +import java.util.Arrays; +import java.util.List; /** * Enum source of truth of all Airbyte metrics. Each enum value represent a metric and is linked to @@ -137,21 +139,48 @@ public enum OssMetricsRegistry implements MetricsRegistry { "number of records synced during replication"), RESET_REQUEST(MetricEmittingApps.WORKER, "reset_request", - "number of requested resets"); + "number of requested resets"), + + ATTEMPTS_CREATED( + MetricEmittingApps.WORKER, + "attempt_created", + "increments when a new attempt is created. one is emitted per attempt", + MetricTags.GEOGRAPHY, + MetricTags.ATTEMPT_NUMBER, + MetricTags.MIN_CONNECTOR_RELEASE_STATE, + MetricTags.MAX_CONNECTOR_RELEASE_STATE), + ATTEMPTS_COMPLETED( + MetricEmittingApps.WORKER, + "attempt_completed", + "increments when a new attempt is completed. one is emitted per attempt", + MetricTags.GEOGRAPHY, + MetricTags.ATTEMPT_NUMBER, + MetricTags.MIN_CONNECTOR_RELEASE_STATE, + MetricTags.MAX_CONNECTOR_RELEASE_STATE, + MetricTags.ATTEMPT_QUEUE, + MetricTags.ATTEMPT_OUTCOME, + MetricTags.FAILURE_ORIGIN, // only includes the first failure origin + MetricTags.FAILURE_TYPE); // only includes the first failure type private final MetricEmittingApp application; private final String metricName; private final String metricDescription; + // added this field to declare metric attributes, but we never read them. + @SuppressWarnings("FieldCanBeLocal") + private final List metricTags; + OssMetricsRegistry(final MetricEmittingApp application, final String metricName, - final String metricDescription) { + final String metricDescription, + final String... metricTags) { Preconditions.checkNotNull(metricDescription); Preconditions.checkNotNull(application); this.application = application; this.metricName = metricName; this.metricDescription = metricDescription; + this.metricTags = Arrays.asList(metricTags); } @Override diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index c5ea5d7b1cc..26b40fd3fc7 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -157,6 +157,7 @@ private static String jobSelectAndJoin(final String jobsSubquery) { + "attempts.log_path AS log_path,\n" + "attempts.output AS attempt_output,\n" + "attempts.status AS attempt_status,\n" + + "attempts.processing_task_queue AS processing_task_queue,\n" + "attempts.failure_summary AS attempt_failure_summary,\n" + "attempts.created_at AS attempt_created_at,\n" + "attempts.updated_at AS attempt_updated_at,\n" @@ -931,6 +932,7 @@ private static Attempt getAttemptFromRecord(final Record record) { Path.of(record.get("log_path", String.class)), record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class), Enums.toEnum(record.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(), + record.get("processing_task_queue", String.class), record.get("attempt_failure_summary", String.class) == null ? null : Jsons.deserialize(record.get("attempt_failure_summary", String.class), AttemptFailureSummary.class), getEpoch(record, "attempt_created_at"), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index a3dc08b076d..5b585e42e39 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -17,6 +17,7 @@ public class Attempt { private final long jobId; private final JobOutput output; private final AttemptStatus status; + private final String processingTaskQueue; private final AttemptFailureSummary failureSummary; private final Path logPath; private final long updatedAtInSecond; @@ -28,6 +29,7 @@ public Attempt(final int attemptNumber, final Path logPath, final @Nullable JobOutput output, final AttemptStatus status, + final String processingTaskQueue, final @Nullable AttemptFailureSummary failureSummary, final long createdAtInSecond, final long updatedAtInSecond, @@ -36,6 +38,7 @@ public Attempt(final int attemptNumber, this.jobId = jobId; this.output = output; this.status = status; + this.processingTaskQueue = processingTaskQueue; this.failureSummary = failureSummary; this.logPath = logPath; this.updatedAtInSecond = updatedAtInSecond; @@ -59,6 +62,10 @@ public AttemptStatus getStatus() { return status; } + public String getProcessingTaskQueue() { + return processingTaskQueue; + } + public Optional getFailureSummary() { return Optional.ofNullable(failureSummary); } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java index 549f58668a0..ada40c6ed08 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java @@ -126,6 +126,12 @@ public Optional getLastAttemptWithOutput() { .findFirst(); } + public Optional getLastAttempt() { + return getAttempts() + .stream() + .max(Comparator.comparing(Attempt::getCreatedAtInSecond)); + } + public boolean hasRunningAttempt() { return getAttempts().stream().anyMatch(a -> !Attempt.isAttemptInTerminalState(a)); } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index a06422c25bd..0652e2a8251 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -152,6 +152,7 @@ private static Attempt createAttempt(final int id, final long jobId, final Attem null, status, null, + null, NOW.getEpochSecond(), NOW.getEpochSecond(), NOW.getEpochSecond()); @@ -165,6 +166,7 @@ private static Attempt createUnfinishedAttempt(final int id, final long jobId, f null, status, null, + null, NOW.getEpochSecond(), NOW.getEpochSecond(), null); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index badc1ac68d7..f8660bd4cb5 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1, 1L, null, null, attemptStatus, null, 0L, 0L, null); + return new Attempt(1, 1L, null, null, attemptStatus, null, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 4cdb4f15403..9dc147ae99b 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -43,7 +43,7 @@ void testHasRunningAttempt() { private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) { final List attempts = IntStream.range(0, attemptStatuses.length) - .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, idx, 0L, null)) + .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, null, idx, 0L, null)) .collect(Collectors.toList()); return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L); } @@ -71,6 +71,13 @@ void testGetLastFailedAttempt() { assertEquals(2, job.getLastFailedAttempt().get().getAttemptNumber()); } + @Test + void testGetLastAttempt() { + final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED, AttemptStatus.SUCCEEDED); + assertTrue(job.getLastAttempt().isPresent()); + assertEquals(3, job.getLastAttempt().get().getAttemptNumber()); + } + @Test void testValidateStatusTransitionFromPending() { final Job pendingJob = jobWithStatus(JobStatus.PENDING); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java index e8fa039484d..d8ffb69eaac 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java @@ -61,7 +61,7 @@ void testgenerateJobAttemptMetadataWithNulls() { final StandardSyncSummary standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); final StandardSyncOutput standardSyncOutput = new StandardSyncOutput().withStandardSyncSummary(standardSyncSummary); final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); - final Attempt attempt = new Attempt(0, 10L, Path.of("test"), jobOutput, AttemptStatus.SUCCEEDED, null, 100L, 100L, 99L); + final Attempt attempt = new Attempt(0, 10L, Path.of("test"), jobOutput, AttemptStatus.SUCCEEDED, null, null, 100L, 100L, 99L); final Job job = mock(Job.class); when(job.getAttempts()).thenReturn(List.of(attempt)); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 105e2fbb9c1..02bd7034f6f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps); } @BeforeEach @@ -442,11 +442,11 @@ void testEnumConversion() { @DisplayName("Should return attempt normalization info for the job") void testGetAttemptNormalizationStatuses() throws IOException { - AttemptNormalizationStatus databaseReadResult = new AttemptNormalizationStatus(1, Optional.of(10L), /* hasNormalizationFailed= */ false); + final AttemptNormalizationStatus databaseReadResult = new AttemptNormalizationStatus(1, Optional.of(10L), /* hasNormalizationFailed= */ false); when(jobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(databaseReadResult)); - AttemptNormalizationStatusReadList expectedStatus = new AttemptNormalizationStatusReadList().attemptNormalizationStatuses( + final AttemptNormalizationStatusReadList expectedStatus = new AttemptNormalizationStatusReadList().attemptNormalizationStatuses( List.of(new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).hasNormalizationFailed(false).recordsCommitted(10L))); assertEquals(expectedStatus, jobHistoryHandler.getAttemptNormalizationStatuses(new JobIdRequestBody().id(JOB_ID))); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 37a9f764762..295112266ec 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -20,6 +20,7 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; +import io.airbyte.commons.util.MoreLists; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; import io.airbyte.config.Configs.WorkerEnvironment; @@ -65,6 +66,9 @@ import jakarta.inject.Singleton; import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -198,15 +202,16 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) try { final long jobId = input.getJobId(); ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId)); - final Job createdJob = jobPersistence.getJob(jobId); + final Job job = jobPersistence.getJob(jobId); - final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob); + final WorkerRun workerRun = temporalWorkerRunFactory.create(job); final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME); - final int persistedAttemptId = jobPersistence.createAttempt(jobId, logFilePath); + final int persistedAttemptNumber = jobPersistence.createAttempt(jobId, logFilePath); emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId); + emitAttemptCreatedEvent(job, persistedAttemptNumber); LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot()); - return new AttemptCreationOutput(persistedAttemptId); + return new AttemptCreationOutput(persistedAttemptNumber); } catch (final IOException e) { throw new RetryableException(e); } @@ -470,6 +475,89 @@ private void failNonTerminalJobs(final UUID connectionId) { } } + private static final int MAX_ATTEMPTS = 3; + private static final Map RELEASE_STAGE_ORDER = Map.of( + ReleaseStage.custom, 1, + ReleaseStage.alpha, 2, + ReleaseStage.beta, 3, + ReleaseStage.generally_available, 4); + private static final Comparator RELEASE_STAGE_COMPARATOR = Comparator.comparingInt(RELEASE_STAGE_ORDER::get); + + private static List orderByReleaseStageAsc(final List releaseStages) { + final List copiedList = new ArrayList<>(releaseStages); + copiedList.sort(RELEASE_STAGE_COMPARATOR); + return copiedList; + } + + /** + * Extract the attempt number from an attempt. If the number is anonymous (not 0,1,2,3) for some + * reason return null. We don't want to accidentally have high cardinality here because of a bug. + * + * @param attemptNumber - attemptNumber to parse + * @return extract attempt number or null + */ + private static String parseAttemptNumberOrNull(final int attemptNumber) { + if (attemptNumber > MAX_ATTEMPTS) { + return null; + } else { + return Integer.toString(attemptNumber); + } + } + + private void emitAttemptEvent(final OssMetricsRegistry metric, final Job job, final int attemptNumber) throws IOException { + emitAttemptEvent(metric, job, attemptNumber, Collections.emptyList()); + } + + private void emitAttemptEvent(final OssMetricsRegistry metric, + final Job job, + final int attemptNumber, + final List additionalAttributes) + throws IOException { + final List releaseStages = configRepository.getJobIdToReleaseStages(job.getId()); + final var releaseStagesOrdered = orderByReleaseStageAsc(releaseStages); + final var connectionId = job.getScope() == null ? null : UUID.fromString(job.getScope()); + final var geography = configRepository.getGeographyForConnection(connectionId); + + final List baseMetricAttributes = List.of( + new MetricAttribute(MetricTags.GEOGRAPHY, geography == null ? null : geography.toString()), + new MetricAttribute(MetricTags.ATTEMPT_NUMBER, parseAttemptNumberOrNull(attemptNumber)), + new MetricAttribute(MetricTags.MIN_CONNECTOR_RELEASE_STATE, MetricTags.getReleaseStage(MoreLists.getOrNull(releaseStagesOrdered, 0))), + new MetricAttribute(MetricTags.MAX_CONNECTOR_RELEASE_STATE, MetricTags.getReleaseStage(MoreLists.getOrNull(releaseStagesOrdered, 1)))); + + final MetricAttribute[] allMetricAttributes = MoreLists + .concat(baseMetricAttributes, additionalAttributes) + .toArray(new MetricAttribute[baseMetricAttributes.size() + additionalAttributes.size()]); + MetricClientFactory.getMetricClient().count(metric, 1, allMetricAttributes); + } + + private void emitAttemptCreatedEvent(final Job job, final int attemptNumber) throws IOException { + emitAttemptEvent(OssMetricsRegistry.ATTEMPTS_CREATED, job, attemptNumber); + } + + private void emitAttemptCompletedEvent(final Job job, final Attempt attempt) throws IOException { + final Optional failureOrigin = attempt.getFailureSummary().flatMap(summary -> summary.getFailures() + .stream() + .map(FailureReason::getFailureOrigin) + .filter(Objects::nonNull) + .map(FailureOrigin::name) + .findFirst()); + + final Optional failureType = attempt.getFailureSummary().flatMap(summary -> summary.getFailures() + .stream() + .map(FailureReason::getFailureType) + .filter(Objects::nonNull) + .map(MetricTags::getFailureType) + .findFirst()); + + final List additionalAttributes = List.of( + new MetricAttribute(MetricTags.ATTEMPT_OUTCOME, attempt.getStatus().toString()), + new MetricAttribute(MetricTags.FAILURE_ORIGIN, failureOrigin.orElse(null)), + new MetricAttribute(MetricTags.FAILURE_TYPE, failureType.orElse(null)), + new MetricAttribute(MetricTags.ATTEMPT_QUEUE, attempt.getProcessingTaskQueue())); + + emitAttemptEvent(OssMetricsRegistry.ATTEMPTS_COMPLETED, job, attempt.getAttemptNumber(), additionalAttributes); + } + private void emitJobIdToReleaseStagesMetric(final OssMetricsRegistry metric, final long jobId) throws IOException { final var releaseStages = configRepository.getJobIdToReleaseStages(jobId); if (releaseStages == null || releaseStages.isEmpty()) { @@ -484,10 +572,22 @@ private void emitJobIdToReleaseStagesMetric(final OssMetricsRegistry metric, fin } } - private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus status) { + private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus status) throws IOException { + emitAttemptCompletedEventIfAttemptPresent(job); jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } + private void emitAttemptCompletedEventIfAttemptPresent(final Job job) throws IOException { + if (job == null) { + return; + } + + final Optional lastAttempt = job.getLastAttempt(); + if (lastAttempt.isPresent()) { + emitAttemptCompletedEvent(job, lastAttempt.get()); + } + } + private void trackCompletionForInternalFailure(final Long jobId, final UUID connectionId, final Integer attemptId, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 4f34adfb858..ccdb8447fd6 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -234,7 +234,7 @@ void createAttempt() throws IOException { @Test void isLastJobOrAttemptFailureTrueTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -254,7 +254,7 @@ void isLastJobOrAttemptFailureTrueTest() throws Exception { @Test void isLastJobOrAttemptFailureFalseTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.FAILED, 4L, 4L, 5L); @@ -273,9 +273,9 @@ void isLastJobOrAttemptFailureFalseTest() throws Exception { @Test void isLastJobOrAttemptFailurePreviousAttemptFailureTest() throws Exception { - final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, 2L, 3L, 3L); + final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int activeAttemptNumber = 1; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -505,9 +505,9 @@ void setJobCancelledWrapException() throws IOException { @Test void ensureCleanJobState() throws IOException { - final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, 2L, 3L, 3L); + final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int runningAttemptNumber = 1; - final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, 4L, 5L, null); + final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job runningJob = new Job(1, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(failedAttempt, runningAttempt), JobStatus.RUNNING, 2L, 2L, 3L);