Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
make status checks configurable from env vars + use shorter replicati…
Browse files Browse the repository at this point in the history
…on interval for testing (airbytehq#10368)

* make status check interval env-configurable

* apply to test files to get the speed improvements

* evert "apply to test files to get the speed improvements"

This reverts commit 97159e3.

* Revert "evert "apply to test files to get the speed improvements""

This reverts commit bf3c6a5.
  • Loading branch information
jrhizor authored Feb 16, 2022
1 parent 1f90843 commit 3da09aa
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 11 deletions.
36 changes: 36 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -293,6 +294,41 @@ public interface Configs {
*/
String getJobKubeNamespace();

/**
* Define the interval for checking for a Kubernetes pod status for a worker of an unspecified type.
*
* In seconds if specified by environment variable.
*/
Duration getDefaultWorkerStatusCheckInterval();

/**
* Define the interval for checking for "get spec" Kubernetes pod statuses.
*
* In seconds if specified by environment variable.
*/
Duration getSpecWorkerStatusCheckInterval();

/**
* Define the interval for checking for "check connection" Kubernetes pod statuses.
*
* In seconds if specified by environment variable.
*/
Duration getCheckWorkerStatusCheckInterval();

/**
* Define the interval for checking for "discover" Kubernetes pod statuses.
*
* In seconds if specified by environment variable.
*/
Duration getDiscoverWorkerStatusCheckInterval();

/**
* Define the interval for checking for "replication" Kubernetes pod statuses.
*
* In seconds if specified by environment variable.
*/
Duration getReplicationWorkerStatusCheckInterval();

// Logging/Monitoring/Tracking
/**
* Define either S3, Minio or GCS as a logging backend. Kubernetes only. Multiple variables are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -117,6 +118,12 @@ public class EnvConfigs implements Configs {
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

private static final String DEFAULT_WORKER_STATUS_CHECK_INTERVAL = "DEFAULT_WORKER_STATUS_CHECK_INTERVAL";
private static final String SPEC_WORKER_STATUS_CHECK_INTERVAL = "SPEC_WORKER_STATUS_CHECK_INTERVAL";
private static final String CHECK_WORKER_STATUS_CHECK_INTERVAL = "CHECK_WORKER_STATUS_CHECK_INTERVAL";
private static final String DISCOVER_WORKER_STATUS_CHECK_INTERVAL = "DISCOVER_WORKER_STATUS_CHECK_INTERVAL";
private static final String REPLICATION_WORKER_STATUS_CHECK_INTERVAL = "REPLICATION_WORKER_STATUS_CHECK_INTERVAL";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
Expand All @@ -133,6 +140,12 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000;

private static final Duration DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final Duration DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
public static final long DEFAULT_MAX_CHECK_WORKERS = 5;
public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
Expand Down Expand Up @@ -532,6 +545,46 @@ public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

@Override
public Duration getDefaultWorkerStatusCheckInterval() {
return getEnvOrDefault(
DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getSpecWorkerStatusCheckInterval() {
return getEnvOrDefault(
SPEC_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getCheckWorkerStatusCheckInterval() {
return getEnvOrDefault(
CHECK_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getDiscoverWorkerStatusCheckInterval() {
return getEnvOrDefault(
DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public Duration getReplicationWorkerStatusCheckInterval() {
return getEnvOrDefault(
REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
value -> Duration.ofSeconds(Integer.parseInt(value)));
}

@Override
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
@AllArgsConstructor
public class WorkerConfigs {

private static final Duration DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final Duration SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);

private final Configs.WorkerEnvironment workerEnvironment;
private final ResourceRequirements resourceRequirements;
private final List<TolerationPOJO> workerKubeTolerations;
Expand Down Expand Up @@ -54,7 +48,7 @@ public WorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
DEFAULT_WORKER_STATUS_CHECK_INTERVAL);
configs.getDefaultWorkerStatusCheckInterval());
}

/**
Expand All @@ -80,7 +74,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
SPEC_WORKER_STATUS_CHECK_INTERVAL);
configs.getSpecWorkerStatusCheckInterval());
}

/**
Expand All @@ -106,7 +100,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
CHECK_WORKER_STATUS_CHECK_INTERVAL);
configs.getCheckWorkerStatusCheckInterval());
}

/**
Expand All @@ -132,7 +126,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
DISCOVER_WORKER_STATUS_CHECK_INTERVAL);
configs.getDiscoverWorkerStatusCheckInterval());
}

public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) {
Expand All @@ -151,7 +145,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs)
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
REPLICATION_WORKER_STATUS_CHECK_INTERVAL);
configs.getReplicationWorkerStatusCheckInterval());
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down
3 changes: 3 additions & 0 deletions kube/overlays/dev-integration-test-schedulerv2/.env
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
NEW_SCHEDULER=true
CONTAINER_ORCHESTRATOR_ENABLED=true

# Set this lower to speed up tests significantly
REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1

Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ spec:
configMapKeyRef:
name: airbyte-env
key: CONTAINER_ORCHESTRATOR_ENABLED
- name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
valueFrom:
configMapKeyRef:
name: airbyte-env
key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
3 changes: 3 additions & 0 deletions kube/overlays/dev-integration-test/.env
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
# Launch a separate pod to orchestrate sync steps
CONTAINER_ORCHESTRATOR_ENABLED=false

# Set this lower to speed up tests significantly
REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1

1 change: 1 addition & 0 deletions kube/overlays/dev-integration-test/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ secretGenerator:

patchesStrategicMerge:
- parallelize-worker.yaml
- speed-up-worker.yaml
15 changes: 15 additions & 0 deletions kube/overlays/dev-integration-test/speed-up-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: airbyte-worker
spec:
template:
spec:
containers:
- name: airbyte-worker-container
env:
- name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
valueFrom:
configMapKeyRef:
name: airbyte-env
key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL

0 comments on commit 3da09aa

Please sign in to comment.