diff --git a/.env b/.env
index 03d63b19bfa9..4e4bd18d3cc4 100644
--- a/.env
+++ b/.env
@@ -50,7 +50,6 @@ CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.1.001
-
### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
INTERNAL_API_HOST=airbyte-server:8001
diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml
index ed578354eb79..70cabfd000a7 100644
--- a/airbyte-commons/src/main/resources/log4j2.xml
+++ b/airbyte-commons/src/main/resources/log4j2.xml
@@ -128,7 +128,7 @@
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}">
-
+
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
index 065c50963f90..540bc7b05b74 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java
@@ -89,16 +89,16 @@ public class EnvConfigs implements Configs {
private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String CONTAINER_ORCHESTRATOR_ENABLED = "CONTAINER_ORCHESTRATOR_ENABLED";
- private static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
- private static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
- private static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY";
- private static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY";
- private static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME";
- private static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT";
- private static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY";
- private static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY";
- private static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME";
- private static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS";
+ public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
+ public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
+ public static final String STATE_STORAGE_S3_ACCESS_KEY = "STATE_STORAGE_S3_ACCESS_KEY";
+ public static final String STATE_STORAGE_S3_SECRET_ACCESS_KEY = "STATE_STORAGE_S3_SECRET_ACCESS_KEY";
+ public static final String STATE_STORAGE_MINIO_BUCKET_NAME = "STATE_STORAGE_MINIO_BUCKET_NAME";
+ public static final String STATE_STORAGE_MINIO_ENDPOINT = "STATE_STORAGE_MINIO_ENDPOINT";
+ public static final String STATE_STORAGE_MINIO_ACCESS_KEY = "STATE_STORAGE_MINIO_ACCESS_KEY";
+ public static final String STATE_STORAGE_MINIO_SECRET_ACCESS_KEY = "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY";
+ public static final String STATE_STORAGE_GCS_BUCKET_NAME = "STATE_STORAGE_GCS_BUCKET_NAME";
+ public static final String STATE_STORAGE_GCS_APPLICATION_CREDENTIALS = "STATE_STORAGE_GCS_APPLICATION_CREDENTIALS";
// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
@@ -172,18 +172,18 @@ private Optional getStateStorageConfiguration() {
if (getEnv(STATE_STORAGE_GCS_BUCKET_NAME) != null) {
return Optional.of(CloudStorageConfigs.gcs(new GcsConfig(
getEnvOrDefault(STATE_STORAGE_GCS_BUCKET_NAME, ""),
- getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, ""))));
+ getEnvOrDefault(STATE_STORAGE_GCS_APPLICATION_CREDENTIALS, ""))));
} else if (getEnv(STATE_STORAGE_MINIO_ENDPOINT) != null) {
return Optional.of(CloudStorageConfigs.minio(new MinioConfig(
getEnvOrDefault(STATE_STORAGE_MINIO_BUCKET_NAME, ""),
- getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
- getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
+ getEnvOrDefault(STATE_STORAGE_MINIO_ACCESS_KEY, ""),
+ getEnvOrDefault(STATE_STORAGE_MINIO_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_MINIO_ENDPOINT, ""))));
} else if (getEnv(STATE_STORAGE_S3_REGION) != null) {
return Optional.of(CloudStorageConfigs.s3(new S3Config(
getEnvOrDefault(STATE_STORAGE_S3_BUCKET_NAME, ""),
- getEnvOrDefault(LogClientSingleton.AWS_ACCESS_KEY_ID, ""),
- getEnvOrDefault(LogClientSingleton.AWS_SECRET_ACCESS_KEY, ""),
+ getEnvOrDefault(STATE_STORAGE_S3_ACCESS_KEY, ""),
+ getEnvOrDefault(STATE_STORAGE_S3_SECRET_ACCESS_KEY, ""),
getEnvOrDefault(STATE_STORAGE_S3_REGION, ""))));
} else {
return Optional.empty();
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java
index 0c242c9098ec..dbd4b7b6f3e4 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultGcsClientFactory.java
@@ -5,9 +5,12 @@
package io.airbyte.config.storage;
import com.google.api.client.util.Preconditions;
+import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
/**
@@ -16,8 +19,11 @@
*/
public class DefaultGcsClientFactory implements Supplier {
+ private final GcsConfig config;
+
public DefaultGcsClientFactory(final GcsConfig config) {
validate(config);
+ this.config = config;
}
private static void validate(final GcsConfig config) {
@@ -27,7 +33,13 @@ private static void validate(final GcsConfig config) {
@Override
public Storage get() {
- return StorageOptions.getDefaultInstance().getService();
+ try {
+ final var credentialsByteStream = new ByteArrayInputStream(config.getGoogleApplicationCredentials().getBytes(StandardCharsets.UTF_8));
+ final var credentials = ServiceAccountCredentials.fromStream(credentialsByteStream);
+ return StorageOptions.newBuilder().setCredentials(credentials).build().getService();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java
index 62b0237009b8..897cc68cd02a 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java
@@ -8,6 +8,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.S3ApiWorkerStorageConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.util.function.Supplier;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
@@ -41,6 +42,7 @@ static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) {
@Override
public S3Client get() {
final var builder = S3Client.builder();
+ builder.credentialsProvider(() -> AwsBasicCredentials.create(s3Config.getAwsAccessKey(), s3Config.getAwsSecretAccessKey()));
builder.region(Region.of(s3Config.getRegion()));
return builder.build();
}
diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java b/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java
index 31cd86e71db7..5cef8b97ac2f 100644
--- a/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java
+++ b/airbyte-config/models/src/main/java/io/airbyte/config/storage/MinioS3ClientFactory.java
@@ -9,6 +9,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Supplier;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
@@ -40,6 +41,7 @@ public S3Client get() {
final var minioEndpoint = minioConfig.getMinioEndpoint();
try {
final var minioUri = new URI(minioEndpoint);
+ builder.credentialsProvider(() -> AwsBasicCredentials.create(minioConfig.getAwsAccessKey(), minioConfig.getAwsSecretAccessKey()));
builder.endpointOverride(minioUri);
builder.region(Region.US_EAST_1); // Although this is not used, the S3 client will error out if this is not set. Set a stub value.
} catch (final URISyntaxException e) {
diff --git a/airbyte-container-orchestrator/Dockerfile b/airbyte-container-orchestrator/Dockerfile
index b805172795d8..4fba8521e028 100644
--- a/airbyte-container-orchestrator/Dockerfile
+++ b/airbyte-container-orchestrator/Dockerfile
@@ -18,6 +18,13 @@ RUN add-apt-repository \
stable"
RUN apt-get update && apt-get install -y docker-ce-cli jq
+# Install kubectl for copying files to kube pods. Eventually should be replaced with a kube java client.
+# See https://github.com/airbytehq/airbyte/issues/8643 for more information on why we are using kubectl for copying.
+# The following commands were taken from https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/#install-using-native-package-management
+RUN curl -fsSLo /usr/share/keyrings/kubernetes-archive-keyring.gpg https://packages.cloud.google.com/apt/doc/apt-key.gpg
+RUN echo "deb [signed-by=/usr/share/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | tee /etc/apt/sources.list.d/kubernetes.list
+RUN apt-get update && apt-get install -y kubectl
+
ENV APPLICATION airbyte-container-orchestrator
ENV AIRBYTE_ENTRYPOINT "/app/${APPLICATION}-0.35.6-alpha/bin/${APPLICATION}"
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java
new file mode 100644
index 000000000000..d05d2f408e1a
--- /dev/null
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.container_orchestrator;
+
+import io.airbyte.workers.process.AsyncKubePodStatus;
+import io.airbyte.workers.process.KubePodInfo;
+
+/**
+ * The state manager writes the "truth" for states of the async pod process. If the store isn't
+ * updated by the underlying pod, it will appear as failed.
+ *
+ * It doesn't have a single value for a state. Instead, in a location on cloud storage or disk, it
+ * writes every state it's encountered.
+ */
+public interface AsyncStateManager {
+
+ /**
+ * Writes a file containing a string value to a location designated by the input status.
+ *
+ * @param kubePodInfo the pod the status is recorded for
+ * @param status the status to record
+ * @param value the content to store in the file for that status
+ */
+ void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value);
+
+ /**
+ * Writes an empty file to a location designated by the input status.
+ *
+ * @param kubePodInfo the pod the status is recorded for
+ * @param status the status to record
+ */
+ void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status);
+
+ /**
+ * Interprets the state given all written state messages for the pod.
+ *
+ * @param kubePodInfo the pod whose status is requested
+ * @return the status derived from the state files written so far for that pod
+ */
+ AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo);
+
+ /**
+ * @param kubePodInfo the pod whose output is requested
+ * @return the output stored in the success file. This can be an empty string.
+ * @throws IllegalArgumentException if no success file exists
+ */
+ String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException;
+
+}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java
index c75b0e3a65d6..476f049d608f 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java
@@ -4,16 +4,25 @@
package io.airbyte.container_orchestrator;
-import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.logging.LoggingHelper;
+import io.airbyte.commons.logging.MdcScope;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
+import io.airbyte.config.helpers.LogClientSingleton;
+import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
+import io.airbyte.workers.WorkerUtils;
+import io.airbyte.workers.process.AsyncKubePodStatus;
+import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.DockerProcessFactory;
+import io.airbyte.workers.process.KubePodInfo;
+import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
+import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.sync.DbtLauncherWorker;
import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
@@ -22,9 +31,9 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.net.InetAddress;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
/**
@@ -41,54 +50,129 @@
@Slf4j
public class ContainerOrchestratorApp {
- public static void main(final String[] args) throws Exception {
- WorkerHeartbeatServer heartbeatServer = null;
+ private final String application;
+ private final Map envMap;
+ private final JobRunConfig jobRunConfig;
+ private final KubePodInfo kubePodInfo;
+ private final Configs configs;
- try {
- // read files that contain all necessary configuration
- final String application = Files.readString(Path.of(OrchestratorConstants.INIT_FILE_APPLICATION));
- final Map envMap =
- (Map) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class);
+ /**
+ * Creates an orchestrator app for a single job run.
+ *
+ * @param application application name; selects the job orchestrator and is used as the log prefix
+ * @param envMap environment variables; also the source of the {@link EnvConfigs} kept by this app
+ * @param jobRunConfig supplies the job id and attempt id used to resolve the job root
+ * @param kubePodInfo the pod for which async statuses are written
+ */
+ public ContainerOrchestratorApp(final String application,
+ final Map envMap,
+ final JobRunConfig jobRunConfig,
+ final KubePodInfo kubePodInfo) {
+ // derived configuration first, then the plain references
+ this.configs = new EnvConfigs(envMap);
+ this.kubePodInfo = kubePodInfo;
+ this.jobRunConfig = jobRunConfig;
+ this.envMap = envMap;
+ this.application = application;
+ }
- final Configs configs = new EnvConfigs(envMap);
+ private void configureLogging() {
+ for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
+ if (envMap.containsKey(envVar)) {
+ System.setProperty(envVar, envMap.get(envVar));
+ }
+ }
- heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
- heartbeatServer.startBackground();
+ final var logClient = LogClientSingleton.getInstance();
+ logClient.setJobMdc(
+ configs.getWorkerEnvironment(),
+ configs.getLogConfigs(),
+ WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()));
+ }
+
+ /**
+ * Handles state updates (including writing failures) and running the job orchestrator. As much of
+ * the initialization as possible should go in here so it's logged properly and the state storage is
+ * updated appropriately.
+ *
+ * Statuses are written in the order INITIALIZING, RUNNING, then SUCCEEDED (with the job output, or
+ * an empty string when there is none). On success the JVM exits with code 0. On any failure the
+ * cause is logged, FAILED is written and the JVM exits with code 1.
+ *
+ * @param asyncStateManager state manager used to record the status transitions of this pod
+ */
+ private void runInternal(final DefaultAsyncStateManager asyncStateManager) {
+ try {
+ asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.INITIALIZING);
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
final ProcessFactory processFactory = getProcessBuilderFactory(configs, workerConfigs);
final JobOrchestrator<?> jobOrchestrator = getJobOrchestrator(configs, workerConfigs, processFactory, application);
- log.info("Starting {} orchestrator...", jobOrchestrator.getOrchestratorName());
- jobOrchestrator.runJob();
- log.info("{} orchestrator complete!", jobOrchestrator.getOrchestratorName());
- } finally {
- if (heartbeatServer != null) {
- log.info("Shutting down heartbeat server...");
- heartbeatServer.stop();
+ if (jobOrchestrator == null) {
+ throw new IllegalStateException("Could not find job orchestrator for application: " + application);
}
+
+ final var heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
+ heartbeatServer.startBackground();
+
+ asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.RUNNING);
+
+ final Optional<String> output = jobOrchestrator.runJob();
+
+ asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.SUCCEEDED, output.orElse(""));
+
+ // required to kill clients with thread pools
+ System.exit(0);
+ } catch (Throwable t) {
+ // log before touching the state store so the cause is not lost if the FAILED write itself fails
+ log.error("Killing orchestrator because of an exception", t);
+ asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED);
+ System.exit(1);
}
+ }
- // required to kill kube client
- log.info("Runner closing...");
- System.exit(0);
+ /**
+ * Sets up logging and the MDC scope, builds the state manager on top of the configured state
+ * storage, and hands the rest of the work to {@link ContainerOrchestratorApp#runInternal}.
+ */
+ public void run() {
+ configureLogging();
+
+ final var scopeBuilder = new MdcScope.Builder()
+ .setLogPrefix(application)
+ .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND);
+
+ // the mdc scope stays open for the remaining execution
+ try (final var mdcScope = scopeBuilder.build()) {
+
+ // IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
+ // version is deployed!
+ final var stateStore = StateClients.create(configs.getStateStorageCloudConfigs(), WorkerApp.STATE_STORAGE_PREFIX);
+
+ runInternal(new DefaultAsyncStateManager(stateStore));
+ }
+ }
+
+ /**
+ * Entry point of the orchestrator container. Polls once per second until the success file exists
+ * in the config directory, then reads the application name, environment map, job run config and
+ * kube pod info from the copied files and runs the app. Any failure is logged as an error and the
+ * JVM exits with code 1.
+ *
+ * @param args command line arguments (unused)
+ */
+ public static void main(final String[] args) {
+ try {
+ // wait for config files to be copied
+ final var successFile = Path.of(KubePodProcess.CONFIG_DIR, KubePodProcess.SUCCESS_FILE_NAME);
+
+ while (!successFile.toFile().exists()) {
+ log.info("Waiting for config file transfers to complete...");
+ Thread.sleep(1000);
+ }
+
+ final var applicationName = JobOrchestrator.readApplicationName();
+ final var envMap = JobOrchestrator.readEnvMap();
+ final var jobRunConfig = JobOrchestrator.readJobRunConfig();
+ final var kubePodInfo = JobOrchestrator.readKubePodInfo();
+
+ final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo);
+ app.run();
+ } catch (Throwable t) {
+ // a fatal startup/run failure must be reported at error level, not info
+ log.error("Orchestrator failed...", t);
+ System.exit(1);
+ }
+ }
private static JobOrchestrator> getJobOrchestrator(final Configs configs,
final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
final String application) {
- if (application.equals(ReplicationLauncherWorker.REPLICATION)) {
- return new ReplicationJobOrchestrator(configs, workerConfigs, processFactory);
- } else if (application.equals(NormalizationLauncherWorker.NORMALIZATION)) {
- return new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
- } else if (application.equals(DbtLauncherWorker.DBT)) {
- return new DbtJobOrchestrator(configs, workerConfigs, processFactory);
- } else {
- log.error("Runner failed", new IllegalStateException("Unexpected value: " + application));
- System.exit(1);
- throw new IllegalStateException(); // should never be reached, but necessary to compile
- }
+
+ return switch (application) {
+ case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(configs, workerConfigs, processFactory);
+ case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
+ case DbtLauncherWorker.DBT -> new DbtJobOrchestrator(configs, workerConfigs, processFactory);
+ case AsyncOrchestratorPodProcess.NO_OP -> new NoOpOrchestrator();
+ default -> null;
+ };
}
/**
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java
index fc426adb5ed2..a7b043f75a1e 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java
@@ -13,9 +13,11 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
+import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -42,12 +44,13 @@ public Class getInputClass() {
}
@Override
- public void runJob() throws Exception {
- final JobRunConfig jobRunConfig = readJobRunConfig();
+ public Optional runJob() throws Exception {
+ final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig();
final OperatorDbtInput dbtInput = readInput();
final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile(
- ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);
+ Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG),
+ IntegrationLauncherConfig.class);
log.info("Setting up dbt worker...");
final DbtTransformationWorker worker = new DbtTransformationWorker(
@@ -65,6 +68,8 @@ public void runJob() throws Exception {
log.info("Running dbt worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
worker.run(dbtInput, jobRoot);
+
+ return Optional.empty();
}
}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java
new file mode 100644
index 000000000000..212d9adef578
--- /dev/null
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DefaultAsyncStateManager.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.container_orchestrator;
+
+import io.airbyte.workers.process.AsyncKubePodStatus;
+import io.airbyte.workers.process.KubePodInfo;
+import io.airbyte.workers.storage.DocumentStoreClient;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link AsyncStateManager} backed by a {@link DocumentStoreClient}. Each status written for a pod
+ * is stored as its own document under a key built from the pod namespace, the pod name and the
+ * status name; the current status is derived from which of those documents can be read.
+ */
+@Slf4j
+public class DefaultAsyncStateManager implements AsyncStateManager {
+
+ /**
+ * Order in which status documents are probed by {@link #getStatus}; the first one found wins.
+ */
+ private static final List<AsyncKubePodStatus> STATUS_CHECK_ORDER = List.of(
+ // terminal states first
+ AsyncKubePodStatus.FAILED,
+ AsyncKubePodStatus.SUCCEEDED,
+
+ // then check in progress state
+ AsyncKubePodStatus.RUNNING,
+
+ // then check for initialization state
+ AsyncKubePodStatus.INITIALIZING);
+
+ private final DocumentStoreClient documentStoreClient;
+
+ /**
+ * @param documentStoreClient store that status documents are written to and read from
+ */
+ public DefaultAsyncStateManager(final DocumentStoreClient documentStoreClient) {
+ this.documentStoreClient = documentStoreClient;
+ }
+
+ /**
+ * Stores {@code value} under the document key for the given pod and status.
+ */
+ @Override
+ public void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value) {
+ final var key = getDocumentStoreKey(kubePodInfo, status);
+ log.info("Writing async status {} for {}...", status, kubePodInfo);
+ documentStoreClient.write(key, value);
+ }
+
+ /**
+ * Stores an empty string under the document key for the given pod and status.
+ */
+ @Override
+ public void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) {
+ write(kubePodInfo, status, "");
+ }
+
+ /**
+ * Checks terminal states first, then running, then initialized. Defaults to not started.
+ *
+ * The order matters here!
+ */
+ @Override
+ public AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo) {
+ for (final AsyncKubePodStatus status : STATUS_CHECK_ORDER) {
+ if (statusFileExists(kubePodInfo, status)) {
+ return status;
+ }
+ }
+
+ return AsyncKubePodStatus.NOT_STARTED;
+ }
+
+ /**
+ * @return the content of the SUCCEEDED document for the pod
+ * @throws IllegalArgumentException if no SUCCEEDED document can be read for the pod
+ */
+ @Override
+ public String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException {
+ final var key = getDocumentStoreKey(kubePodInfo, AsyncKubePodStatus.SUCCEEDED);
+
+ return documentStoreClient.read(key)
+ .orElseThrow(() -> new IllegalArgumentException("Expected to retrieve output from a successfully completed pod!"));
+ }
+
+ /**
+ * IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
+ * version is deployed!
+ *
+ * @return the key {@code <namespace>/<name>/<STATUS>} for the given pod and status
+ */
+ public static String getDocumentStoreKey(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) {
+ return kubePodInfo.namespace() + "/" + kubePodInfo.name() + "/" + status.name();
+ }
+
+ /**
+ * A status counts as written when its document can be read from the store.
+ */
+ private boolean statusFileExists(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status) {
+ final var key = getDocumentStoreKey(kubePodInfo, status);
+ return documentStoreClient.read(key).isPresent();
+ }
+
+}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java
index d207763a4f28..8c572cd97bf6 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/JobOrchestrator.java
@@ -6,10 +6,15 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.scheduler.models.JobRunConfig;
+import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
+import io.airbyte.workers.process.KubePodInfo;
+import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Map;
+import java.util.Optional;
/**
* The job orchestrator helps abstract over container launcher application differences across
@@ -27,19 +32,46 @@ public interface JobOrchestrator {
// reads input from a file that was copied to the container launcher
default INPUT readInput() throws IOException {
- return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_INPUT, getInputClass());
+ return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_INPUT), getInputClass());
}
- // reads the job run config from a file that was copied to the container launcher
- default JobRunConfig readJobRunConfig() throws IOException {
- return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, JobRunConfig.class);
+ /**
+ * reads the application name from a file that was copied to the container launcher
+ */
+ static String readApplicationName() throws IOException {
+ return Files.readString(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_APPLICATION));
}
- // the unique logic that belongs to each type of job belongs here
- void runJob() throws Exception;
+ /**
+ * reads the environment variable map from a file that was copied to the container launcher
+ */
+ static Map readEnvMap() throws IOException {
+ return (Map) readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_ENV_MAP), Map.class);
+ }
+
+ /**
+ * reads the job run config from a file that was copied to the container launcher
+ */
+ static JobRunConfig readJobRunConfig() throws IOException {
+ return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG), JobRunConfig.class);
+ }
+
+ /**
+ * reads the kube pod info from a file that was copied to the container launcher
+ */
+ static KubePodInfo readKubePodInfo() throws IOException {
+ return readAndDeserializeFile(Path.of(KubePodProcess.CONFIG_DIR, AsyncOrchestratorPodProcess.KUBE_POD_INFO), KubePodInfo.class);
+ }
+
+ /**
+ * Contains the unique logic that belongs to each type of job.
+ *
+ * @return an optional output value to place within the output document store item.
+ */
+ Optional runJob() throws Exception;
- static T readAndDeserializeFile(String path, Class type) throws IOException {
- return Jsons.deserialize(Files.readString(Path.of(path)), type);
+ static T readAndDeserializeFile(Path path, Class type) throws IOException {
+ return Jsons.deserialize(Files.readString(path), type);
}
}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java
new file mode 100644
index 000000000000..77181e817644
--- /dev/null
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NoOpOrchestrator.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.container_orchestrator;
+
+import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * For testing only. An orchestrator that performs no work: it logs a single line and produces no
+ * output.
+ */
+@Slf4j
+public class NoOpOrchestrator implements JobOrchestrator<String> {
+
+ /**
+ * @return the no-op application name
+ */
+ @Override
+ public String getOrchestratorName() {
+ return AsyncOrchestratorPodProcess.NO_OP;
+ }
+
+ /**
+ * @return {@code String.class}, the input type declared by this orchestrator
+ */
+ @Override
+ public Class<String> getInputClass() {
+ return String.class;
+ }
+
+ /**
+ * Logs that the no-op job ran.
+ *
+ * @return always an empty optional
+ */
+ @Override
+ public Optional<String> runJob() throws Exception {
+ log.info("Running no-op job.");
+ return Optional.empty();
+ }
+
+}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java
index 6d8251e28947..0462be271ca8 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java
@@ -13,9 +13,11 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
+import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -42,12 +44,13 @@ public Class getInputClass() {
}
@Override
- public void runJob() throws Exception {
- final JobRunConfig jobRunConfig = readJobRunConfig();
+ public Optional runJob() throws Exception {
+ final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig();
final NormalizationInput normalizationInput = readInput();
final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile(
- ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);
+ Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG),
+ IntegrationLauncherConfig.class);
log.info("Setting up normalization worker...");
final NormalizationWorker normalizationWorker = new DefaultNormalizationWorker(
@@ -64,6 +67,7 @@ public void runJob() throws Exception {
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
normalizationWorker.run(normalizationInput, jobRoot);
+ return Optional.empty();
}
}
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java
index cbcc4338c056..f7f515ce4d18 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java
@@ -17,6 +17,7 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
+import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
@@ -26,6 +27,7 @@
import io.airbyte.workers.protocols.airbyte.NamespacingMapper;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -52,15 +54,17 @@ public Class getInputClass() {
}
@Override
- public void runJob() throws Exception {
- final JobRunConfig jobRunConfig = readJobRunConfig();
+ public Optional runJob() throws Exception {
+ final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig();
final StandardSyncInput syncInput = readInput();
final IntegrationLauncherConfig sourceLauncherConfig = JobOrchestrator.readAndDeserializeFile(
- ReplicationLauncherWorker.INIT_FILE_SOURCE_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);
+ Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_SOURCE_LAUNCHER_CONFIG),
+ IntegrationLauncherConfig.class);
final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile(
- ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);
+ Path.of(KubePodProcess.CONFIG_DIR, ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG),
+ IntegrationLauncherConfig.class);
log.info("Setting up source launcher...");
final IntegrationLauncher sourceLauncher = new AirbyteIntegrationLauncher(
@@ -97,10 +101,8 @@ public void runJob() throws Exception {
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
final ReplicationOutput replicationOutput = replicationWorker.run(syncInput, jobRoot);
- log.info("Sending output...");
- // this uses stdout directly because it shouldn't have the logging related prefix
- // the replication output is read from the container that launched the runner
- System.out.println(Jsons.serialize(replicationOutput));
+ log.info("Returning output...");
+ return Optional.of(Jsons.serialize(replicationOutput));
}
}
diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java
new file mode 100644
index 000000000000..c543d3d6a823
--- /dev/null
+++ b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.container_orchestrator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+
+import io.airbyte.workers.process.AsyncKubePodStatus;
+import io.airbyte.workers.process.KubePodInfo;
+import io.airbyte.workers.storage.DocumentStoreClient;
+import java.util.Optional;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link DefaultAsyncStateManager}, run against a mocked {@link DocumentStoreClient}.
+ */
+class DefaultAsyncStateManagerTest {
+
+  private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1");
+
+  private DocumentStoreClient documentStore;
+  private AsyncStateManager stateManager;
+
+  @BeforeEach
+  void setup() {
+    documentStore = mock(DocumentStoreClient.class);
+    stateManager = new DefaultAsyncStateManager(documentStore);
+  }
+
+  @Test
+  void testEmptyWrite() {
+    // the second, identical write checks that overwriting is allowed
+    for (int attempt = 0; attempt < 2; attempt++) {
+      stateManager.write(KUBE_POD_INFO, AsyncKubePodStatus.INITIALIZING);
+    }
+
+    verify(documentStore, times(2)).write(getKey(AsyncKubePodStatus.INITIALIZING), "");
+  }
+
+  @Test
+  void testContentfulWrite() {
+    stateManager.write(KUBE_POD_INFO, AsyncKubePodStatus.SUCCEEDED, "some output value");
+
+    // verify(mock) checks for exactly one invocation
+    verify(documentStore).write(getKey(AsyncKubePodStatus.SUCCEEDED), "some output value");
+  }
+
+  @Test
+  void testReadingOutputWhenItExists() {
+    when(documentStore.read(getKey(AsyncKubePodStatus.SUCCEEDED))).thenReturn(Optional.of("some output value"));
+
+    final var output = stateManager.getOutput(KUBE_POD_INFO);
+
+    assertEquals("some output value", output);
+  }
+
+  @Test
+  void testReadingOutputWhenItDoesNotExist() {
+    // getting the output should throw an exception when there is no record in the document store
+    assertThrows(IllegalArgumentException.class, () -> stateManager.getOutput(KUBE_POD_INFO));
+  }
+
+  @Test
+  void testSuccessfulStatusRetrievalLifecycle() {
+    assertStatusesUpToRunning();
+
+    when(documentStore.read(getKey(AsyncKubePodStatus.SUCCEEDED))).thenReturn(Optional.of("output"));
+    assertStatus(AsyncKubePodStatus.SUCCEEDED);
+  }
+
+  @Test
+  void testFailureStatusRetrievalLifecycle() {
+    assertStatusesUpToRunning();
+
+    when(documentStore.read(getKey(AsyncKubePodStatus.FAILED))).thenReturn(Optional.of("output"));
+    assertStatus(AsyncKubePodStatus.FAILED);
+  }
+
+  /**
+   * Walks the part of the lifecycle shared by the success and failure tests: no documents, then an
+   * INITIALIZING document, then a RUNNING document, checking the reported status after each step.
+   */
+  private void assertStatusesUpToRunning() {
+    when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.empty());
+    assertStatus(AsyncKubePodStatus.NOT_STARTED);
+
+    when(documentStore.read(getKey(AsyncKubePodStatus.INITIALIZING))).thenReturn(Optional.of(""));
+    assertStatus(AsyncKubePodStatus.INITIALIZING);
+
+    when(documentStore.read(getKey(AsyncKubePodStatus.RUNNING))).thenReturn(Optional.of(""));
+    assertStatus(AsyncKubePodStatus.RUNNING);
+  }
+
+  private void assertStatus(final AsyncKubePodStatus expected) {
+    assertEquals(expected, stateManager.getStatus(KUBE_POD_INFO));
+  }
+
+  private static String getKey(final AsyncKubePodStatus status) {
+    return DefaultAsyncStateManager.getDocumentStoreKey(KUBE_POD_INFO, status);
+  }
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
index 5bf166662218..bc9e89b01e69 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -39,6 +39,8 @@
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
+import io.airbyte.workers.storage.DocumentStoreClient;
+import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalUtils;
@@ -84,9 +86,12 @@ public class WorkerApp {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerApp.class);
public static final int KUBE_HEARTBEAT_PORT = 9000;
+ // IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
+ // version is deployed!
+ public static final Path STATE_STORAGE_PREFIX = Path.of("/state");
+
private final Path workspaceRoot;
private final ProcessFactory jobProcessFactory;
- private final ProcessFactory orchestratorProcessFactory;
private final SecretsHydrator secretsHydrator;
private final WorkflowServiceStubs temporalService;
private final ConfigRepository configRepository;
@@ -103,7 +108,7 @@ public class WorkerApp {
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final Configs configs;
private final ConnectionHelper connectionHelper;
- private final boolean containerOrchestratorEnabled;
+ private final Optional containerOrchestratorConfig;
private final JobNotifier jobNotifier;
private final JobTracker jobTracker;
@@ -147,10 +152,9 @@ public void start() {
final NormalizationActivityImpl normalizationActivity =
new NormalizationActivityImpl(
- containerOrchestratorEnabled,
+ containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
- orchestratorProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
@@ -161,10 +165,9 @@ public void start() {
airbyteVersion);
final DbtTransformationActivityImpl dbtTransformationActivity =
new DbtTransformationActivityImpl(
- containerOrchestratorEnabled,
+ containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
- orchestratorProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
@@ -177,10 +180,9 @@ public void start() {
final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository);
final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(
- containerOrchestratorEnabled,
+ containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
- orchestratorProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
@@ -226,10 +228,9 @@ public void start() {
* launching or not.
*/
private ReplicationActivityImpl getReplicationActivityImpl(
- final boolean containerOrchestratorEnabled,
+ final Optional containerOrchestratorConfig,
final WorkerConfigs workerConfigs,
final ProcessFactory jobProcessFactory,
- final ProcessFactory orchestratorProcessFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
@@ -238,33 +239,19 @@ private ReplicationActivityImpl getReplicationActivityImpl(
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
- if (containerOrchestratorEnabled) {
- return new ReplicationActivityImpl(
- containerOrchestratorEnabled,
- workerConfigs,
- orchestratorProcessFactory,
- secretsHydrator,
- workspaceRoot,
- workerEnvironment,
- logConfigs,
- databaseUser,
- databasePassword,
- databaseUrl,
- airbyteVersion);
- } else {
- return new ReplicationActivityImpl(
- containerOrchestratorEnabled,
- workerConfigs,
- jobProcessFactory,
- secretsHydrator,
- workspaceRoot,
- workerEnvironment,
- logConfigs,
- databaseUser,
- databasePassword,
- databaseUrl,
- airbyteVersion);
- }
+
+ return new ReplicationActivityImpl(
+ containerOrchestratorConfig,
+ workerConfigs,
+ jobProcessFactory,
+ secretsHydrator,
+ workspaceRoot,
+ workerEnvironment,
+ logConfigs,
+ databaseUser,
+ databasePassword,
+ databaseUrl,
+ airbyteVersion);
}
private static ProcessFactory getJobProcessFactory(final Configs configs) throws IOException {
@@ -287,36 +274,34 @@ private static ProcessFactory getJobProcessFactory(final Configs configs) throws
}
}
- private static ProcessFactory getOrchestratorProcessFactory(final Configs configs) throws IOException {
- final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
-
- if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
- final KubernetesClient fabricClient = new DefaultKubernetesClient();
- final String localIp = InetAddress.getLocalHost().getHostAddress();
- final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
- LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
- return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, true);
- } else {
- return new DockerProcessFactory(
- workerConfigs,
- configs.getWorkspaceRoot(),
- configs.getWorkspaceDockerMount(),
- configs.getLocalDockerMount(),
-
- // this needs to point at the Docker network Airbyte is running on, not the host network or job
- // runner network, otherwise it can't talk with the db/minio
- "airbyte_default",
-
- true);
- }
- }
-
private static WorkerOptions getWorkerOptions(final int max) {
return WorkerOptions.newBuilder()
.setMaxConcurrentActivityExecutionSize(max)
.build();
}
+ /**
+  * Bundles what is passed to the activities when the container orchestrator is enabled: the
+  * Kubernetes namespace, the document store client and the Kubernetes client.
+  */
+ public static record ContainerOrchestratorConfig(String namespace,
+                                                  DocumentStoreClient documentStoreClient,
+                                                  KubernetesClient kubernetesClient) {
+
+ }
+
+ /**
+  * Builds the container orchestrator settings from the application configuration.
+  *
+  * @param configs application configuration to read the orchestrator flag, state storage settings
+  *        and job namespace from
+  * @return a config holding a new Kubernetes client and a document store client rooted at
+  *         {@link #STATE_STORAGE_PREFIX} when {@code configs.getContainerOrchestratorEnabled()} is
+  *         true; otherwise an empty Optional (no clients are created in that case)
+  */
+ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(final Configs configs) {
+   if (!configs.getContainerOrchestratorEnabled()) {
+     return Optional.empty();
+   }
+
+   final KubernetesClient kubernetesClient = new DefaultKubernetesClient();
+
+   final DocumentStoreClient documentStoreClient = StateClients.create(
+       configs.getStateStorageCloudConfigs(),
+       STATE_STORAGE_PREFIX);
+
+   return Optional.of(new ContainerOrchestratorConfig(
+       configs.getJobKubeNamespace(),
+       documentStoreClient,
+       kubernetesClient));
+ }
+
public static void main(final String[] args) throws IOException, InterruptedException {
final Configs configs = new EnvConfigs();
@@ -336,7 +321,6 @@ public static void main(final String[] args) throws IOException, InterruptedExce
}
final ProcessFactory jobProcessFactory = getJobProcessFactory(configs);
- final ProcessFactory orchestratorProcessFactory = getOrchestratorProcessFactory(configs);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
@@ -390,6 +374,8 @@ public static void main(final String[] args) throws IOException, InterruptedExce
workspaceHelper,
workerConfigs);
+ final Optional containerOrchestratorConfig = getContainerOrchestratorConfig(configs);
+
final JobNotifier jobNotifier = new JobNotifier(
configs.getWebappUrl(),
configRepository,
@@ -401,7 +387,6 @@ public static void main(final String[] args) throws IOException, InterruptedExce
new WorkerApp(
workspaceRoot,
jobProcessFactory,
- orchestratorProcessFactory,
secretsHydrator,
temporalService,
configRepository,
@@ -418,7 +403,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
temporalWorkerRunFactory,
configs,
connectionHelper,
- configs.getContainerOrchestratorEnabled(),
+ containerOrchestratorConfig,
jobNotifier,
jobTracker).start();
}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java
new file mode 100644
index 000000000000..58a4e7de1c22
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncKubePodStatus.java
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.process;
+
+/**
+ * Status values reported for an asynchronously managed Kubernetes pod.
+ */
+public enum AsyncKubePodStatus {
+
+  /** Pod hasn't been started yet. */
+  NOT_STARTED,
+
+  /** On-start container started but not completed. */
+  INITIALIZING,
+
+  /** Main container posted running. */
+  RUNNING,
+
+  /**
+   * Reported status was "failed" or pod was in Error (or other terminal state) without a reported
+   * status.
+   */
+  FAILED,
+
+  /** Reported status was "success" so both main and on-start succeeded. */
+  SUCCEEDED
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java
new file mode 100644
index 000000000000..0e0ea440d878
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.process;
+
+import io.airbyte.commons.io.IOs;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.ResourceRequirements;
+import io.airbyte.workers.WorkerApp;
+import io.airbyte.workers.storage.DocumentStoreClient;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This process allows creating and managing a pod outside the lifecycle of the launching
+ * application. Unlike {@link KubePodProcess} there is no heartbeat mechanism that requires the
+ * launching pod and the launched pod to co-exist for the duration of execution for the launched
+ * pod.
+ *
+ * Instead, this process creates the pod and interacts with a document store on cloud storage to
+ * understand the state of the created pod.
+ *
+ * The document store is considered to be the truth when retrieving the status for an async pod
+ * process. If the store isn't updated by the underlying pod, it will appear as failed.
+ */
+@Slf4j
+public class AsyncOrchestratorPodProcess implements KubePod {
+
+  public static final String KUBE_POD_INFO = "KUBE_POD_INFO";
+  public static final String NO_OP = "NO_OP";
+
+  private final KubePodInfo kubePodInfo;
+  private final DocumentStoreClient documentStoreClient;
+  private final KubernetesClient kubernetesClient;
+  // holds the exit code once exitValue() has computed one; empty until then
+  private final AtomicReference<Optional<Integer>> cachedExitValue;
+
+  /**
+   * @param kubePodInfo namespace and name of the pod this instance manages
+   * @param documentStoreClient store the pod status documents are read from
+   * @param kubernetesClient client used to create, inspect and delete the pod
+   */
+  public AsyncOrchestratorPodProcess(
+                                     final KubePodInfo kubePodInfo,
+                                     final DocumentStoreClient documentStoreClient,
+                                     final KubernetesClient kubernetesClient) {
+    this.kubePodInfo = kubePodInfo;
+    this.documentStoreClient = documentStoreClient;
+    this.kubernetesClient = kubernetesClient;
+    this.cachedExitValue = new AtomicReference<>(Optional.empty());
+  }
+
+  /**
+   * Reads the content of the SUCCEEDED document for this pod.
+   *
+   * @return the document content, or empty when the document is missing or blank
+   */
+  public Optional<String> getOutput() {
+    return getDocument(AsyncKubePodStatus.SUCCEEDED.name())
+        .filter(output -> !output.isBlank());
+  }
+
+  /**
+   * Derives the exit code from the document store and, when the store holds no terminal status,
+   * from the pod as reported by the Kubernetes API.
+   *
+   * @return 0 when the store reports SUCCEEDED; 1 when it reports FAILED, when the pod no longer
+   *         exists, or when the pod is terminal without a terminal document
+   * @throws IllegalThreadStateException when the pod exists and is not terminal yet
+   */
+  private int computeExitValue() {
+    final AsyncKubePodStatus docStoreStatus = getDocStoreStatus();
+
+    // trust the doc store if it's in a terminal state
+    if (docStoreStatus == AsyncKubePodStatus.FAILED) {
+      return 1;
+    } else if (docStoreStatus == AsyncKubePodStatus.SUCCEEDED) {
+      return 0;
+    }
+
+    final Pod pod = kubernetesClient.pods()
+        .inNamespace(getInfo().namespace())
+        .withName(getInfo().name())
+        .get();
+
+    // Since the pod creation blocks until the pod is created the first time,
+    // if the pod no longer exists (and we don't have a success/fail document)
+    // we must be in a failure state. If it wasn't able to write out its status
+    // we must assume failure, since the document store is the "truth" for
+    // async pod status.
+    if (pod == null) {
+      return 1;
+    }
+
+    // If the pod does exist, it may be in a terminal (error or completed) state.
+    if (KubePodProcess.isTerminal(pod)) {
+      // In case the doc store was updated in between when we pulled it and when
+      // we read the status from the Kubernetes API, we need to check the doc store again.
+      // Anything but SUCCEEDED is a failure here: either FAILED was reported, or the actual pod
+      // is terminal when the doc store says it shouldn't be.
+      return getDocStoreStatus() == AsyncKubePodStatus.SUCCEEDED ? 0 : 1;
+    }
+
+    // Otherwise, throw an exception because this is still running, which will be caught in hasExited
+    switch (docStoreStatus) {
+      case NOT_STARTED -> throw new IllegalThreadStateException("Pod hasn't started yet.");
+      case INITIALIZING -> throw new IllegalThreadStateException("Pod is initializing.");
+      default -> throw new IllegalThreadStateException("Pod is running.");
+    }
+  }
+
+  /**
+   * Returns the exit code, computing it on first success and serving the cached value afterwards.
+   *
+   * @throws IllegalThreadStateException when the pod has not reached a terminal state yet
+   */
+  @Override
+  public int exitValue() {
+    final Optional<Integer> optionalCached = cachedExitValue.get();
+
+    if (optionalCached.isPresent()) {
+      return optionalCached.get();
+    }
+
+    final int exitValue = computeExitValue();
+    cachedExitValue.set(Optional.of(exitValue));
+    return exitValue;
+  }
+
+  /**
+   * Asks the Kubernetes API to delete the pod and logs whether the deletion was reported as done.
+   */
+  @Override
+  public void destroy() {
+    final var wasDestroyed = kubernetesClient.pods()
+        .inNamespace(getInfo().namespace())
+        .withName(getInfo().name())
+        .delete();
+
+    if (wasDestroyed) {
+      log.info("Deleted pod {} in namespace {}", getInfo().name(), getInfo().namespace());
+    } else {
+      log.warn("Wasn't able to delete pod {} from namespace {}", getInfo().name(), getInfo().namespace());
+    }
+  }
+
+  /**
+   * @return true when {@link #exitValue()} yields a value, false while it reports the pod as still
+   *         running
+   */
+  // implementation copied from Process.java since this isn't a real Process
+  public boolean hasExited() {
+    try {
+      exitValue();
+      return true;
+    } catch (final IllegalThreadStateException ignored) {
+      // exitValue() signals "not finished yet" with this exception
+      return false;
+    }
+  }
+
+  /**
+   * Polls {@link #hasExited()} roughly every 100ms until it returns true or the timeout elapses.
+   *
+   * @return true when the pod exited within the timeout, false otherwise
+   */
+  @Override
+  public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
+    // implementation copied from Process.java since this isn't a real Process
+    long remainingNanos = unit.toNanos(timeout);
+    if (hasExited()) {
+      return true;
+    }
+    if (timeout <= 0) {
+      return false;
+    }
+
+    final long deadline = System.nanoTime() + remainingNanos;
+    do {
+      Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 100));
+      if (hasExited()) {
+        return true;
+      }
+      remainingNanos = deadline - System.nanoTime();
+    } while (remainingNanos > 0);
+
+    return false;
+  }
+
+  /**
+   * Waits up to 10 days for the pod to exit.
+   *
+   * @return the exit code
+   * @throws InterruptedException when interrupted while waiting, or when the pod did not complete
+   *         within the timeout
+   */
+  @Override
+  public int waitFor() throws InterruptedException {
+    final boolean exited = waitFor(10, TimeUnit.DAYS);
+
+    if (exited) {
+      return exitValue();
+    } else {
+      throw new InterruptedException("Pod did not complete within timeout.");
+    }
+  }
+
+  @Override
+  public KubePodInfo getInfo() {
+    return kubePodInfo;
+  }
+
+  // documents are stored under "<namespace>/<pod name>/<key>"
+  private Optional<String> getDocument(final String key) {
+    return documentStoreClient.read(getInfo().namespace() + "/" + getInfo().name() + "/" + key);
+  }
+
+  private boolean checkStatus(final AsyncKubePodStatus status) {
+    return getDocument(status.name()).isPresent();
+  }
+
+  /**
+   * Checks terminal states first, then running, then initialized. Defaults to not started.
+   *
+   * The order matters here!
+   */
+  public AsyncKubePodStatus getDocStoreStatus() {
+    if (checkStatus(AsyncKubePodStatus.FAILED)) {
+      return AsyncKubePodStatus.FAILED;
+    } else if (checkStatus(AsyncKubePodStatus.SUCCEEDED)) {
+      return AsyncKubePodStatus.SUCCEEDED;
+    } else if (checkStatus(AsyncKubePodStatus.RUNNING)) {
+      return AsyncKubePodStatus.RUNNING;
+    } else if (checkStatus(AsyncKubePodStatus.INITIALIZING)) {
+      return AsyncKubePodStatus.INITIALIZING;
+    } else {
+      return AsyncKubePodStatus.NOT_STARTED;
+    }
+  }
+
+  /**
+   * Creates the orchestrator pod, waits up to 5 minutes for its first container to leave the
+   * waiting state, checks that the container is running and then copies the given files (plus a
+   * {@link #KUBE_POD_INFO} entry describing this pod) into the pod's config volume.
+   *
+   * @param airbyteVersion tag of the {@code airbyte/container-orchestrator} image to run
+   * @param allLabels labels to put on the pod
+   * @param resourceRequirements resource requirements for the main container
+   * @param fileMap file name to file content, copied into the config directory
+   * @param portMap ports to expose on the main container; only the keys are used
+   * @throws RuntimeException when interrupted while waiting, when the container is not running
+   *         after the wait, or when copying a file fails
+   */
+  public void create(final String airbyteVersion,
+                     final Map<String, String> allLabels,
+                     final ResourceRequirements resourceRequirements,
+                     final Map<String, String> fileMap,
+                     final Map<Integer, Integer> portMap) {
+    final Volume configVolume = new VolumeBuilder()
+        .withName("airbyte-config")
+        .withNewEmptyDir()
+        .withMedium("Memory")
+        .endEmptyDir()
+        .build();
+
+    final VolumeMount configVolumeMount = new VolumeMountBuilder()
+        .withName("airbyte-config")
+        .withMountPath(KubePodProcess.CONFIG_DIR)
+        .build();
+
+    // withPorts(...) replaces whatever ports were set before, so the mapped ports and the heartbeat
+    // port have to be handed over in a single call.
+    final List<ContainerPort> containerPorts = new ArrayList<>(KubePodProcess.createContainerPortList(portMap));
+    containerPorts.add(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null));
+
+    final var mainContainer = new ContainerBuilder()
+        .withName("main")
+        .withImage("airbyte/container-orchestrator:" + airbyteVersion)
+        .withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build())
+        .withPorts(containerPorts)
+        .withVolumeMounts(configVolumeMount)
+        .build();
+
+    final Pod pod = new PodBuilder()
+        .withApiVersion("v1")
+        .withNewMetadata()
+        .withName(getInfo().name())
+        .withNamespace(getInfo().namespace())
+        .withLabels(allLabels)
+        .endMetadata()
+        .withNewSpec()
+        .withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true)
+        .withRestartPolicy("Never")
+        .withContainers(mainContainer)
+        .withVolumes(configVolume)
+        .endSpec()
+        .build();
+
+    // submit the pod definition to the Kubernetes API
+    final var createdPod = kubernetesClient.pods().createOrReplace(pod);
+
+    log.info("Waiting for pod to be running...");
+    try {
+      kubernetesClient.pods()
+          .inNamespace(kubePodInfo.namespace())
+          .withName(kubePodInfo.name())
+          .waitUntilCondition(AsyncOrchestratorPodProcess::hasLeftWaitingState, 5, TimeUnit.MINUTES);
+    } catch (final InterruptedException e) {
+      // keep the interrupt visible to callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    final var containerState = kubernetesClient.pods()
+        .inNamespace(kubePodInfo.namespace())
+        .withName(kubePodInfo.name())
+        .get()
+        .getStatus()
+        .getContainerStatuses()
+        .get(0)
+        .getState();
+
+    if (containerState.getRunning() == null) {
+      throw new RuntimeException("Pod was not running, state was: " + containerState);
+    }
+
+    final Map<String, String> updatedFileMap = new HashMap<>(fileMap);
+    updatedFileMap.put(KUBE_POD_INFO, Jsons.serialize(kubePodInfo));
+
+    copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap);
+  }
+
+  // True once the pod reports at least one container status and the first one is no longer waiting.
+  // A missing pod or a pod without a status yet counts as "still waiting".
+  private static boolean hasLeftWaitingState(final Pod pod) {
+    if (pod == null || pod.getStatus() == null) {
+      return false;
+    }
+    final var containerStatuses = pod.getStatus().getContainerStatuses();
+    return !containerStatuses.isEmpty() && containerStatuses.get(0).getState().getWaiting() == null;
+  }
+
+  /**
+   * Copies each entry of {@code files} into the config directory of the pod's "main" container
+   * with {@code kubectl cp}, followed by an empty {@link KubePodProcess#SUCCESS_FILE_NAME} file.
+   *
+   * @param podDefinition pod whose namespace and name identify the copy target
+   * @param files file name to file content
+   * @throws RuntimeException when writing the temp file or running kubectl fails, when kubectl
+   *         exits with a non-zero code, or when interrupted while waiting for it
+   */
+  public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
+    final List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());
+
+    // copy this file last to indicate that the copy has completed
+    fileEntries.add(new AbstractMap.SimpleEntry<>(KubePodProcess.SUCCESS_FILE_NAME, ""));
+
+    for (final Map.Entry<String, String> file : fileEntries) {
+      // scoped to the iteration so the cleanup below never touches a previous file or process
+      Path tmpFile = null;
+      Process proc = null;
+      try {
+        tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue()));
+
+        log.info("Uploading file: " + file.getKey());
+        final var containerPath = Path.of(KubePodProcess.CONFIG_DIR + "/" + file.getKey());
+
+        // using kubectl cp directly here, because both fabric and the official kube client APIs have
+        // several issues with copying files. See https://github.com/airbytehq/airbyte/issues/8643 for
+        // details.
+        // The arguments are passed as a list so that a path containing whitespace stays one argument.
+        final String copyTarget = String.format("%s/%s:%s", podDefinition.getMetadata().getNamespace(),
+            podDefinition.getMetadata().getName(), containerPath);
+        final List<String> command = List.of("kubectl", "cp", tmpFile.toString(), copyTarget, "-c", "main");
+        log.info(String.join(" ", command));
+
+        proc = new ProcessBuilder(command).start();
+        log.info("Waiting for kubectl cp to complete");
+        final int exitCode = proc.waitFor();
+
+        if (exitCode != 0) {
+          throw new IOException("kubectl cp failed with exit code " + exitCode);
+        }
+
+        log.info("kubectl cp complete, closing process");
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      } catch (final InterruptedException e) {
+        // keep the interrupt visible to callers further up the stack
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } finally {
+        if (tmpFile != null) {
+          tmpFile.toFile().delete();
+        }
+        if (proc != null) {
+          proc.destroy();
+        }
+      }
+    }
+  }
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java
new file mode 100644
index 000000000000..2cd4640eb185
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePod.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.process;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Process-like handle on a Kubernetes pod: identify it, wait for it, read its exit code, destroy
+ * it.
+ */
+public interface KubePod {
+
+  /**
+   * @return the namespace and name of the pod
+   */
+  KubePodInfo getInfo();
+
+  /**
+   * Waits for the pod to finish, for at most the given amount of time.
+   *
+   * @return whether the pod finished within the timeout
+   */
+  boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException;
+
+  /**
+   * Waits for the pod to finish.
+   *
+   * @return the exit code of the pod
+   */
+  int waitFor() throws InterruptedException;
+
+  /**
+   * @return the exit code of the pod
+   */
+  int exitValue();
+
+  /**
+   * Destroys the pod.
+   */
+  void destroy();
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java
new file mode 100644
index 000000000000..0a0ab5949a51
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java
@@ -0,0 +1,7 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.process;
+
+/**
+ * Identifies a Kubernetes pod.
+ *
+ * @param namespace namespace the pod lives in
+ * @param name name of the pod
+ */
+public record KubePodInfo(
+                          String namespace,
+                          String name) {
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
index 0274f42126e5..c0b6f83401eb 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java
@@ -90,7 +90,7 @@
*/
// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700.
-public class KubePodProcess extends Process {
+public class KubePodProcess extends Process implements KubePod {
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);
@@ -105,11 +105,11 @@ public class KubePodProcess extends Process {
private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin";
private static final String STDOUT_PIPE_FILE = PIPES_DIR + "/stdout";
private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr";
- private static final String CONFIG_DIR = "/config";
+ public static final String CONFIG_DIR = "/config";
private static final String TERMINATION_DIR = "/termination";
private static final String TERMINATION_FILE_MAIN = TERMINATION_DIR + "/main";
private static final String TERMINATION_FILE_CHECK = TERMINATION_DIR + "/check";
- private static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING";
+ public static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING";
// 143 is the typical SIGTERM exit code.
private static final int KILLED_EXIT_CODE = 143;
@@ -182,7 +182,7 @@ private static Container getMain(final String image,
// communicates its completion to the heartbeat check via a file and closes itself if the heartbeat
// fails
- final var mainCommand = MoreResources.readResource("entrypoints/main.sh")
+ final var mainCommand = MoreResources.readResource("entrypoints/sync/main.sh")
.replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK)
.replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN)
.replaceAll("OPTIONAL_STDIN", optionalStdin)
@@ -191,11 +191,7 @@ private static Container getMain(final String image,
.replaceAll("STDERR_PIPE_FILE", STDERR_PIPE_FILE)
.replaceAll("STDOUT_PIPE_FILE", STDOUT_PIPE_FILE);
- final List containerPorts = internalToExternalPorts.keySet().stream()
- .map(integer -> new ContainerPortBuilder()
- .withContainerPort(integer)
- .build())
- .collect(Collectors.toList());
+ final List containerPorts = createContainerPortList(internalToExternalPorts);
final List envVars = envMap.entrySet().stream()
.map(entry -> new EnvVar(entry.getKey(), entry.getValue(), null))
@@ -218,9 +214,17 @@ private static Container getMain(final String image,
return containerBuilder.build();
}
- private static void copyFilesToKubeConfigVolume(final KubernetesClient client,
- final Pod podDefinition,
- final Map files) {
+ /**
+  * Builds one {@link ContainerPort} for every key of the given port mapping.
+  *
+  * @param internalToExternalPorts port mapping; only the keys are used as container ports, the
+  *        values are not read here
+  * @return a list with one container port per key
+  */
+ public static List<ContainerPort> createContainerPortList(final Map<Integer, Integer> internalToExternalPorts) {
+   return internalToExternalPorts.keySet().stream()
+       .map(internalPort -> new ContainerPortBuilder()
+           .withContainerPort(internalPort)
+           .build())
+       .collect(Collectors.toList());
+ }
+
+ public static void copyFilesToKubeConfigVolume(final KubernetesClient client,
+ final Pod podDefinition,
+ final Map files) {
final List> fileEntries = new ArrayList<>(files.entrySet());
// copy this file last to indicate that the copy has completed
@@ -262,7 +266,11 @@ private static void copyFilesToKubeConfigVolume(final KubernetesClient client,
throw new RuntimeException(e);
} finally {
if (tmpFile != null) {
- tmpFile.toFile().delete();
+ try {
+ tmpFile.toFile().delete();
+ } catch (Exception e) {
+ LOGGER.info("Caught exception when deleting temp file but continuing to allow process deletion.", e);
+ }
}
if (proc != null) {
proc.destroy();
@@ -429,7 +437,7 @@ public KubePodProcess(final boolean isOrchestrator,
// communicates via a file if it isn't able to reach the heartbeating server and succeeds if the
// main container completes
- final String heartbeatCommand = MoreResources.readResource("entrypoints/check.sh")
+ final String heartbeatCommand = MoreResources.readResource("entrypoints/sync/check.sh")
.replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK)
.replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN)
.replaceAll("HEARTBEAT_URL", kubeHeartbeatUrl);
@@ -552,7 +560,7 @@ public InputStream getErrorStream() {
public int waitFor() throws InterruptedException {
final Pod refreshedPod =
fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
- fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
+ fabricClient.resource(refreshedPod).waitUntilCondition(KubePodProcess::isTerminal, 10, TimeUnit.DAYS);
wasKilled.set(true);
return exitValue();
}
@@ -586,6 +594,11 @@ public Info info() {
return new KubePodProcessInfo(podDefinition.getMetadata().getName());
}
+ /**
+  * Describes the pod backing this process by the namespace and name found in its definition's
+  * metadata.
+  *
+  * @return a new {@link KubePodInfo} built from the pod's namespace and name
+  */
+ @Override
+ public KubePodInfo getInfo() {
+   final var metadata = podDefinition.getMetadata();
+   return new KubePodInfo(metadata.getNamespace(), metadata.getName());
+ }
+
/**
* Close all open resource in the opposite order of resource creation.
*
@@ -620,7 +633,7 @@ private void close() {
LOGGER.debug("Closed {}", podDefinition.getMetadata().getName());
}
- private boolean isTerminal(final Pod pod) {
+ public static boolean isTerminal(final Pod pod) {
if (pod.getStatus() != null) {
// Check if "main" container has terminated, as that defines whether the parent process has
// terminated.
@@ -698,7 +711,7 @@ public int exitValue() {
return returnCode;
}
- private static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) {
+ public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) {
if (resourceRequirements != null) {
final Map requestMap = new HashMap<>();
// if null then use unbounded resource allocation
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java
index 5323d9e2e4e7..c6b18cfce653 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java
@@ -117,12 +117,7 @@ public Process create(final String jobId,
final int stderrLocalPort = KubePortManagerSingleton.getInstance().take();
LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort);
- final var allLabels = new HashMap<>(customLabels);
- final var generalKubeLabels = Map.of(
- JOB_LABEL_KEY, jobId,
- ATTEMPT_LABEL_KEY, String.valueOf(attempt),
- WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
- allLabels.putAll(generalKubeLabels);
+ final var allLabels = getLabels(jobId, attempt, customLabels);
return new KubePodProcess(
isOrchestrator,
@@ -155,6 +150,19 @@ public Process create(final String jobId,
}
}
+ /**
+  * Assembles the full label set for a job attempt: the caller's custom labels plus the general
+  * job id, attempt and worker-pod labels.
+  *
+  * @param jobId value stored under {@code JOB_LABEL_KEY}; must not be null because
+  *        {@link Map#of} rejects null values
+  * @param attemptId attempt number, stored in string form under {@code ATTEMPT_LABEL_KEY}
+  * @param customLabels caller-supplied labels; copied, never modified
+  * @return a new mutable map containing the custom labels overlaid with the general labels
+  */
+ public static Map<String, String> getLabels(final String jobId, final int attemptId, final Map<String, String> customLabels) {
+   final Map<String, String> allLabels = new HashMap<>(customLabels);
+
+   final Map<String, String> generalKubeLabels = Map.of(
+       JOB_LABEL_KEY, jobId,
+       ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
+       WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
+
+   // added last so the general labels win over any custom label that reuses one of their keys
+   allLabels.putAll(generalKubeLabels);
+
+   return allLabels;
+ }
+
/**
* Docker image names are by convention separated by slashes. The last portion is the image's name.
* This is followed by a colon and a version number. e.g. airbyte/scheduler:v1 or
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java b/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java
new file mode 100644
index 000000000000..ce4be0d5e36a
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/storage/StateClients.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.storage;
+
+import io.airbyte.config.storage.CloudStorageConfigs;
+import java.nio.file.Path;
+
+/**
+ * Static factory that picks the {@link DocumentStoreClient} implementation matching the type of
+ * a {@link CloudStorageConfigs}.
+ */
+public class StateClients {
+
+  /**
+   * Creates the document store client for the storage type reported by the given configuration.
+   *
+   * @param cloudStorageConfigs configuration whose type selects the client and whose matching
+   *        S3 / MinIO / GCS sub-config is handed to it
+   * @param prefix path prefix passed through to the created client
+   * @return the client for the configured type, or {@code null} when the type matches none of the
+   *         cases handled here
+   */
+  public static DocumentStoreClient create(final CloudStorageConfigs cloudStorageConfigs, final Path prefix) {
+    return switch (cloudStorageConfigs.getType()) {
+      case S3 -> S3DocumentStoreClient.s3(cloudStorageConfigs.getS3Config(), prefix);
+      case MINIO -> S3DocumentStoreClient.minio(cloudStorageConfigs.getMinioConfig(), prefix);
+      case GCS -> GcsDocumentStoreClient.create(cloudStorageConfigs.getGcsConfig(), prefix);
+      // mirrors the fall-through of the statement form: an unhandled type yields no client
+      default -> null;
+    };
+  }
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java b/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java
deleted file mode 100644
index 2da164ec4008..000000000000
--- a/airbyte-workers/src/main/java/io/airbyte/workers/storage/WorkerStore.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.workers.storage;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.commons.json.Jsons;
-import java.util.Optional;
-import java.util.UUID;
-
-/**
- * JSON layer over { @link CloudDocumentStore }.
- */
-public class WorkerStore {
-
- private final DocumentStoreClient documentStoreClient;
-
- public WorkerStore(final DocumentStoreClient documentStoreClient) {
- this.documentStoreClient = documentStoreClient;
- }
-
- /**
- * Set the document for an id. Overwrites existing document, if present.
- *
- * @param id - id to associate document with
- * @param document - document to persist
- */
- void set(final UUID id, final JsonNode document) {
- documentStoreClient.write(id.toString(), Jsons.serialize(document));
- }
-
- /**
- * Fetch previously persisted document.
- *
- * @param id - id that the document is associated with
- * @return returns document if present, otherwise empty
- */
- Optional get(final UUID id) {
- return documentStoreClient.read(id.toString()).map(Jsons::deserialize);
- }
-
- /**
- * Delete persisted document.
- *
- * @param id - id that the document is associated with
- * @return true if actually deletes something, otherwise false. (e.g. false if document doest not
- * exist).
- */
- boolean delete(final UUID id) {
- return documentStoreClient.delete(id.toString());
- }
-
-}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java
index 4a3eb9eb9efb..46b73467a6d6 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java
@@ -4,134 +4,35 @@
package io.airbyte.workers.temporal.sync;
-import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.logging.LoggingHelper;
-import io.airbyte.commons.logging.MdcScope;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
-import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
-import io.airbyte.workers.WorkerException;
-import io.airbyte.workers.WorkerUtils;
-import io.airbyte.workers.process.KubeProcessFactory;
-import io.airbyte.workers.process.ProcessFactory;
-import java.nio.file.Path;
import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-// todo: DRY the launchers
-public class DbtLauncherWorker implements Worker {
+public class DbtLauncherWorker extends LauncherWorker {
- private static final Logger LOGGER = LoggerFactory.getLogger(DbtLauncherWorker.class);
-
- private static final MdcScope.Builder LOG_MDC_BUILDER = new MdcScope.Builder()
- .setLogPrefix("dbt-orchestrator")
- .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND);
-
- public static final String DBT = "dbt";
+ public static final String DBT = "dbt-orchestrator";
+ private static final String POD_NAME_PREFIX = "orchestrator-dbt";
public static final String INIT_FILE_DESTINATION_LAUNCHER_CONFIG = "destinationLauncherConfig.json";
- private final WorkerConfigs workerConfigs;
- private final ProcessFactory processFactory;
- private final String airbyteVersion;
- private final Path workspaceRoot;
- private final IntegrationLauncherConfig destinationLauncherConfig;
- private final JobRunConfig jobRunConfig;
-
- private final AtomicBoolean cancelled = new AtomicBoolean(false);
-
- private Process process;
-
- public DbtLauncherWorker(
- final Path workspaceRoot,
- final IntegrationLauncherConfig destinationLauncherConfig,
+ public DbtLauncherWorker(final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final WorkerConfigs workerConfigs,
- final ProcessFactory processFactory,
+ final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig,
final String airbyteVersion) {
- this.workspaceRoot = workspaceRoot;
- this.destinationLauncherConfig = destinationLauncherConfig;
- this.jobRunConfig = jobRunConfig;
- this.workerConfigs = workerConfigs;
- this.processFactory = processFactory;
- this.airbyteVersion = airbyteVersion;
- }
-
- @Override
- public Void run(OperatorDbtInput operatorDbtInput, Path jobRoot) throws WorkerException {
- try {
- final Path jobPath = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
-
- // we want to filter down to remove secrets, so we aren't writing over a bunch of unnecessary
- // secrets
- final Map envMap = System.getenv().entrySet().stream()
- .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- final Map fileMap = Map.of(
- OrchestratorConstants.INIT_FILE_APPLICATION, DBT,
- OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig),
- OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(operatorDbtInput),
- OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap),
- INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig));
-
- process = processFactory.create(
- "runner-" + UUID.randomUUID().toString().substring(0, 10),
- 0,
- jobPath,
- "airbyte/container-orchestrator:" + airbyteVersion,
- false,
- fileMap,
- null,
- workerConfigs.getResourceRequirements(),
- Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_RUNNER),
- Map.of(
- WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT,
- OrchestratorConstants.PORT1, OrchestratorConstants.PORT1,
- OrchestratorConstants.PORT2, OrchestratorConstants.PORT2,
- OrchestratorConstants.PORT3, OrchestratorConstants.PORT3,
- OrchestratorConstants.PORT4, OrchestratorConstants.PORT4));
-
- LineGobbler.gobble(process.getInputStream(), LOGGER::info, LOG_MDC_BUILDER);
- LineGobbler.gobble(process.getErrorStream(), LOGGER::error, LOG_MDC_BUILDER);
-
- WorkerUtils.wait(process);
-
- if (process.exitValue() != 0) {
- throw new WorkerException("Non-zero exit code!");
- }
- } catch (Exception e) {
- if (cancelled.get()) {
- throw new WorkerException("Sync was cancelled.", e);
- } else {
- throw new WorkerException("Running the sync attempt failed", e);
- }
- }
-
- return null;
- }
-
- @Override
- public void cancel() {
- cancelled.set(true);
-
- if (process == null) {
- return;
- }
-
- LOGGER.debug("Closing dbt launcher process");
- WorkerUtils.gentleClose(workerConfigs, process, 1, TimeUnit.MINUTES);
- if (process.isAlive() || process.exitValue() != 0) {
- LOGGER.error("Dbt launcher process wasn't successful");
- }
+ super(
+ DBT,
+ POD_NAME_PREFIX,
+ jobRunConfig,
+ Map.of(
+ INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)),
+ containerOrchestratorConfig,
+ airbyteVersion,
+ workerConfigs.getResourceRequirements(),
+ Void.class);
}
}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
index 404eaef20412..425a915c69bb 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
@@ -18,20 +18,20 @@
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DbtTransformationWorker;
import io.airbyte.workers.Worker;
+import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import java.nio.file.Path;
+import java.util.Optional;
import java.util.function.Supplier;
public class DbtTransformationActivityImpl implements DbtTransformationActivity {
- private final boolean containerOrchestratorEnabled;
private final WorkerConfigs workerConfigs;
private final ProcessFactory jobProcessFactory;
- private final ProcessFactory orchestratorProcessFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
@@ -41,11 +41,11 @@ public class DbtTransformationActivityImpl implements DbtTransformationActivity
private final String databasePassword;
private final String databaseUrl;
private final String airbyteVersion;
+ private final Optional containerOrchestratorConfig;
- public DbtTransformationActivityImpl(final boolean containerOrchestratorEnabled,
+ public DbtTransformationActivityImpl(final Optional containerOrchestratorConfig,
final WorkerConfigs workerConfigs,
final ProcessFactory jobProcessFactory,
- final ProcessFactory orchestratorProcessFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
@@ -54,10 +54,9 @@ public DbtTransformationActivityImpl(final boolean containerOrchestratorEnabled,
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
- this.containerOrchestratorEnabled = containerOrchestratorEnabled;
+ this.containerOrchestratorConfig = containerOrchestratorConfig;
this.workerConfigs = workerConfigs;
this.jobProcessFactory = jobProcessFactory;
- this.orchestratorProcessFactory = orchestratorProcessFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = new AirbyteConfigValidator();
@@ -85,7 +84,7 @@ public Void run(final JobRunConfig jobRunConfig,
final CheckedSupplier, Exception> workerFactory;
- if (containerOrchestratorEnabled) {
+ if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(destinationLauncherConfig, jobRunConfig, resourceRequirements);
@@ -122,11 +121,10 @@ private CheckedSupplier, Exception> getContainerL
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig) {
return () -> new DbtLauncherWorker(
- workspaceRoot,
destinationLauncherConfig,
jobRunConfig,
workerConfigs,
- orchestratorProcessFactory,
+ containerOrchestratorConfig.get(),
airbyteVersion);
}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java
new file mode 100644
index 000000000000..7481a5ae5b85
--- /dev/null
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.temporal.sync;
+
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.ResourceRequirements;
+import io.airbyte.scheduler.models.JobRunConfig;
+import io.airbyte.workers.Worker;
+import io.airbyte.workers.WorkerApp;
+import io.airbyte.workers.WorkerException;
+import io.airbyte.workers.process.AsyncKubePodStatus;
+import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
+import io.airbyte.workers.process.KubePodInfo;
+import io.airbyte.workers.process.KubeProcessFactory;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Coordinates configuring and managing the state of an async process. This is tied to the (job_id,
+ * attempt_id) and will attempt to kill off lower attempt ids.
+ *
+ * @param a json-serializable input class for the worker
+ * @param