Skip to content

Commit

Permalink
Introduce OrchestratorFactoryHelpers (#5700)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Apr 10, 2023
1 parent c77217b commit a5d0dff
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.orchestrator;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.SourceDefinitionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.internal.HeartbeatMonitor;
import io.airbyte.workers.internal.HeartbeatTimeoutChaperone;
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker;
import io.airbyte.workers.internal.book_keeping.MessageTracker;
import io.airbyte.workers.internal.sync_persistence.SyncPersistence;
import io.airbyte.workers.internal.sync_persistence.SyncPersistenceFactory;
import java.time.Duration;
import java.util.UUID;

/**
* Helper Functions to build the dependencies of the Orchestrator.
* <p>
* The primary goal of this class is to reduce code duplication while we refactor the init and
* introduce dependency injection in this part of the codebase.
*/
public class OrchestratorFactoryHelpers {

/**
* Create HeartbeatMonitor.
*/
public static HeartbeatMonitor createHeartbeatMonitor(final SourceApi sourceApi,
final SourceDefinitionApi sourceDefinitionApi,
final StandardSyncInput syncInput)
throws ApiException {
final UUID sourceDefinitionId = sourceApi.getSource(new SourceIdRequestBody().sourceId(syncInput.getSourceId())).getSourceDefinitionId();

final long maxSecondsBetweenMessages = AirbyteApiClient.retryWithJitter(() -> sourceDefinitionApi
.getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId)), "get the source definition")
.getMaxSecondsBetweenMessages();
// reset jobs use an empty source to induce resetting all data in destination.
return new HeartbeatMonitor(Duration.ofSeconds(maxSecondsBetweenMessages));
}

/**
* Get HeartbeatTimeoutChaperone.
*/
public static HeartbeatTimeoutChaperone createHeartbeatTimeoutChaperone(final HeartbeatMonitor heartbeatMonitor,
final FeatureFlagClient featureFlagClient,
final StandardSyncInput syncInput) {
return new HeartbeatTimeoutChaperone(heartbeatMonitor,
HeartbeatTimeoutChaperone.DEFAULT_TIMEOUT_CHECK_DURATION,
featureFlagClient,
syncInput.getWorkspaceId(),
syncInput.getConnectionId(),
MetricClientFactory.getMetricClient());
}

/**
* Create MessageTracker.
*/
public static MessageTracker createMessageTracker(final SyncPersistence syncPersistence,
final FeatureFlags featureFlags,
final StandardSyncInput syncInput) {
final boolean commitStatsAsap = DefaultReplicationWorker.shouldCommitStatsAsap(syncInput);
final MessageTracker messageTracker =
commitStatsAsap ? new AirbyteMessageTracker(syncPersistence, featureFlags) : new AirbyteMessageTracker(featureFlags);
return messageTracker;
}

/**
* Create SyncPersistence.
*/
public static SyncPersistence createSyncPersistence(final SyncPersistenceFactory syncPersistenceFactory,
final StandardSyncInput syncInput,
final IntegrationLauncherConfig sourceLauncherConfig) {
// TODO clean up the feature flag init once commitStates and commitStats have been rolled out
final boolean commitStatesAsap = DefaultReplicationWorker.shouldCommitStateAsap(syncInput);
final SyncPersistence syncPersistence = commitStatesAsap
? syncPersistenceFactory.get(syncInput.getConnectionId(), Long.parseLong(sourceLauncherConfig.getJobId()),
sourceLauncherConfig.getAttemptId().intValue(), syncInput.getCatalog())
: null;
return syncPersistence;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY;

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.DestinationApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.SourceDefinitionApi;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.commons.features.FeatureFlagHelper;
import io.airbyte.commons.features.FeatureFlags;
Expand Down Expand Up @@ -51,17 +48,16 @@
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.internal.VersionedAirbyteMessageBufferedWriterFactory;
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory;
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker;
import io.airbyte.workers.internal.book_keeping.MessageTracker;
import io.airbyte.workers.internal.sync_persistence.SyncPersistence;
import io.airbyte.workers.internal.sync_persistence.SyncPersistenceFactory;
import io.airbyte.workers.orchestrator.OrchestratorFactoryHelpers;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -166,13 +162,7 @@ public Optional<String> runJob() throws Exception {

log.info("Setting up source...");

final UUID sourceDefinitionId = sourceApi.getSource(new SourceIdRequestBody().sourceId(syncInput.getSourceId())).getSourceDefinitionId();

final long maxSecondsBetweenMessages = AirbyteApiClient.retryWithJitter(() -> sourceDefinitionApi
.getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId)), "get the source definition")
.getMaxSecondsBetweenMessages();
// reset jobs use an empty source to induce resetting all data in destination.
final HeartbeatMonitor heartbeatMonitor = new HeartbeatMonitor(Duration.ofSeconds(maxSecondsBetweenMessages));
final HeartbeatMonitor heartbeatMonitor = OrchestratorFactoryHelpers.createHeartbeatMonitor(sourceApi, sourceDefinitionApi, syncInput);

final var airbyteSource =
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource(
Expand All @@ -188,9 +178,8 @@ public Optional<String> runJob() throws Exception {
final var metricReporter = new WorkerMetricReporter(metricClient,
sourceLauncherConfig.getDockerImage());

try (final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone(heartbeatMonitor,
HeartbeatTimeoutChaperone.DEFAULT_TIMEOUT_CHECK_DURATION, featureFlagClient, syncInput.getWorkspaceId(), syncInput.getConnectionId(),
MetricClientFactory.getMetricClient())) {
try (final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = OrchestratorFactoryHelpers.createHeartbeatTimeoutChaperone(heartbeatMonitor,
featureFlagClient, syncInput)) {

log.info("Setting up replication worker...");
final UUID workspaceId = syncInput.getWorkspaceId();
Expand All @@ -201,15 +190,9 @@ public Optional<String> runJob() throws Exception {
&& (featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId))
|| FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));

// TODO clean up the feature flag init once commitStates and commitStats have been rolled out
final boolean commitStatesAsap = DefaultReplicationWorker.shouldCommitStateAsap(syncInput);
final SyncPersistence syncPersistence = commitStatesAsap
? syncPersistenceFactory.get(syncInput.getConnectionId(), Long.parseLong(sourceLauncherConfig.getJobId()),
sourceLauncherConfig.getAttemptId().intValue(), syncInput.getCatalog())
: null;
final boolean commitStatsAsap = DefaultReplicationWorker.shouldCommitStatsAsap(syncInput);
final MessageTracker messageTracker =
commitStatsAsap ? new AirbyteMessageTracker(syncPersistence, featureFlags) : new AirbyteMessageTracker(featureFlags);
final SyncPersistence syncPersistence =
OrchestratorFactoryHelpers.createSyncPersistence(syncPersistenceFactory, syncInput, sourceLauncherConfig);
final MessageTracker messageTracker = OrchestratorFactoryHelpers.createMessageTracker(syncPersistence, featureFlags, syncInput);

final var replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import datadog.trace.api.Trace;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.commons.features.FeatureFlagHelper;
import io.airbyte.commons.features.FeatureFlags;
Expand Down Expand Up @@ -70,12 +68,12 @@
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.internal.VersionedAirbyteMessageBufferedWriterFactory;
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory;
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker;
import io.airbyte.workers.internal.book_keeping.MessageTracker;
import io.airbyte.workers.internal.exception.DestinationException;
import io.airbyte.workers.internal.exception.SourceException;
import io.airbyte.workers.internal.sync_persistence.SyncPersistence;
import io.airbyte.workers.internal.sync_persistence.SyncPersistenceFactory;
import io.airbyte.workers.orchestrator.OrchestratorFactoryHelpers;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
Expand All @@ -87,7 +85,6 @@
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -200,22 +197,10 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,

final CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception> workerFactory;

final UUID sourceDefinitionId = AirbyteApiClient.retryWithJitter(
() -> airbyteApiClient.getSourceApi().getSource(new SourceIdRequestBody().sourceId(syncInput.getSourceId())).getSourceDefinitionId(),
"get source");

final long maxSecondsBetweenMessages = AirbyteApiClient.retryWithJitter(() -> airbyteApiClient.getSourceDefinitionApi()
.getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId))
.getMaxSecondsBetweenMessages(), "get source definition");

final HeartbeatMonitor heartbeatMonitor = new HeartbeatMonitor(Duration.ofSeconds(maxSecondsBetweenMessages));

final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone(heartbeatMonitor,
HeartbeatTimeoutChaperone.DEFAULT_TIMEOUT_CHECK_DURATION,
featureFlagClient,
syncInput.getWorkspaceId(),
syncInput.getConnectionId(),
MetricClientFactory.getMetricClient());
final HeartbeatMonitor heartbeatMonitor = OrchestratorFactoryHelpers.createHeartbeatMonitor(airbyteApiClient.getSourceApi(),
airbyteApiClient.getSourceDefinitionApi(), syncInput);
final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = OrchestratorFactoryHelpers.createHeartbeatTimeoutChaperone(heartbeatMonitor,
featureFlagClient, syncInput);

if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(
Expand Down Expand Up @@ -370,14 +355,9 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
&& (featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId))
|| FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));

final boolean commitStatesAsap = DefaultReplicationWorker.shouldCommitStateAsap(syncInput);
final SyncPersistence syncPersistence = commitStatesAsap
? syncPersistenceFactory.get(syncInput.getConnectionId(), Long.parseLong(sourceLauncherConfig.getJobId()),
sourceLauncherConfig.getAttemptId().intValue(), syncInput.getCatalog())
: null;
final boolean commitStatsAsap = DefaultReplicationWorker.shouldCommitStatsAsap(syncInput);
final MessageTracker messageTracker =
commitStatsAsap ? new AirbyteMessageTracker(syncPersistence, featureFlags) : new AirbyteMessageTracker(featureFlags);
final SyncPersistence syncPersistence =
OrchestratorFactoryHelpers.createSyncPersistence(syncPersistenceFactory, syncInput, sourceLauncherConfig);
final MessageTracker messageTracker = OrchestratorFactoryHelpers.createMessageTracker(syncPersistence, featureFlags, syncInput);

return new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Expand Down

0 comments on commit a5d0dff

Please sign in to comment.