Skip to content

Commit

Permalink
split scheduler and worker (airbytehq#5737)
Browse files Browse the repository at this point in the history
* docker-compose split of scheduler and worker

* fix heartbeat location bug + add support for kubernetes

* use two workers in integration tests

* capture logs in AirbyteTestContainer

* add waiting

* rename to make it easier to review

* rename module

* fix remaining conflicts

* allow configuring max workers of each type and document usage

* fix build

* remove comment

* add worker resource requiremetns

* try to fix for connector build

* fix regression in biuld

* add env comments for SUBMITTER_NUM_THREADS

* Update airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java

Co-authored-by: Davin Chia <[email protected]>

* Update airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalPool.java

Co-authored-by: Davin Chia <[email protected]>

* merge temporalpool into workerapp

* output docker system info

* move check to before

* remove unnecessary parts of the patch

* could this be the problem? i thought i added this

* show disk usage

* add print statements

* add pruning

* fix prune option

* use force

Co-authored-by: Davin Chia <[email protected]>
  • Loading branch information
jrhizor and davinchia authored Sep 8, 2021
1 parent bbe44bf commit c7d8055
Show file tree
Hide file tree
Showing 31 changed files with 639 additions and 229 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp

# Maximum simultaneous jobs
SUBMITTER_NUM_THREADS=10

# Miscellaneous
Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ jobs:
- role: worker
EOF
- name: Check Docker System Info
run: docker system info

- name: KIND Kubernetes Cluster Setup
uses: helm/[email protected]
with:
Expand All @@ -475,13 +478,18 @@ jobs:
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
GOOGLE_CLOUD_STORAGE_TEST_CREDS: ${{ secrets.GOOGLE_CLOUD_STORAGE_TEST_CREDS }}

- name: Show Disk Usage
run: |
df -h
docker system df
- name: Run Kubernetes End-to-End Acceptance Tests
env:
USER: root
HOME: /home/runner
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
run: |
IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
CI=true IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
# In case of self-hosted EC2 errors, remove this block.
stop-kube-acceptance-test-runner:
name: Stop Kube Acceptance Test EC2 Runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public interface Configs {

List<WorkerPodToleration> getWorkerPodTolerations();

MaxWorkersConfig getMaxWorkers();

String getTemporalHost();

Set<Integer> getTemporalWorkerPorts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public class EnvConfigs implements Configs {
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
public static final String MAX_SPEC_WORKERS = "MAX_SPEC_WORKERS";
public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS";
public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS";
public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE";
Expand All @@ -87,6 +91,11 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
public static final long DEFAULT_MAX_CHECK_WORKERS = 5;
public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
public static final long DEFAULT_MAX_SYNC_WORKERS = 5;

public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;
Expand Down Expand Up @@ -292,6 +301,15 @@ public List<WorkerPodToleration> getWorkerPodTolerations() {
.collect(Collectors.toList());
}

@Override
public MaxWorkersConfig getMaxWorkers() {
return new MaxWorkersConfig(
Math.toIntExact(getEnvOrDefault(MAX_SPEC_WORKERS, DEFAULT_MAX_SPEC_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_CHECK_WORKERS, DEFAULT_MAX_CHECK_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_DISCOVER_WORKERS, DEFAULT_MAX_DISCOVER_WORKERS)),
Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS)));
}

@Override
public String getTemporalHost() {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.config;

public class MaxWorkersConfig {

private final int maxSpecWorkers;
private final int maxCheckWorkers;
private final int maxDiscoverWorkers;
private final int maxSyncWorkers;

public MaxWorkersConfig(final int maxSpecWorkers, final int maxCheckWorkers, final int maxDiscoverWorkers, final int maxSyncWorkers) {
this.maxSpecWorkers = maxSpecWorkers;
this.maxCheckWorkers = maxCheckWorkers;
this.maxDiscoverWorkers = maxDiscoverWorkers;
this.maxSyncWorkers = maxSyncWorkers;
}

public int getMaxSpecWorkers() {
return maxSpecWorkers;
}

public int getMaxCheckWorkers() {
return maxCheckWorkers;
}

public int getMaxDiscoverWorkers() {
return maxDiscoverWorkers;
}

public int getMaxSyncWorkers() {
return maxSyncWorkers;
}

}
24 changes: 1 addition & 23 deletions airbyte-scheduler/app/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,32 +1,10 @@
FROM openjdk:14.0.2-slim AS scheduler


# Install Docker to launch worker images. Eventually should be replaced with Docker-java.
# See https://gitter.im/docker-java/docker-java?at=5f3eb87ba8c1780176603f4e for more information on why we are not currently using Docker-java
RUN apt-get update && apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
RUN curl -fsSL https://download.docker.com/linux/debian/gpg | apt-key add -
RUN add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/debian \
$(lsb_release -cs) \
stable"
RUN apt-get update && apt-get install -y docker-ce-cli jq

ENV APPLICATION airbyte-scheduler

WORKDIR /app

# Install kubectl
RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.14/bin/linux/amd64/kubectl
RUN chmod +x ./kubectl
RUN mv ./kubectl /usr/local/bin

# Move and run scheduler
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
COPY build/distributions/${APPLICATION}-0*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.ApiException;
import io.airbyte.api.client.model.HealthCheckRead;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
Expand All @@ -49,21 +46,9 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
import io.airbyte.workers.temporal.TemporalUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -92,43 +77,33 @@ public class SchedulerApp {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);

private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
private static int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads());
private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads());
private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5);
private static final Duration CLEANING_DELAY = Duration.ofHours(2);
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();
private static final int KUBE_HEARTBEAT_PORT = 9000;

private final Path workspaceRoot;
private final ProcessFactory processFactory;
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final JobCleaner jobCleaner;
private final JobNotifier jobNotifier;
private final TemporalClient temporalClient;
private final WorkflowServiceStubs temporalService;

public SchedulerApp(Path workspaceRoot,
ProcessFactory processFactory,
JobPersistence jobPersistence,
ConfigRepository configRepository,
JobCleaner jobCleaner,
JobNotifier jobNotifier,
TemporalClient temporalClient,
WorkflowServiceStubs temporalService) {
TemporalClient temporalClient) {
this.workspaceRoot = workspaceRoot;
this.processFactory = processFactory;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.jobCleaner = jobCleaner;
this.jobNotifier = jobNotifier;
this.temporalClient = temporalClient;
this.temporalService = temporalService;
}

public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory);
temporalPool.run();

final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
Expand Down Expand Up @@ -178,42 +153,6 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final ApiClient officialClient = Config.defaultClient();
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getKubeNamespace());
return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl);
} else {
return new DockerProcessFactory(
configs.getWorkspaceRoot(),
configs.getWorkspaceDockerMount(),
configs.getLocalDockerMount(),
configs.getDockerNetwork());
}
}

private static void waitForServer(Configs configs) throws InterruptedException {
final AirbyteApiClient apiClient = new AirbyteApiClient(
new io.airbyte.api.client.invoker.ApiClient().setScheme("http")
.setHost(configs.getAirbyteApiHost())
.setPort(configs.getAirbyteApiPort())
.setBasePath("/api"));

boolean isHealthy = false;
while (!isHealthy) {
try {
HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck();
isHealthy = healthCheck.getDb();
} catch (ApiException e) {
LOGGER.info("Waiting for server to become available...");
Thread.sleep(2000);
}
}
}

public static void main(String[] args) throws IOException, InterruptedException {

final Configs configs = new EnvConfigs();
Expand All @@ -227,7 +166,7 @@ public static void main(String[] args) throws IOException, InterruptedException
LOGGER.info("temporalHost = " + temporalHost);

// Wait for the server to initialize the database and run migration
waitForServer(configs);
WorkerApp.waitForServer(configs);

LOGGER.info("Creating Job DB connection pool...");
final Database jobDatabase = new JobsDatabaseInstance(
Expand All @@ -236,8 +175,6 @@ public static void main(String[] args) throws IOException, InterruptedException
configs.getDatabaseUrl())
.getInitialized();

final ProcessFactory processFactory = getProcessBuilderFactory(configs);

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final Database configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
Expand All @@ -252,26 +189,6 @@ public static void main(String[] args) throws IOException, InterruptedException
jobPersistence);
final JobNotifier jobNotifier = new JobNotifier(configs.getWebappUrl(), configRepository, new WorkspaceHelper(configRepository, jobPersistence));

if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
var supportedWorkers = KubePortManagerSingleton.getSupportedWorkers();
if (supportedWorkers < SUBMITTER_NUM_THREADS) {
LOGGER.warn("{} workers configured with only {} ports available. Insufficient ports. Setting workers to {}.", SUBMITTER_NUM_THREADS,
KubePortManagerSingleton.getNumAvailablePorts(), supportedWorkers);
SUBMITTER_NUM_THREADS = supportedWorkers;
}

Map<String, String> mdc = MDC.getCopyOfContextMap();
Executors.newSingleThreadExecutor().submit(
() -> {
MDC.setContextMap(mdc);
try {
new WorkerHeartbeatServer(KUBE_HEARTBEAT_PORT).start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get());

TrackingClientSingleton.initialize(
Expand All @@ -281,11 +198,10 @@ public static void main(String[] args) throws IOException, InterruptedException
configs.getAirbyteVersion(),
configRepository);

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);

LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, processFactory, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient, temporalService)
new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient)
.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void start() throws IOException, InterruptedException {
serviceLogConsumer(dockerComposeContainer, "scheduler");
serviceLogConsumer(dockerComposeContainer, "server");
serviceLogConsumer(dockerComposeContainer, "webapp");
serviceLogConsumer(dockerComposeContainer, "worker");
serviceLogConsumer(dockerComposeContainer, "airbyte-temporal");

dockerComposeContainer.start();
Expand Down
33 changes: 33 additions & 0 deletions airbyte-workers/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
FROM openjdk:14.0.2-slim AS worker

# Install Docker to launch worker images. Eventually should be replaced with Docker-java.
# See https://gitter.im/docker-java/docker-java?at=5f3eb87ba8c1780176603f4e for more information on why we are not currently using Docker-java
RUN apt-get update && apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
RUN curl -fsSL https://download.docker.com/linux/debian/gpg | apt-key add -
RUN add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/debian \
$(lsb_release -cs) \
stable"
RUN apt-get update && apt-get install -y docker-ce-cli jq

ENV APPLICATION airbyte-workers

WORKDIR /app

# Install kubectl
RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.17.14/bin/linux/amd64/kubectl
RUN chmod +x ./kubectl
RUN mv ./kubectl /usr/local/bin

# Move and run worker
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

# wait for upstream dependencies to become available before starting server
ENTRYPOINT ["/bin/bash", "-c", "bin/${APPLICATION}"]
Loading

0 comments on commit c7d8055

Please sign in to comment.