Skip to content

Commit

Permalink
Add api endpoint attempt/get_combined_stats. (#7301)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Jun 21, 2023
1 parent a82d47b commit 1a38d65
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 1 deletion.
32 changes: 32 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2633,6 +2633,28 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
# Attempt endpoint (tagged "attempt" and "internal"): POST a GetAttemptStatsRequestBody
# and receive the combined AttemptStats for that single attempt, or the shared
# NotFoundResponse with status 404.
/v1/attempt/get_combined_stats:
post:
tags:
- attempt
- internal
summary: For retrieving combined stats for a single attempt
operationId: getAttemptCombinedStats
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/GetAttemptStatsRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/AttemptStats"
"404":
$ref: "#/components/responses/NotFoundResponse"
/v1/stream_statuses/list:
post:
summary: Gets a list of stream statuses filtered by parameters (with AND semantics).
Expand Down Expand Up @@ -5993,6 +6015,16 @@ components:
$ref: "#/components/schemas/AttemptNumber"
syncConfig:
$ref: "#/components/schemas/AttemptSyncConfig"
# Request body that identifies one attempt by its job id and attempt number;
# both properties are required.
GetAttemptStatsRequestBody:
type: object
required:
- jobId
- attemptNumber
properties:
jobId:
$ref: "#/components/schemas/JobId"
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
InternalOperationResult:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

package io.airbyte.commons.server.handlers;

import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SaveStatsRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.JobPersistence;
Expand All @@ -33,6 +35,24 @@ public AttemptHandler(final JobPersistence jobPersistence) {
this.jobPersistence = jobPersistence;
}

/**
 * Looks up the combined stats of one attempt and converts them to the API model.
 *
 * <p>
 * Only the emitted, committed and estimated record/byte counters are copied onto the result; no
 * other field of {@link AttemptStats} is populated here.
 *
 * @param jobId id of the job the attempt belongs to
 * @param attemptNo number of the attempt within the job
 * @return the API representation of the attempt's combined stats
 * @throws IdNotFoundKnownException when persistence returns no stats for the job/attempt pair
 * @throws IOException when the persistence lookup fails with an I/O error
 */
public AttemptStats getAttemptCombinedStats(final long jobId, final int attemptNo) throws IOException {
  final SyncStats combined = jobPersistence.getAttemptCombinedStats(jobId, attemptNo);

  // Persistence signals "no stats" with null; surface that as a not-found error.
  if (combined == null) {
    final String message = String.format("Could not find attempt stats for job_id: %d and attempt no: %d", jobId, attemptNo);
    final String missingId = String.format("%d_%d", jobId, attemptNo);
    throw new IdNotFoundKnownException(message, missingId);
  }

  final AttemptStats apiStats = new AttemptStats()
      .recordsEmitted(combined.getRecordsEmitted())
      .bytesEmitted(combined.getBytesEmitted())
      .bytesCommitted(combined.getBytesCommitted())
      .recordsCommitted(combined.getRecordsCommitted())
      .estimatedRecords(combined.getEstimatedRecords())
      .estimatedBytes(combined.getEstimatedBytes());
  return apiStats;
}

public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) {
try {
jobPersistence.setAttemptTemporalWorkflowInfo(requestBody.getJobId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
Expand All @@ -21,6 +25,8 @@
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.JobPersistence;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -132,4 +138,33 @@ void testInternalHandlerSetsAttemptSyncConfig() throws Exception {
assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue());
}

/**
 * When persistence has no stats for the attempt, the handler must raise a not-found error.
 */
@Test
void getAttemptCombinedStatsThrowsNotFound() throws Exception {
  final long jobId = 1L;
  final int attemptNo = 2;

  // Persistence returns null for any job/attempt pair.
  when(jobPersistence.getAttemptCombinedStats(anyLong(), anyInt())).thenReturn(null);

  assertThrows(
      IdNotFoundKnownException.class,
      () -> handler.getAttemptCombinedStats(jobId, attemptNo));
}

/**
 * When persistence returns stats, every combined counter is copied onto the API model and the
 * state-message counter is left unset.
 */
@Test
void getAttemptCombinedStatsReturnsStats() throws Exception {
  final long counter = 123L;

  final var persisted = new SyncStats();
  persisted.setRecordsEmitted(counter);
  persisted.setBytesEmitted(counter);
  persisted.setBytesCommitted(counter);
  persisted.setRecordsCommitted(counter);
  persisted.setEstimatedRecords(counter);
  persisted.setEstimatedBytes(counter);

  when(jobPersistence.getAttemptCombinedStats(anyLong(), anyInt())).thenReturn(persisted);

  final AttemptStats apiStats = handler.getAttemptCombinedStats(1L, 2);

  assertEquals(persisted.getRecordsEmitted(), apiStats.getRecordsEmitted());
  assertEquals(persisted.getBytesEmitted(), apiStats.getBytesEmitted());
  assertEquals(persisted.getBytesCommitted(), apiStats.getBytesCommitted());
  assertEquals(persisted.getRecordsCommitted(), apiStats.getRecordsCommitted());
  assertEquals(persisted.getEstimatedRecords(), apiStats.getEstimatedRecords());
  assertEquals(persisted.getEstimatedBytes(), apiStats.getEstimatedBytes());
  // The handler does not map state messages yet, so the field stays null.
  assertNull(apiStats.getStateMessagesEmitted());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,17 @@ public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds
});
}

/**
 * Reads the combined (attempt-level) sync stats for a single attempt.
 *
 * <p>
 * The attempt id is resolved from the job id and attempt number, then the most recently updated
 * {@code SYNC_STATS} row for that attempt is mapped to a {@link SyncStats}.
 *
 * @param jobId id of the job the attempt belongs to
 * @param attemptNumber number of the attempt within the job
 * @return the newest combined stats for the attempt, or {@code null} when no row exists
 * @throws IOException when the database query fails
 */
@Override
public SyncStats getAttemptCombinedStats(final long jobId, final int attemptNumber) throws IOException {
  return jobDatabase
      .query(ctx -> {
        final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
        return ctx.select(DSL.asterisk()).from(SYNC_STATS).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
            .orderBy(SYNC_STATS.UPDATED_AT.desc())
            // fetchOne throws TooManyRowsException when more than one row matches, so the
            // ordering only takes effect if the result is capped to the newest row.
            .limit(1)
            .fetchOne(getSyncStatsRecordMapper());
      });
}

private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
final var syncResults = ctx.fetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ record JobAttemptPair(long id, int attemptNumber) {}
*/
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

/**
 * Retrieve only the combined (attempt-level) stats for a single attempt, without the per-stream
 * stats.
 *
 * @param jobId id of the job the attempt belongs to
 * @param attemptNumber number of the attempt within the job
 * @return the combined {@link SyncStats} for the attempt. NOTE(review): the attempt handler
 *         null-checks this result, so implementations presumably return {@code null} when no
 *         stats exist - confirm.
 * @throws IOException if the stats cannot be read
 */
SyncStats getAttemptCombinedStats(long jobId, int attemptNumber) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;
Expand Down Expand Up @@ -169,7 +176,7 @@ record JobAttemptPair(long id, int attemptNumber) {}
Optional<String> getAttemptTemporalWorkflowId(long jobId, int attemptNumber) throws IOException;

/**
* When the output is a StandardSyncOutput, caller of this method should persiste
* When the output is a StandardSyncOutput, caller of this method should persist
* StandardSyncOutput#state in the configs database by calling
* ConfigRepository#updateConnectionState, which takes care of persisting the connection state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,33 @@ void testGetStatsForBadJobAttemptInput() throws IOException {
assertNotNull(jobPersistence.getAttemptStats(-1, -1));
}

/**
 * Writes combined and per-stream stats for a fresh attempt, then verifies that
 * {@code getAttemptCombinedStats} returns exactly the combined counters.
 *
 * <p>
 * Each counter uses a distinct value so that a mapper that mixes up two columns fails an
 * assertion instead of passing by coincidence.
 */
@Test
@DisplayName("Combined stats can be retrieved without per stream stats.")
void testGetAttemptCombinedStats() throws IOException {
  final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
  final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
  final var estimatedRecords = 1234L;
  final var estimatedBytes = 5678L;
  final var recordsEmitted = 9012L;
  final var bytesEmitted = 3456L;
  final var recordsCommitted = 7890L;
  // Deliberately different from estimatedRecords so a swapped column is detected.
  final var bytesCommitted = 2468L;

  // Per-stream values differ from the combined ones, so reading the wrong table would show up.
  final var streamStats = List.of(
      new StreamSyncStats().withStreamName("name1").withStreamNamespace("ns")
          .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L)));
  jobPersistence.writeStats(
      jobId, attemptNumber, estimatedRecords, estimatedBytes, recordsEmitted, bytesEmitted, recordsCommitted, bytesCommitted, streamStats);

  final SyncStats stats = jobPersistence.getAttemptCombinedStats(jobId, attemptNumber);
  // Fail with a clear assertion rather than a NullPointerException if nothing was read back.
  assertNotNull(stats);
  assertEquals(estimatedRecords, stats.getEstimatedRecords());
  assertEquals(estimatedBytes, stats.getEstimatedBytes());
  assertEquals(recordsEmitted, stats.getRecordsEmitted());
  assertEquals(bytesEmitted, stats.getBytesEmitted());
  assertEquals(recordsCommitted, stats.getRecordsCommitted());
  assertEquals(bytesCommitted, stats.getBytesCommitted());
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
package io.airbyte.server.apis;

import static io.airbyte.commons.auth.AuthRoleConstants.ADMIN;
import static io.airbyte.commons.auth.AuthRoleConstants.READER;

import io.airbyte.api.generated.AttemptApi;
import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.GetAttemptStatsRequestBody;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SaveStatsRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.auth.SecuredWorkspace;
import io.airbyte.commons.server.handlers.AttemptHandler;
import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors;
import io.micronaut.http.MediaType;
Expand All @@ -32,6 +36,17 @@ public AttemptApiController(final AttemptHandler attemptHandler) {
this.attemptHandler = attemptHandler;
}

/**
 * Handles POST {@code /get_combined_stats}: returns the combined stats of the attempt identified
 * by the job id and attempt number in the request body.
 *
 * <p>
 * The lookup is delegated to {@link AttemptHandler#getAttemptCombinedStats} and runs inside
 * {@code ApiHelper.execute}.
 *
 * @param getAttemptStatsRequestBody request carrying the job id and attempt number
 * @return the combined stats for the requested attempt
 */
@Override
@Post(uri = "/get_combined_stats",
      processes = MediaType.APPLICATION_JSON)
@ExecuteOn(AirbyteTaskExecutors.IO)
@Secured({READER})
@SecuredWorkspace
public AttemptStats getAttemptCombinedStats(final GetAttemptStatsRequestBody getAttemptStatsRequestBody) {
  return ApiHelper.execute(() -> {
    // Read the identifiers inside the callback so any failure is handled by ApiHelper.
    final var jobId = getAttemptStatsRequestBody.getJobId();
    final var attemptNumber = getAttemptStatsRequestBody.getAttemptNumber();
    return attemptHandler.getAttemptCombinedStats(jobId, attemptNumber);
  });
}

@Override
@Post(uri = "/save_stats",
processes = MediaType.APPLICATION_JSON)
Expand Down

0 comments on commit 1a38d65

Please sign in to comment.