Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
use Kubernetes watch api for retrieving exit codes (airbytehq#11083)
Browse files Browse the repository at this point in the history
* use kubernetes api for retrieving exit codes

* undelete test

* clean up more status check interval

* fmt

* wip

* clean up

* smarter filtering

* reordering

* exception handling

* better logging for test + speed up acceptance tests temp

* re-enable running on branch

* fix race condition in test

* add log

* trigger build

* trigger build

* re-run tests with everything enabled

* run tests

* run tests

* clean up

* respond to comments

* fix formatting

* fix whitespace

* remove comment

* 10 -> 5

* log exit code error message
  • Loading branch information
jrhizor authored Mar 31, 2022
1 parent b75295a commit 493f0ea
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 239 deletions.
36 changes: 0 additions & 36 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -342,41 +341,6 @@ public interface Configs {
*/
String getJobKubeNamespace();

/**
* Define the interval for checking for a Kubernetes pod status for a worker of an unspecified type.
*
* In seconds if specified by environment variable. Airbyte internal use only.
*/
Duration getDefaultWorkerStatusCheckInterval();

/**
* Define the interval for checking for "get spec" Kubernetes pod statuses.
*
* In seconds if specified by environment variable. Airbyte internal use only.
*/
Duration getSpecWorkerStatusCheckInterval();

/**
* Define the interval for checking for "check connection" Kubernetes pod statuses.
*
* In seconds if specified by environment variable. Airbyte internal use only.
*/
Duration getCheckWorkerStatusCheckInterval();

/**
* Define the interval for checking for "discover" Kubernetes pod statuses.
*
* In seconds if specified by environment variable. Airbyte internal use only.
*/
Duration getDiscoverWorkerStatusCheckInterval();

/**
* Define the interval for checking for "replication" Kubernetes pod statuses.
*
* In seconds if specified by environment variable. Airbyte internal use only.
*/
Duration getReplicationWorkerStatusCheckInterval();

// Logging/Monitoring/Tracking
/**
* Define either S3, Minio or GCS as a logging backend. Kubernetes only. Multiple variables are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -134,12 +133,6 @@ public class EnvConfigs implements Configs {
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

private static final String DEFAULT_WORKER_STATUS_CHECK_INTERVAL = "DEFAULT_WORKER_STATUS_CHECK_INTERVAL";
private static final String SPEC_WORKER_STATUS_CHECK_INTERVAL = "SPEC_WORKER_STATUS_CHECK_INTERVAL";
private static final String CHECK_WORKER_STATUS_CHECK_INTERVAL = "CHECK_WORKER_STATUS_CHECK_INTERVAL";
private static final String DISCOVER_WORKER_STATUS_CHECK_INTERVAL = "DISCOVER_WORKER_STATUS_CHECK_INTERVAL";
private static final String REPLICATION_WORKER_STATUS_CHECK_INTERVAL = "REPLICATION_WORKER_STATUS_CHECK_INTERVAL";

static final String CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST = "CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST";
static final String CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT = "CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT";
static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST";
Expand All @@ -161,12 +154,6 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000;

private static final Duration DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final Duration DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
public static final long DEFAULT_MAX_CHECK_WORKERS = 5;
public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
Expand Down Expand Up @@ -586,46 +573,6 @@ public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

@Override
public Duration getDefaultWorkerStatusCheckInterval() {
return getEnvOrDefault(
DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getSpecWorkerStatusCheckInterval() {
return getEnvOrDefault(
SPEC_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getCheckWorkerStatusCheckInterval() {
return getEnvOrDefault(
CHECK_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getDiscoverWorkerStatusCheckInterval() {
return getEnvOrDefault(
DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getReplicationWorkerStatusCheckInterval() {
return getEnvOrDefault(
REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ public class AcceptanceTests {
@SuppressWarnings("UnstableApiUsage")
@BeforeAll
public static void init() throws URISyntaxException, IOException, InterruptedException {
System.out.println("in init");
if (IS_GKE && !IS_KUBE) {
throw new RuntimeException("KUBE Flag should also be enabled if GKE flag is enabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.airbyte.config.Configs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,7 +25,6 @@ public class WorkerConfigs {
private final String jobBusyboxImage;
private final String jobCurlImage;
private final Map<String, String> envMap;
private final Duration workerStatusCheckInterval;

/**
* Constructs a job-type-agnostic WorkerConfigs. For WorkerConfigs customized for specific
Expand All @@ -47,8 +45,7 @@ public WorkerConfigs(final Configs configs) {
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
configs.getDefaultWorkerStatusCheckInterval());
configs.getJobDefaultEnvMap());
}

/**
Expand All @@ -73,8 +70,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) {
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
configs.getSpecWorkerStatusCheckInterval());
configs.getJobDefaultEnvMap());
}

/**
Expand All @@ -99,8 +95,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) {
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
configs.getCheckWorkerStatusCheckInterval());
configs.getJobDefaultEnvMap());
}

/**
Expand All @@ -125,8 +120,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) {
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
configs.getDiscoverWorkerStatusCheckInterval());
configs.getJobDefaultEnvMap());
}

public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) {
Expand All @@ -144,8 +138,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs)
configs.getJobKubeSocatImage(),
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
configs.getReplicationWorkerStatusCheckInterval());
configs.getJobDefaultEnvMap());
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down Expand Up @@ -188,8 +181,4 @@ public Map<String, String> getEnvMap() {
return envMap;
}

public Duration getWorkerStatusCheckInterval() {
return workerStatusCheckInterval;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private int computeExitValue() {
}

// If the pod does exist, it may be in a terminal (error or completed) state.
final boolean isTerminal = KubePodProcess.isTerminal(pod);
final boolean isTerminal = KubePodResourceHelper.isTerminal(pod);

if (isTerminal) {
// In case the doc store was updated in between when we pulled it and when
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.process;

import com.google.common.collect.MoreCollectors;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

/**
* The exit code watcher uses the Kubernetes watch API, which provides a subscription to events for
* a pod. This subscription has better latency than polling at the expense of keeping a connection
* open with the Kubernetes API server. Since it offers all events, it helps us handle cases like
* where a pod is swept or deleted immediately after running on a Kubernetes cluster (we will still
* be able to retrieve the exit code).
*/
@Slf4j
public class ExitCodeWatcher implements Watcher<Pod> {

private final Consumer<Integer> onExitCode;
private final Consumer<WatcherException> onWatchFailure;
private boolean exitCodeRetrieved = false;

/**
*
* @param onExitCode callback used to store the exit code
* @param onWatchFailure callback that's triggered when the watch fails. should be some failed exit
* code.
*/
public ExitCodeWatcher(final Consumer<Integer> onExitCode, final Consumer<WatcherException> onWatchFailure) {
this.onExitCode = onExitCode;
this.onWatchFailure = onWatchFailure;
}

@Override
public void eventReceived(Action action, Pod resource) {
try {
if (!exitCodeRetrieved && KubePodResourceHelper.isTerminal(resource)) {
final ContainerStatus mainContainerStatus = resource.getStatus().getContainerStatuses()
.stream()
.filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME))
.collect(MoreCollectors.onlyElement());

if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) {
final int exitCode = mainContainerStatus.getState().getTerminated().getExitCode();
log.info("Processing event with exit code " + exitCode + " for pod: " + resource.getMetadata().getName());
onExitCode.accept(exitCode);
exitCodeRetrieved = true;
}
}
} catch (Exception e) {
String podName = "<unknown_name>";
if (resource.getMetadata() != null) {
podName = resource.getMetadata().getName();
}

log.error("ExitCodeWatcher event handling failed for pod: " + podName, e);
}
}

@Override
public void onClose(WatcherException cause) {
onWatchFailure.accept(cause);
}

}
Loading

0 comments on commit 493f0ea

Please sign in to comment.