Skip to content

Commit

Permalink
Reapply the connection status change (#8288)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Aug 11, 2023
1 parent daee0c2 commit abba686
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 0 deletions.
62 changes: 62 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/connections/status:
post:
tags:
- connection
summary: Get the status of multiple connections
operationId: getConnectionStatuses
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionStatusesRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ConnectionStatusesRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/connections/get:
post:
tags:
Expand Down Expand Up @@ -5024,6 +5047,45 @@ components:
$ref: "#/components/schemas/NonBreakingChangesPreference"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
ConnectionStatusesRequestBody:
type: object
required:
- connectionIds
properties:
connectionIds:
type: array
items:
$ref: "#/components/schemas/ConnectionId"
ConnectionStatusesRead:
type: array
items:
$ref: "#/components/schemas/ConnectionStatusRead"
ConnectionStatusRead:
type: object
required:
- connectionId
- lastSyncJobStatus
- lastSuccessfulSync
- nextSync
- isRunning
- isLastCompletedJobReset
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
lastSyncJobStatus:
$ref: "#/components/schemas/JobStatus"
lastSuccessfulSync:
type: integer
format: int64
nextSync:
type: integer
format: int64
isRunning:
type: boolean
isLastCompletedJobReset:
type: boolean
failureType:
$ref: "#/components/schemas/FailureType"
SchemaChange:
enum:
- no_change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionStatusRead;
import io.airbyte.api.model.generated.ConnectionStatusesRequestBody;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.DestinationSearch;
Expand All @@ -48,8 +50,12 @@
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.FieldSelectionData;
import io.airbyte.config.Geography;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.ScheduleData;
Expand All @@ -70,6 +76,7 @@
import io.airbyte.persistence.job.JobNotifier;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
import io.airbyte.persistence.job.models.JobWithStatusAndTimestamp;
Expand Down Expand Up @@ -117,6 +124,7 @@ public class ConnectionsHandler {
private final JobNotifier jobNotifier;
private final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
private final Integer maxFailedJobsInARowBeforeConnectionDisable;
private final int maxJobLookback = 10;

@Inject
public ConnectionsHandler(
Expand Down Expand Up @@ -842,4 +850,42 @@ public ConnectionReadList listConnectionsForActorDefinition(final ActorDefinitio
return new ConnectionReadList().connections(connectionReads);
}

public List<ConnectionStatusRead> getConnectionStatuses(
ConnectionStatusesRequestBody connectionStatusesRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
List<UUID> connectionIds = connectionStatusesRequestBody.getConnectionIds();
List<ConnectionStatusRead> result = new ArrayList<>();
for (UUID connectionId : connectionIds) {
List<Job> jobs = jobPersistence.listJobs(Set.of(JobConfig.ConfigType.SYNC, JobConfig.ConfigType.RESET_CONNECTION), connectionId.toString(),
maxJobLookback, 0);
boolean isRunning = jobs.stream().anyMatch(job -> JobStatus.NON_TERMINAL_STATUSES.contains(job.getStatus()));

Optional<Job> lastJob = jobs.stream().filter(job -> JobStatus.TERMINAL_STATUSES.contains(job.getStatus())).findFirst();
Optional<JobStatus> lastSyncStatus = lastJob.map(job -> job.getStatus());

Optional<Job> lastSuccessfulJob = jobs.stream().filter(job -> job.getStatus() == JobStatus.SUCCEEDED).findFirst();
Optional<Long> lastSuccessTimestamp = lastSuccessfulJob.map(job -> job.getUpdatedAtInSecond());

ConnectionStatusRead connectionStatus = new ConnectionStatusRead()
.connectionId(connectionId)
.isRunning(isRunning)
.lastSyncJobStatus(Enums.convertTo(lastSyncStatus.orElse(null),
io.airbyte.api.model.generated.JobStatus.class))
.lastSuccessfulSync(lastSuccessTimestamp.orElse(null))
.nextSync(null)
.isLastCompletedJobReset(lastJob.map(job -> job.getConfigType() == ConfigType.RESET_CONNECTION).orElse(false));
Optional<FailureType> failureType =
lastJob.flatMap(Job::getLastFailedAttempt)
.flatMap(Attempt::getFailureSummary)
.flatMap(s -> s.getFailures().stream().findFirst())
.map(FailureReason::getFailureType);
if (failureType.isPresent() && lastJob.get().getStatus() == JobStatus.FAILED) {
connectionStatus.setFailureType(Enums.convertTo(failureType.get(), io.airbyte.api.model.generated.FailureType.class));
}
result.add(connectionStatus);
}

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.airbyte.api.model.generated.ConnectionScheduleType;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionStatusRead;
import io.airbyte.api.model.generated.ConnectionStatusesRequestBody;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationSearch;
import io.airbyte.api.model.generated.DestinationSyncMode;
Expand All @@ -57,13 +59,16 @@
import io.airbyte.commons.server.helpers.ConnectionHelpers;
import io.airbyte.commons.server.scheduler.EventRunner;
import io.airbyte.config.ActorType;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Cron;
import io.airbyte.config.DataType;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FieldSelectionData;
import io.airbyte.config.Geography;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
Expand All @@ -82,6 +87,8 @@
import io.airbyte.persistence.job.JobNotifier;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.AttemptStatus;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
import io.airbyte.persistence.job.models.JobWithStatusAndTimestamp;
Expand Down Expand Up @@ -1684,6 +1691,31 @@ void testDiffDifferentDestinationSyncMode() {
assertEquals(Set.of(new StreamDescriptor().name(STREAM1)), changedSd);
}

@Test
void testConnectionStatus()
throws JsonValidationException, ConfigNotFoundException, IOException {
UUID connectionId = UUID.randomUUID();
AttemptFailureSummary failureSummary = new AttemptFailureSummary();
failureSummary.setFailures(List.of(new FailureReason().withFailureOrigin(FailureReason.FailureOrigin.DESTINATION)));
Attempt failedAttempt = new Attempt(0, 0, null, null, null, AttemptStatus.FAILED, null, failureSummary, 0, 0, 0L);
List<Job> jobs = List.of(
new Job(0L, JobConfig.ConfigType.SYNC, connectionId.toString(), null, null, JobStatus.RUNNING, 1001L, 1000L, 1002L),
new Job(0L, JobConfig.ConfigType.SYNC, connectionId.toString(), null, List.of(failedAttempt), JobStatus.FAILED, 901L, 900L, 902L),
new Job(0L, JobConfig.ConfigType.SYNC, connectionId.toString(), null, null, JobStatus.SUCCEEDED, 801L, 800L, 802L));
when(jobPersistence.listJobs(Set.of(JobConfig.ConfigType.SYNC, JobConfig.ConfigType.RESET_CONNECTION), connectionId.toString(), 10, 0))
.thenReturn(jobs);
ConnectionStatusesRequestBody req = new ConnectionStatusesRequestBody().connectionIds(List.of(connectionId));
List<ConnectionStatusRead> status = connectionsHandler.getConnectionStatuses(req);
assertEquals(1, status.size());

ConnectionStatusRead connectionStatus = status.get(0);
assertEquals(connectionId, connectionStatus.getConnectionId());
assertEquals(Enums.convertTo(JobStatus.FAILED, io.airbyte.api.model.generated.JobStatus.class), connectionStatus.getLastSyncJobStatus());
assertEquals(802L, connectionStatus.getLastSuccessfulSync());
assertEquals(true, connectionStatus.getIsRunning());
assertEquals(null, connectionStatus.getNextSync());
}

private AirbyteStreamAndConfiguration getStreamAndConfig(final String name, final AirbyteStreamConfiguration config) {
return new AirbyteStreamAndConfiguration()
.config(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionSearch;
import io.airbyte.api.model.generated.ConnectionStatusRead;
import io.airbyte.api.model.generated.ConnectionStatusesRequestBody;
import io.airbyte.api.model.generated.ConnectionStreamRequestBody;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.InternalOperationResult;
Expand All @@ -35,6 +37,7 @@
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.micronaut.security.rules.SecurityRule;
import java.util.List;

@SuppressWarnings("MissingJavadocType")
@Controller("/api/v1/connections")
Expand Down Expand Up @@ -133,6 +136,15 @@ public ConnectionRead getConnection(@Body final ConnectionIdRequestBody connecti
return ApiHelper.execute(() -> connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()));
}

@Override
@Post(uri = "/connections/status")
@Secured({READER})
@SecuredWorkspace
@ExecuteOn(AirbyteTaskExecutors.IO)
public List<ConnectionStatusRead> getConnectionStatuses(@Body final ConnectionStatusesRequestBody connectionStatusesRequestBody) {
return ApiHelper.execute(() -> connectionsHandler.getConnectionStatuses(connectionStatusesRequestBody));
}

@Override
@Post(uri = "/delete")
@Status(HttpStatus.NO_CONTENT)
Expand Down

0 comments on commit abba686

Please sign in to comment.