Skip to content

Commit

Permalink
Rbroughan/handle cancelled sync workflows (#12271)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Apr 26, 2024
1 parent 248adac commit 6afdb53
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,12 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

standardSyncOutput = runChildWorkflow(jobInputs);
workflowState.setFailed(getFailStatus(standardSyncOutput));
workflowState.setCancelled(getCancelledStatus(standardSyncOutput));

if (workflowState.isFailed()) {
reportFailure(connectionUpdaterInput, standardSyncOutput, FailureCause.UNKNOWN);
} else if (workflowState.isCancelled()) {
reportCancelled(connectionId);
} else {
reportSuccess(connectionUpdaterInput, standardSyncOutput);
}
Expand Down Expand Up @@ -1026,7 +1029,7 @@ private ConnectorJobOutput runCheckInChildWorkflow(final JobRunConfig jobRunConf
/**
* Set the internal status as failed and save the failures reasons.
*
* @return True if the job failed, false otherwise
* @return true if the job failed, false otherwise
*/
private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();
Expand All @@ -1049,6 +1052,16 @@ private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
return false;
}

/**
* Extracts whether the job was cancelled from the output.
*
* @return true if the job was cancelled, false otherwise
*/
private boolean getCancelledStatus(final StandardSyncOutput standardSyncOutput) {
final StandardSyncSummary summary = standardSyncOutput.getStandardSyncSummary();
return summary != null && summary.getStatus() == ReplicationStatus.CANCELLED;
}

/*
* Set a job as cancel and continue to the next job if and continue as a reset if needed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.airbyte.workers.temporal.scheduling.testcheckworkflow.CheckConnectionSourceSuccessOnlyWorkflow;
import io.airbyte.workers.temporal.scheduling.testcheckworkflow.CheckConnectionSuccessWorkflow;
import io.airbyte.workers.temporal.scheduling.testcheckworkflow.CheckConnectionSystemErrorWorkflow;
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.CancelledSyncWorkflow;
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.DbtFailureSyncWorkflow;
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.EmptySyncWorkflow;
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.NormalizationFailureSyncWorkflow;
Expand Down Expand Up @@ -120,7 +121,7 @@

/**
* Tests the core state machine of the connection manager workflow.
*
* <p>
* We've had race conditions in this in the past which is why (after addressing them) we have
* repeated cases, just in case there's a regression where a race condition is added back to a test.
*/
Expand Down Expand Up @@ -1868,6 +1869,28 @@ private static Stream<Arguments> backoffJobFailureMatrix() {

}

@Nested
@DisplayName("General functionality")
class General {

@BeforeEach
void setup() {
setupSimpleConnectionManagerWorkflow();
}

@Test
@DisplayName("When a sync returns a status of cancelled we report the run as cancelled")
void reportsCancelledWhenConnectionDisabled() throws Exception {
final var input = testInputBuilder().build();
setupSuccessfulWorkflow(CancelledSyncWorkflow.class, input);
workflow.submitManualSync();
testEnv.sleep(Duration.ofSeconds(60));

Mockito.verify(mJobCreationAndStatusUpdateActivity).jobCancelledWithAttemptNumber(Mockito.any(JobCancelledInputWithAttemptNumber.class));
}

}

private class HasFailureFromOrigin implements ArgumentMatcher<AttemptNumberFailureInput> {

private final FailureOrigin expectedFailureOrigin;
Expand Down Expand Up @@ -2073,9 +2096,13 @@ private void setupSimpleConnectionManagerWorkflow() {
}

private void setupSuccessfulWorkflow(final ConnectionUpdaterInput input) throws Exception {
setupSuccessfulWorkflow(EmptySyncWorkflow.class, input);
}

private <T> void setupSuccessfulWorkflow(final Class<T> syncWorkflowMockClass, final ConnectionUpdaterInput input) throws Exception {
returnTrueForLastJobOrAttemptFailure();
final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name());
syncWorker.registerWorkflowImplementationTypes(EmptySyncWorkflow.class);
syncWorker.registerWorkflowImplementationTypes(syncWorkflowMockClass);
final Worker checkWorker = testEnv.newWorker(TemporalJobType.CHECK_CONNECTION.name());
checkWorker.registerWorkflowImplementationTypes(CheckConnectionSuccessWorkflow.class);
testEnv.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.temporal.scheduling.testsyncworkflow

import io.airbyte.commons.temporal.scheduling.SyncWorkflow
import io.airbyte.config.StandardSyncInput
import io.airbyte.config.StandardSyncOutput
import io.airbyte.config.StandardSyncSummary
import io.airbyte.config.StandardSyncSummary.ReplicationStatus
import io.airbyte.config.SyncStats
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.JobRunConfig
import java.util.UUID

class CancelledSyncWorkflow : SyncWorkflow {
override fun run(
jobRunConfig: JobRunConfig,
sourceLauncherConfig: IntegrationLauncherConfig,
destinationLauncherConfig: IntegrationLauncherConfig,
syncInput: StandardSyncInput,
connectionId: UUID,
): StandardSyncOutput {
return StandardSyncOutput()
.withStandardSyncSummary(
StandardSyncSummary()
.withStatus(ReplicationStatus.CANCELLED)
.withTotalStats(SyncStats()),
)
}
}

0 comments on commit 6afdb53

Please sign in to comment.