diff --git a/.env b/.env index fa0527b81960..8b114e7b0bb1 100644 --- a/.env +++ b/.env @@ -40,6 +40,7 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local # Issue: https://github.com/airbytehq/airbyte/issues/577 HACK_LOCAL_ROOT_PARENT=/tmp +# Maximum simultaneous jobs SUBMITTER_NUM_THREADS=10 # Miscellaneous diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index b3babb14afab..7fd55d1088f9 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -456,6 +456,9 @@ jobs: - role: worker EOF + - name: Check Docker System Info + run: docker system info + - name: KIND Kubernetes Cluster Setup uses: helm/kind-action@v1.2.0 with: @@ -475,13 +478,18 @@ jobs: AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }} GOOGLE_CLOUD_STORAGE_TEST_CREDS: ${{ secrets.GOOGLE_CLOUD_STORAGE_TEST_CREDS }} + - name: Show Disk Usage + run: | + df -h + docker system df + - name: Run Kubernetes End-to-End Acceptance Tests env: USER: root HOME: /home/runner AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }} run: | - IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh + CI=true IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh # In case of self-hosted EC2 errors, remove this block. stop-kube-acceptance-test-runner: name: Stop Kube Acceptance Test EC2 Runner diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 5e69b2dffc82..48420dd4f2cc 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -82,6 +82,8 @@ public interface Configs { List getWorkerPodTolerations(); + MaxWorkersConfig getMaxWorkers(); + String getTemporalHost(); Set getTemporalWorkerPorts(); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index a0d61ef5ef60..f87b47d631b8 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -72,6 +72,10 @@ public class EnvConfigs implements Configs { private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB"; + public static final String MAX_SPEC_WORKERS = "MAX_SPEC_WORKERS"; + public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS"; + public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS"; + public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS"; private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE"; @@ -87,6 +91,11 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60; private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000; + public static final long DEFAULT_MAX_SPEC_WORKERS = 5; + public static final long DEFAULT_MAX_CHECK_WORKERS = 5; + public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5; + public static final long DEFAULT_MAX_SYNC_WORKERS = 5; + public static final String DEFAULT_NETWORK = "host"; private final Function getEnv; @@ -292,6 +301,15 @@ public List getWorkerPodTolerations() { .collect(Collectors.toList()); } + @Override + public MaxWorkersConfig getMaxWorkers() { + return new MaxWorkersConfig( + Math.toIntExact(getEnvOrDefault(MAX_SPEC_WORKERS, DEFAULT_MAX_SPEC_WORKERS)), + Math.toIntExact(getEnvOrDefault(MAX_CHECK_WORKERS, DEFAULT_MAX_CHECK_WORKERS)), + Math.toIntExact(getEnvOrDefault(MAX_DISCOVER_WORKERS, DEFAULT_MAX_DISCOVER_WORKERS)), + Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS))); + } + @Override public String getTemporalHost() { return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233"); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/MaxWorkersConfig.java b/airbyte-config/models/src/main/java/io/airbyte/config/MaxWorkersConfig.java new file mode 100644 index 000000000000..53ba51fb56f3 --- /dev/null +++ b/airbyte-config/models/src/main/java/io/airbyte/config/MaxWorkersConfig.java @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config; + +public class MaxWorkersConfig { + + private final int maxSpecWorkers; + private final int maxCheckWorkers; + private final int maxDiscoverWorkers; + private final int maxSyncWorkers; + + public MaxWorkersConfig(final int maxSpecWorkers, final int maxCheckWorkers, final int maxDiscoverWorkers, final int maxSyncWorkers) { + this.maxSpecWorkers = maxSpecWorkers; + this.maxCheckWorkers = maxCheckWorkers; + this.maxDiscoverWorkers = maxDiscoverWorkers; + this.maxSyncWorkers = maxSyncWorkers; + } + + public int getMaxSpecWorkers() { + return maxSpecWorkers; + } + + public int getMaxCheckWorkers() { + return maxCheckWorkers; + } + + public int getMaxDiscoverWorkers() { + return maxDiscoverWorkers; + } + + public int getMaxSyncWorkers() { + return maxSyncWorkers; + } + +} diff --git a/airbyte-scheduler/app/Dockerfile b/airbyte-scheduler/app/Dockerfile index 57b3faa0ff7d..011d4c51e816 100644 --- a/airbyte-scheduler/app/Dockerfile +++ b/airbyte-scheduler/app/Dockerfile @@ -1,32 +1,10 @@ FROM openjdk:14.0.2-slim AS scheduler - -# Install Docker to launch worker images. Eventually should be replaced with Docker-java. -# See https://gitter.im/docker-java/docker-java?at=5f3eb87ba8c1780176603f4e for more information on why we are not currently using Docker-java -RUN apt-get update && apt-get install -y \ - apt-transport-https \ - ca-certificates \ - curl \ - gnupg-agent \ - software-properties-common -RUN curl -fsSL https://download.docker.com/linux/debian/gpg | apt-key add - -RUN add-apt-repository \ - "deb [arch=amd64] https://download.docker.com/linux/debian \ - $(lsb_release -cs) \ - stable" -RUN apt-get update && apt-get install -y docker-ce-cli jq - ENV APPLICATION airbyte-scheduler WORKDIR /app -# Install kubectl -RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.14/bin/linux/amd64/kubectl -RUN chmod +x ./kubectl -RUN mv ./kubectl /usr/local/bin - -# Move and run scheduler -COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar +COPY build/distributions/${APPLICATION}-0*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index c8b795325ed0..d77912d5abcf 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -27,9 +27,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClientSingleton; -import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.invoker.ApiException; -import io.airbyte.api.client.model.HealthCheckRead; import io.airbyte.commons.concurrency.GracefulShutdownHandler; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; @@ -49,21 +46,9 @@ import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; -import io.airbyte.workers.process.DockerProcessFactory; -import io.airbyte.workers.process.KubePortManagerSingleton; -import io.airbyte.workers.process.KubeProcessFactory; -import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.process.WorkerHeartbeatServer; +import io.airbyte.workers.WorkerApp; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalPool; -import io.airbyte.workers.temporal.TemporalUtils; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.util.Config; -import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; -import java.net.InetAddress; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; @@ -92,43 +77,33 @@ public class SchedulerApp { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; - private static int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); + private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5); private static final Duration CLEANING_DELAY = Duration.ofHours(2); private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build(); - private static final int KUBE_HEARTBEAT_PORT = 9000; private final Path workspaceRoot; - private final ProcessFactory processFactory; private final JobPersistence jobPersistence; private final ConfigRepository configRepository; private final JobCleaner jobCleaner; private final JobNotifier jobNotifier; private final TemporalClient temporalClient; - private final WorkflowServiceStubs temporalService; public SchedulerApp(Path workspaceRoot, - ProcessFactory processFactory, JobPersistence jobPersistence, ConfigRepository configRepository, JobCleaner jobCleaner, JobNotifier jobNotifier, - TemporalClient temporalClient, - WorkflowServiceStubs temporalService) { + TemporalClient temporalClient) { this.workspaceRoot = workspaceRoot; - this.processFactory = processFactory; this.jobPersistence = jobPersistence; this.configRepository = configRepository; this.jobCleaner = jobCleaner; this.jobNotifier = jobNotifier; this.temporalClient = temporalClient; - this.temporalService = temporalService; } public void start() throws IOException { - final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory); - temporalPool.run(); - final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); @@ -178,42 +153,6 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi } } - private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException { - if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { - final ApiClient officialClient = Config.defaultClient(); - final KubernetesClient fabricClient = new DefaultKubernetesClient(); - final String localIp = InetAddress.getLocalHost().getHostAddress(); - final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; - LOGGER.info("Using Kubernetes namespace: {}", configs.getKubeNamespace()); - return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl); - } else { - return new DockerProcessFactory( - configs.getWorkspaceRoot(), - configs.getWorkspaceDockerMount(), - configs.getLocalDockerMount(), - configs.getDockerNetwork()); - } - } - - private static void waitForServer(Configs configs) throws InterruptedException { - final AirbyteApiClient apiClient = new AirbyteApiClient( - new io.airbyte.api.client.invoker.ApiClient().setScheme("http") - .setHost(configs.getAirbyteApiHost()) - .setPort(configs.getAirbyteApiPort()) - .setBasePath("/api")); - - boolean isHealthy = false; - while (!isHealthy) { - try { - HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck(); - isHealthy = healthCheck.getDb(); - } catch (ApiException e) { - LOGGER.info("Waiting for server to become available..."); - Thread.sleep(2000); - } - } - } - public static void main(String[] args) throws IOException, InterruptedException { final Configs configs = new EnvConfigs(); @@ -227,7 +166,7 @@ public static void main(String[] args) throws IOException, InterruptedException LOGGER.info("temporalHost = " + temporalHost); // Wait for the server to initialize the database and run migration - waitForServer(configs); + WorkerApp.waitForServer(configs); LOGGER.info("Creating Job DB connection pool..."); final Database jobDatabase = new JobsDatabaseInstance( @@ -236,8 +175,6 @@ public static void main(String[] args) throws IOException, InterruptedException configs.getDatabaseUrl()) .getInitialized(); - final ProcessFactory processFactory = getProcessBuilderFactory(configs); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final Database configDatabase = new ConfigsDatabaseInstance( configs.getConfigDatabaseUser(), @@ -252,26 +189,6 @@ public static void main(String[] args) throws IOException, InterruptedException jobPersistence); final JobNotifier jobNotifier = new JobNotifier(configs.getWebappUrl(), configRepository, new WorkspaceHelper(configRepository, jobPersistence)); - if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { - var supportedWorkers = KubePortManagerSingleton.getSupportedWorkers(); - if (supportedWorkers < SUBMITTER_NUM_THREADS) { - LOGGER.warn("{} workers configured with only {} ports available. Insufficient ports. Setting workers to {}.", SUBMITTER_NUM_THREADS, - KubePortManagerSingleton.getNumAvailablePorts(), supportedWorkers); - SUBMITTER_NUM_THREADS = supportedWorkers; - } - - Map mdc = MDC.getCopyOfContextMap(); - Executors.newSingleThreadExecutor().submit( - () -> { - MDC.setContextMap(mdc); - try { - new WorkerHeartbeatServer(KUBE_HEARTBEAT_PORT).start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get()); TrackingClientSingleton.initialize( @@ -281,11 +198,10 @@ public static void main(String[] args) throws IOException, InterruptedException configs.getAirbyteVersion(), configRepository); - final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot); LOGGER.info("Launching scheduler..."); - new SchedulerApp(workspaceRoot, processFactory, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient, temporalService) + new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient) .start(); } diff --git a/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java b/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java index 7b9648066690..2d25a0e28385 100644 --- a/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java +++ b/airbyte-tests/src/main/java/io/airbyte/test/airbyte_test_container/AirbyteTestContainer.java @@ -88,6 +88,7 @@ public void start() throws IOException, InterruptedException { serviceLogConsumer(dockerComposeContainer, "scheduler"); serviceLogConsumer(dockerComposeContainer, "server"); serviceLogConsumer(dockerComposeContainer, "webapp"); + serviceLogConsumer(dockerComposeContainer, "worker"); serviceLogConsumer(dockerComposeContainer, "airbyte-temporal"); dockerComposeContainer.start(); diff --git a/airbyte-workers/Dockerfile b/airbyte-workers/Dockerfile new file mode 100644 index 000000000000..86fc3f1ecec8 --- /dev/null +++ b/airbyte-workers/Dockerfile @@ -0,0 +1,33 @@ +FROM openjdk:14.0.2-slim AS worker + +# Install Docker to launch worker images. Eventually should be replaced with Docker-java. +# See https://gitter.im/docker-java/docker-java?at=5f3eb87ba8c1780176603f4e for more information on why we are not currently using Docker-java +RUN apt-get update && apt-get install -y \ + apt-transport-https \ + ca-certificates \ + curl \ + gnupg-agent \ + software-properties-common +RUN curl -fsSL https://download.docker.com/linux/debian/gpg | apt-key add - +RUN add-apt-repository \ + "deb [arch=amd64] https://download.docker.com/linux/debian \ + $(lsb_release -cs) \ + stable" +RUN apt-get update && apt-get install -y docker-ce-cli jq + +ENV APPLICATION airbyte-workers + +WORKDIR /app + +# Install kubectl +RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.14/bin/linux/amd64/kubectl +RUN chmod +x ./kubectl +RUN mv ./kubectl /usr/local/bin + +# Move and run worker +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +# wait for upstream dependencies to become available before starting server +ENTRYPOINT ["/bin/bash", "-c", "bin/${APPLICATION}"] diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 32f37bd3ea60..7fda83d72c4b 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -1,7 +1,7 @@ import org.jsonschema2pojo.SourceType plugins { - id 'java-library' + id 'application' id 'com.github.eirnym.js2p' version '1.0' id 'airbyte-integration-test-java' } @@ -22,6 +22,8 @@ dependencies { implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723' implementation 'org.eclipse.jetty:jetty-servlet:9.4.31.v20200723' + + implementation project(':airbyte-api') implementation project(':airbyte-config:models') implementation project(':airbyte-db:lib') implementation project(':airbyte-json-validation') @@ -50,3 +52,9 @@ jsonSchema2Pojo { includeConstructors = false includeSetters = true } + +mainClassName = 'io.airbyte.workers.WorkerApp' + +application { + mainClass = mainClassName +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java new file mode 100644 index 000000000000..b3b2637ab580 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -0,0 +1,181 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers; + +import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.invoker.ApiException; +import io.airbyte.api.client.model.HealthCheckRead; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; +import io.airbyte.config.MaxWorkersConfig; +import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.workers.process.DockerProcessFactory; +import io.airbyte.workers.process.KubeProcessFactory; +import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.process.WorkerHeartbeatServer; +import io.airbyte.workers.temporal.CheckConnectionWorkflow; +import io.airbyte.workers.temporal.DiscoverCatalogWorkflow; +import io.airbyte.workers.temporal.SpecWorkflow; +import io.airbyte.workers.temporal.SyncWorkflow; +import io.airbyte.workers.temporal.TemporalJobType; +import io.airbyte.workers.temporal.TemporalUtils; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.util.Config; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerOptions; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class WorkerApp { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerApp.class); + public static final int KUBE_HEARTBEAT_PORT = 9000; + + private final Path workspaceRoot; + private final ProcessFactory processFactory; + private final WorkflowServiceStubs temporalService; + private final MaxWorkersConfig maxWorkers; + + public WorkerApp(Path workspaceRoot, + ProcessFactory processFactory, + WorkflowServiceStubs temporalService, + MaxWorkersConfig maxWorkers) { + this.workspaceRoot = workspaceRoot; + this.processFactory = processFactory; + this.temporalService = temporalService; + this.maxWorkers = maxWorkers; + } + + public void start() { + Map mdc = MDC.getCopyOfContextMap(); + Executors.newSingleThreadExecutor().submit( + () -> { + MDC.setContextMap(mdc); + try { + new WorkerHeartbeatServer(KUBE_HEARTBEAT_PORT).start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); + + final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); + specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class); + specWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(processFactory, workspaceRoot)); + + final Worker checkConnectionWorker = + factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); + checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class); + checkConnectionWorker.registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, workspaceRoot)); + + final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); + discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class); + discoverWorker.registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, workspaceRoot)); + + final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); + syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); + syncWorker.registerActivitiesImplementations( + new SyncWorkflow.ReplicationActivityImpl(processFactory, workspaceRoot), + new SyncWorkflow.NormalizationActivityImpl(processFactory, workspaceRoot), + new SyncWorkflow.DbtTransformationActivityImpl(processFactory, workspaceRoot)); + + factory.start(); + } + + private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException { + if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { + final ApiClient officialClient = Config.defaultClient(); + final KubernetesClient fabricClient = new DefaultKubernetesClient(); + final String localIp = InetAddress.getLocalHost().getHostAddress(); + final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; + LOGGER.info("Using Kubernetes namespace: {}", configs.getKubeNamespace()); + return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl); + } else { + return new DockerProcessFactory( + configs.getWorkspaceRoot(), + configs.getWorkspaceDockerMount(), + configs.getLocalDockerMount(), + configs.getDockerNetwork()); + } + } + + public static void waitForServer(Configs configs) throws InterruptedException { + final AirbyteApiClient apiClient = new AirbyteApiClient( + new io.airbyte.api.client.invoker.ApiClient().setScheme("http") + .setHost(configs.getAirbyteApiHost()) + .setPort(configs.getAirbyteApiPort()) + .setBasePath("/api")); + + boolean isHealthy = false; + while (!isHealthy) { + try { + HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck(); + isHealthy = healthCheck.getDb(); + } catch (ApiException e) { + LOGGER.info("Waiting for server to become available..."); + Thread.sleep(2000); + } + } + } + + private static final WorkerOptions getWorkerOptions(int max) { + return WorkerOptions.newBuilder() + .setMaxConcurrentActivityExecutionSize(max) + .build(); + } + + public static void main(String[] args) throws IOException, InterruptedException { + final Configs configs = new EnvConfigs(); + + waitForServer(configs); + + LogClientSingleton.setWorkspaceMdc(LogClientSingleton.getSchedulerLogsRoot(configs)); + + final Path workspaceRoot = configs.getWorkspaceRoot(); + LOGGER.info("workspaceRoot = " + workspaceRoot); + + final String temporalHost = configs.getTemporalHost(); + LOGGER.info("temporalHost = " + temporalHost); + + final ProcessFactory processFactory = getProcessBuilderFactory(configs); + + final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + + new WorkerApp(workspaceRoot, processFactory, temporalService, configs.getMaxWorkers()).start(); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java deleted file mode 100644 index c8c28282419c..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.workers.temporal; - -import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.SyncWorkflow.DbtTransformationActivityImpl; -import io.airbyte.workers.temporal.SyncWorkflow.NormalizationActivityImpl; -import io.airbyte.workers.temporal.SyncWorkflow.ReplicationActivityImpl; -import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; -import io.temporal.worker.Worker; -import io.temporal.worker.WorkerFactory; -import java.nio.file.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TemporalPool implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class); - - private final WorkflowServiceStubs temporalService; - private final Path workspaceRoot; - private final ProcessFactory processFactory; - - public TemporalPool(WorkflowServiceStubs temporalService, Path workspaceRoot, ProcessFactory processFactory) { - this.temporalService = temporalService; - this.workspaceRoot = workspaceRoot; - this.processFactory = processFactory; - } - - @Override - public void run() { - - final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); - - final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name()); - specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class); - specWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(processFactory, workspaceRoot)); - - final Worker checkConnectionWorker = factory.newWorker(TemporalJobType.CHECK_CONNECTION.name()); - checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class); - checkConnectionWorker.registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, workspaceRoot)); - - final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name()); - discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class); - discoverWorker.registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, workspaceRoot)); - - final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name()); - syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); - syncWorker.registerActivitiesImplementations( - new ReplicationActivityImpl(processFactory, workspaceRoot), - new NormalizationActivityImpl(processFactory, workspaceRoot), - new DbtTransformationActivityImpl(processFactory, workspaceRoot)); - - factory.start(); - } - -} diff --git a/build.gradle b/build.gradle index 78f14f4374e6..3226cc06136c 100644 --- a/build.gradle +++ b/build.gradle @@ -117,6 +117,7 @@ allprojects { ":airbyte-db:lib", ":airbyte-migration", ":airbyte-scheduler:app", + ":airbyte-workers", ":airbyte-server", ":airbyte-webapp", ].toSet().asImmutable() diff --git a/docker-compose.build.yaml b/docker-compose.build.yaml index a19df0cc9c92..bce29f9bbb07 100644 --- a/docker-compose.build.yaml +++ b/docker-compose.build.yaml @@ -22,6 +22,13 @@ services: context: airbyte-scheduler/app labels: io.airbyte.git-revision: ${GIT_REVISION} + worker: + image: airbyte/worker:${VERSION} + build: + dockerfile: Dockerfile + context: airbyte-workers + labels: + io.airbyte.git-revision: ${GIT_REVISION} server: image: airbyte/server:${VERSION} build: diff --git a/docker-compose.yaml b/docker-compose.yaml index c5a633e0d0db..367d481d065f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -33,6 +33,48 @@ services: logging: *default-logging container_name: airbyte-scheduler restart: unless-stopped + environment: + - WEBAPP_URL=${WEBAPP_URL} + - DATABASE_USER=${DATABASE_USER} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_URL=${DATABASE_URL} + - CONFIG_DATABASE_USER=${CONFIG_DATABASE_USER:-} + - CONFIG_DATABASE_PASSWORD=${CONFIG_DATABASE_PASSWORD:-} + - CONFIG_DATABASE_URL=${CONFIG_DATABASE_URL:-} + - RUN_DATABASE_MIGRATION_ON_STARTUP=${RUN_DATABASE_MIGRATION_ON_STARTUP} + - WORKSPACE_ROOT=${WORKSPACE_ROOT} + - WORKSPACE_DOCKER_MOUNT=${WORKSPACE_DOCKER_MOUNT} + - LOCAL_ROOT=${LOCAL_ROOT} + - LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT} + - CONFIG_ROOT=${CONFIG_ROOT} + - TRACKING_STRATEGY=${TRACKING_STRATEGY} + - AIRBYTE_VERSION=${VERSION} + - AIRBYTE_ROLE=${AIRBYTE_ROLE:-} + - TEMPORAL_HOST=${TEMPORAL_HOST} + - WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT} + - S3_LOG_BUCKET=${S3_LOG_BUCKET} + - S3_LOG_BUCKET_REGION=${S3_LOG_BUCKET_REGION} + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - GCP_STORAGE_BUCKET=${GCP_STORAGE_BUCKET} + - LOG_LEVEL=${LOG_LEVEL} + - SUBMITTER_NUM_THREADS=${SUBMITTER_NUM_THREADS} + - RESOURCE_CPU_REQUEST=${RESOURCE_CPU_REQUEST} + - RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT} + - RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST} + - RESOURCE_MEMORY_LIMIT=${RESOURCE_MEMORY_LIMIT} + - MAX_SYNC_JOB_ATTEMPTS=${MAX_SYNC_JOB_ATTEMPTS} + - MAX_SYNC_TIMEOUT_DAYS=${MAX_SYNC_TIMEOUT_DAYS} + - INTERNAL_API_HOST=${INTERNAL_API_HOST} + volumes: + - workspace:${WORKSPACE_ROOT} + - ${LOCAL_ROOT}:${LOCAL_ROOT} + - data:${CONFIG_ROOT} + worker: + image: airbyte/worker:${VERSION} + logging: *default-logging + container_name: airbyte-worker + restart: unless-stopped environment: - WEBAPP_URL=${WEBAPP_URL} - DATABASE_USER=${DATABASE_USER} diff --git a/docs/deploying-airbyte/on-kubernetes.md b/docs/deploying-airbyte/on-kubernetes.md index 4b4a0e15b3c3..ec8013391555 100644 --- a/docs/deploying-airbyte/on-kubernetes.md +++ b/docs/deploying-airbyte/on-kubernetes.md @@ -145,6 +145,14 @@ Now visit [http://localhost:8000](http://localhost:8000) in your browser and sta * Volume sizes * You can modify `kube/resources/volume-*` files to specify different volume sizes for the persistent volumes backing Airbyte. +### Increasing job parallelism + +The number of simultaneous jobs (getting specs, checking connections, discovering schemas, and performing syncs) is limited by a few factors. First of all, the `SUBMITTER_NUM_THREADS` (set in the `.env` file for your Kustimization overlay) provides a global limit on the number of simultaneous jobs that can run across all worker pods. + +The number of worker pods can be changed by increasing the number of replicas for the `airbyte-worker` deployment. An example of a Kustomization patch that increases this number can be seen in `airbyte/kube/overlays/dev-integration-test/kustomization.yaml` and `airbyte/kube/overlays/dev-integration-test/parallelize-worker.yaml`. The number of simultaneous jobs on a specific worker pod is also limited by the number of ports exposed by the worker deployment and set by `TEMPORAL_WORKER_PORTS` in your `.env` file. Without additional ports used to communicate to connector pods, jobs will start to run but will hang until ports become available. + +You can also tune environment variables for the max simultaneous job types that can run on the worker pod by setting `MAX_SPEC_WORKERS`, `MAX_CHECK_WORKERS`, `MAX_DISCOVER_WORKERS`, `MAX_SYNC_WORKERS` for the worker pod deployment (not in the `.env` file). These values can be used if you want to create separate worker deployments for separate types of workers with different resource allocations. + ### Cloud logging Airbyte writes logs to two directories. App logs, including server and scheduler logs, are written to the `app-logging` directory. diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 2966a14dc6f2..61d64f2c9109 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -24,6 +24,7 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +# Maximum total simultaneous jobs across all worker nodes SUBMITTER_NUM_THREADS=10 # Miscellaneous diff --git a/kube/overlays/dev-integration-test/kustomization.yaml b/kube/overlays/dev-integration-test/kustomization.yaml index 8e0484c73a35..76c0dc98688c 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -15,6 +15,8 @@ images: newTag: dev - name: airbyte/webapp newTag: dev + - name: airbyte/worker + newTag: dev - name: temporalio/auto-setup newTag: 1.7.0 @@ -24,3 +26,4 @@ configMapGenerator: patchesStrategicMerge: - pod-antiaffinity.yaml + - parallelize-worker.yaml diff --git a/kube/overlays/dev-integration-test/parallelize-worker.yaml b/kube/overlays/dev-integration-test/parallelize-worker.yaml new file mode 100644 index 000000000000..43884355cd08 --- /dev/null +++ b/kube/overlays/dev-integration-test/parallelize-worker.yaml @@ -0,0 +1,6 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-worker +spec: + replicas: 2 diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 5cc9cfd6d573..899ca13d53ad 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -26,6 +26,7 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +# Maximum total simultaneous jobs across all worker nodes SUBMITTER_NUM_THREADS=10 # Miscellaneous diff --git a/kube/overlays/dev/kustomization.yaml b/kube/overlays/dev/kustomization.yaml index 886fa1311e6f..221220972c28 100644 --- a/kube/overlays/dev/kustomization.yaml +++ b/kube/overlays/dev/kustomization.yaml @@ -15,6 +15,8 @@ images: newTag: dev - name: airbyte/webapp newTag: dev + - name: airbyte/worker + newTag: dev - name: temporalio/auto-setup newTag: 1.7.0 diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 68a17a5bea60..0d03ab670bb4 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -26,6 +26,7 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +# Maximum total simultaneous jobs across all worker nodes SUBMITTER_NUM_THREADS=10 # Miscellaneous diff --git a/kube/overlays/stable-with-resource-limits/kustomization.yaml b/kube/overlays/stable-with-resource-limits/kustomization.yaml index 7a441e8376b5..943ae33f40e2 100644 --- a/kube/overlays/stable-with-resource-limits/kustomization.yaml +++ b/kube/overlays/stable-with-resource-limits/kustomization.yaml @@ -15,6 +15,8 @@ images: newTag: 0.29.15-alpha - name: airbyte/webapp newTag: 0.29.15-alpha + - name: airbyte/worker + newTag: 0.29.15-alpha - name: temporalio/auto-setup newTag: 1.7.0 diff --git a/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml b/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml index 65233ca902e1..2d7047970e02 100644 --- a/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml +++ b/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml @@ -28,6 +28,20 @@ spec: --- apiVersion: apps/v1 kind: Deployment +metadata: + name: airbyte-worker +spec: + template: + spec: + containers: + - name: airbyte-worker-container + resources: + limits: + cpu: 2 + memory: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment metadata: name: airbyte-server spec: diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 68a17a5bea60..0d03ab670bb4 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -26,6 +26,7 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace LOCAL_ROOT=/tmp/airbyte_local +# Maximum total simultaneous jobs across all worker nodes SUBMITTER_NUM_THREADS=10 # Miscellaneous diff --git a/kube/overlays/stable/kustomization.yaml b/kube/overlays/stable/kustomization.yaml index a94ec681fdea..4e1db5555e2b 100644 --- a/kube/overlays/stable/kustomization.yaml +++ b/kube/overlays/stable/kustomization.yaml @@ -15,6 +15,8 @@ images: newTag: 0.29.15-alpha - name: airbyte/webapp newTag: 0.29.15-alpha + - name: airbyte/worker + newTag: 0.29.15-alpha - name: temporalio/auto-setup newTag: 1.7.0 diff --git a/kube/resources/kustomization.yaml b/kube/resources/kustomization.yaml index f6e422853a69..f83707d3f39c 100644 --- a/kube/resources/kustomization.yaml +++ b/kube/resources/kustomization.yaml @@ -13,3 +13,4 @@ resources: - volume-configs.yaml - volume-db.yaml - webapp.yaml + - worker.yaml diff --git a/kube/resources/scheduler.yaml b/kube/resources/scheduler.yaml index e7fa1664668c..2a5b833bce78 100644 --- a/kube/resources/scheduler.yaml +++ b/kube/resources/scheduler.yaml @@ -12,8 +12,6 @@ spec: labels: airbyte: scheduler spec: - serviceAccountName: airbyte-admin - automountServiceAccountToken: true containers: - name: airbyte-scheduler-container image: airbyte/scheduler @@ -169,38 +167,6 @@ spec: configMapKeyRef: name: airbyte-env key: INTERNAL_API_HOST - ports: - - containerPort: 9000 # for heartbeat server - - containerPort: 9001 # start temporal worker port pool - - containerPort: 9002 - - containerPort: 9003 - - containerPort: 9004 - - containerPort: 9005 - - containerPort: 9006 - - containerPort: 9007 - - containerPort: 9008 - - containerPort: 9009 - - containerPort: 9010 - - containerPort: 9011 - - containerPort: 9012 - - containerPort: 9013 - - containerPort: 9014 - - containerPort: 9015 - - containerPort: 9016 - - containerPort: 9017 - - containerPort: 9018 - - containerPort: 9019 - - containerPort: 9020 - - containerPort: 9021 - - containerPort: 9022 - - containerPort: 9023 - - containerPort: 9024 - - containerPort: 9025 - - containerPort: 9026 - - containerPort: 9027 - - containerPort: 9028 - - containerPort: 9029 - - containerPort: 9030 # end temporal worker port pool volumeMounts: - name: gcs-log-creds-volume mountPath: /secrets/gcs-log-creds diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml new file mode 100644 index 000000000000..afd2f1fba82a --- /dev/null +++ b/kube/resources/worker.yaml @@ -0,0 +1,211 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-worker +spec: + replicas: 1 + selector: + matchLabels: + airbyte: worker + template: + metadata: + labels: + airbyte: worker + spec: + serviceAccountName: airbyte-admin + automountServiceAccountToken: true + containers: + - name: airbyte-worker-container + image: airbyte/worker + env: + - name: AIRBYTE_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AIRBYTE_VERSION + - name: CONFIG_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONFIG_ROOT + - name: DATABASE_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_HOST + - name: DATABASE_PORT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_PORT + - name: DATABASE_PASSWORD + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_PASSWORD + - name: DATABASE_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_URL + - name: DATABASE_USER + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_USER + - name: TRACKING_STRATEGY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TRACKING_STRATEGY + - name: WORKSPACE_DOCKER_MOUNT + value: workspace + - name: WORKSPACE_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKSPACE_ROOT + - name: WORKER_ENVIRONMENT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKER_ENVIRONMENT + - name: LOCAL_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOCAL_ROOT + - name: WEBAPP_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WEBAPP_URL + - name: TEMPORAL_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_HOST + - name: TEMPORAL_WORKER_PORTS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_WORKER_PORTS + - name: LOG_LEVEL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOG_LEVEL + - name: KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SUBMITTER_NUM_THREADS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: SUBMITTER_NUM_THREADS + - name: RESOURCE_CPU_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: RESOURCE_CPU_REQUEST + - name: RESOURCE_CPU_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: RESOURCE_CPU_LIMIT + - name: RESOURCE_MEMORY_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: RESOURCE_MEMORY_REQUEST + - name: RESOURCE_MEMORY_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: RESOURCE_MEMORY_LIMIT + - name: S3_LOG_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET + - name: S3_LOG_BUCKET_REGION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET_REGION + - name: AWS_ACCESS_KEY_ID + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AWS_SECRET_ACCESS_KEY + - name: S3_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_MINIO_ENDPOINT + - name: S3_PATH_STYLE_ACCESS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_PATH_STYLE_ACCESS + - name: GOOGLE_APPLICATION_CREDENTIALS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: GOOGLE_APPLICATION_CREDENTIALS + - name: GCP_STORAGE_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: GCP_STORAGE_BUCKET + - name: INTERNAL_API_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: INTERNAL_API_HOST + ports: + - containerPort: 9000 # for heartbeat server + - containerPort: 9001 # start temporal worker port pool + - containerPort: 9002 + - containerPort: 9003 + - containerPort: 9004 + - containerPort: 9005 + - containerPort: 9006 + - containerPort: 9007 + - containerPort: 9008 + - containerPort: 9009 + - containerPort: 9010 + - containerPort: 9011 + - containerPort: 9012 + - containerPort: 9013 + - containerPort: 9014 + - containerPort: 9015 + - containerPort: 9016 + - containerPort: 9017 + - containerPort: 9018 + - containerPort: 9019 + - containerPort: 9020 + - containerPort: 9021 + - containerPort: 9022 + - containerPort: 9023 + - containerPort: 9024 + - containerPort: 9025 + - containerPort: 9026 + - containerPort: 9027 + - containerPort: 9028 + - containerPort: 9029 + - containerPort: 9030 # end temporal worker port pool + volumeMounts: + - name: gcs-log-creds-volume + mountPath: /secrets/gcs-log-creds + readOnly: true + volumes: + - name: gcs-log-creds-volume + secret: + secretName: gcs-log-creds diff --git a/settings.gradle b/settings.gradle index 1520ce2918c9..83239efdff1f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ if(!System.getenv().containsKey("SUB_BUILD")) { } // shared +include ':airbyte-api' include ':airbyte-commons' include ':airbyte-commons-docker' include ':airbyte-json-validation' @@ -38,7 +39,6 @@ include ':airbyte-test-utils' // platform if(!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD") == "PLATFORM") { include ':airbyte-analytics' - include ':airbyte-api' include ':airbyte-cli' include ':airbyte-db:jooq' include ':airbyte-config:init' diff --git a/tools/bin/acceptance_test_kube.sh b/tools/bin/acceptance_test_kube.sh index a91773122df0..9532af93616b 100755 --- a/tools/bin/acceptance_test_kube.sh +++ b/tools/bin/acceptance_test_kube.sh @@ -12,6 +12,7 @@ echo "Loading images into KIND..." kind load docker-image airbyte/server:dev --name chart-testing kind load docker-image airbyte/scheduler:dev --name chart-testing kind load docker-image airbyte/webapp:dev --name chart-testing +kind load docker-image airbyte/worker:dev --name chart-testing kind load docker-image airbyte/db:dev --name chart-testing echo "Starting app..." @@ -55,5 +56,22 @@ kubectl port-forward svc/airbyte-server-svc 8001:8001 & echo "Running worker integration tests..." SUB_BUILD=PLATFORM ./gradlew :airbyte-workers:integrationTest --scan +echo "Printing system disk usage..." +df -h + +echo "Printing docker disk usage..." +docker system df + +if [ -n "$CI" ]; then + echo "Pruning all images..." + docker image prune --all --force + + echo "Printing system disk usage after pruning..." + df -h + + echo "Printing docker disk usage after pruning..." + docker system df +fi + echo "Running e2e tests via gradle..." KUBE=true SUB_BUILD=PLATFORM USE_EXTERNAL_DEPLOYMENT=true ./gradlew :airbyte-tests:acceptanceTests --scan