Skip to content

Commit

Permalink
Revert "Rbroughan/check progress activity" (#7574)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Jun 30, 2023
1 parent e3032a2 commit 95cc8f9
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 552 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class MetricTags {
public static final String WORKSPACE_ID = "workspace_id";
public static final String UNKNOWN = "unknown";
public static final String USER_TYPE = "user_type"; // real user, service account, data plane user, etc
public static final String WILL_RETRY = "will_retry";

public static final String NOTIFICATION_TRIGGER = "notification_trigger";
public static final String NOTIFICATION_CLIENT = "notification_client";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,6 @@ public enum OssMetricsRegistry implements MetricsRegistry {
REPLICATION_WORKER_CREATED(MetricEmittingApps.WORKER,
"replication_worker_created",
"number of replication worker created"),
REPLICATION_MADE_PROGRESS(MetricEmittingApps.WORKER,
"replication_made_progress",
"Count of replication runs that made progress. To be faceted by attributes."),
REPLICATION_PROGRESS_CHECK_FAIL(MetricEmittingApps.WORKER,
"replication_progress_check_fail",
"Count of failures checking progress. To be faceted by attributes."),
RESET_REQUEST(MetricEmittingApps.WORKER,
"reset_request",
"number of requested resets"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.workers.temporal.check.connection.SubmitCheckConnectionActivity;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.CheckRunProgressActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.FeatureFlagFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
Expand Down Expand Up @@ -79,8 +78,7 @@ public List<Object> connectionManagerActivities(
final WorkflowConfigActivity workflowConfigActivity,
final RouteToSyncTaskQueueActivity routeToTaskQueueActivity,
final FeatureFlagFetchActivity featureFlagFetchActivity,
final SubmitCheckConnectionActivity submitCheckConnectionActivity,
final CheckRunProgressActivity checkRunProgressActivity) {
final SubmitCheckConnectionActivity submitCheckConnectionActivity) {
return List.of(generateInputActivity,
jobCreationAndStatusUpdateActivity,
configFetchActivity,
Expand All @@ -91,8 +89,7 @@ public List<Object> connectionManagerActivities(
workflowConfigActivity,
routeToTaskQueueActivity,
featureFlagFetchActivity,
submitCheckConnectionActivity,
checkRunProgressActivity);
submitCheckConnectionActivity);
}

@Singleton
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.airbyte.featureflag.CheckConnectionUseApiEnabled;
import io.airbyte.featureflag.CheckConnectionUseChildWorkflowEnabled;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -46,7 +44,6 @@
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionOutput;
import io.airbyte.workers.temporal.scheduling.activities.CheckRunProgressActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
Expand Down Expand Up @@ -109,11 +106,10 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_WITH_API_TAG = "check_with_api";
private static final String CHECK_WITH_CHILD_WORKFLOW_TAG = "check_with_child_workflow";
private static final String SYNC_TASK_QUEUE_ROUTE_RENAME_TAG = "sync_task_queue_route_rename";
private static final String CHECK_RUN_PROGRESS_TAG = "check_run_progress";
private static final int GENERATE_CHECK_INPUT_CURRENT_VERSION = 1;
private static final int CHECK_WITH_CHILD_WORKFLOW_CURRENT_VERSION = 1;

private static final int SYNC_TASK_QUEUE_ROUTE_RENAME_CURRENT_VERSION = 1;
private static final int CHECK_RUN_PROGRESS_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

Expand Down Expand Up @@ -145,8 +141,6 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private RouteToSyncTaskQueueActivity routeToSyncTaskQueueActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private FeatureFlagFetchActivity featureFlagFetchActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private CheckRunProgressActivity checkRunProgressActivity;

private CancellationScope cancellableSyncWorkflow;

Expand Down Expand Up @@ -351,21 +345,14 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));

// ATTENTION: connectionUpdaterInput.getAttemptNumber() is 1-based (usually)
// this differs from workflowInternalState.getAttemptNumber() being 0-based.
// TODO: Don't mix these bases. Bug filed https://github.com/airbytehq/airbyte/issues/27808
final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber));

// This is outside the retry if/else block because we will pass it to our retry manager regardless
// of retry state.
// This will be added in a future PR as we develop this feature.
final boolean madeProgress = checkRunProgress();

final FailureType failureType =
standardSyncOutput != null ? standardSyncOutput.getFailures().isEmpty() ? null : standardSyncOutput.getFailures().get(0).getFailureType()
: null;
if (isWithinRetryLimit(attemptNumber) && failureType != FailureType.CONFIG_ERROR) {
if (maxAttempt > attemptNumber && failureType != FailureType.CONFIG_ERROR) {
// restart from failure
connectionUpdaterInput.setAttemptNumber(attemptNumber + 1);
connectionUpdaterInput.setFromFailure(true);
Expand All @@ -385,47 +372,11 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,

// Record the failure metric
recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.of(failureCause), OssMetricsRegistry.TEMPORAL_WORKFLOW_FAILURE, null));
// Record whether we made progress
if (madeProgress) {
// job id and other attrs get populated by the wrapping activity
final var attrs = new MetricAttribute[] {
new MetricAttribute(MetricTags.WILL_RETRY, String.valueOf(false)),
new MetricAttribute(MetricTags.ATTEMPT_NUMBER, String.valueOf(workflowInternalState.getAttemptNumber()))
};
recordMetric(
new RecordMetricInput(connectionUpdaterInput, Optional.of(failureCause), OssMetricsRegistry.REPLICATION_MADE_PROGRESS, attrs));
}

resetNewConnectionInput(connectionUpdaterInput);
}
}

private boolean isWithinRetryLimit(final int attemptNumber) {
final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();

return maxAttempt > attemptNumber;
}

private boolean shouldCheckRunProgress() {
final int version = Workflow.getVersion(CHECK_RUN_PROGRESS_TAG, Workflow.DEFAULT_VERSION, CHECK_RUN_PROGRESS_VERSION);
return version >= CHECK_RUN_PROGRESS_VERSION;
}

private boolean checkRunProgress() {

if (!shouldCheckRunProgress()) {
return false;
}

final CheckRunProgressActivity.Output progressCheck = runMandatoryActivityWithOutput(
checkRunProgressActivity::checkProgress,
new CheckRunProgressActivity.Input(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber()));

return progressCheck.madeProgress();
}

/**
* Returns whether the new check input generation activity should be called, depending on the
* presence of workflow versioning. This should be removed once the new activity is fully rolled
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 95cc8f9

Please sign in to comment.