diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 564bdf16cd61..a41d56162237 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -8,6 +8,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.storage.CloudStorageConfigs; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -293,6 +294,41 @@ public interface Configs { */ String getJobKubeNamespace(); + /** + * Define the interval for checking for a Kubernetes pod status for a worker of an unspecified type. + * + * In seconds if specified by environment variable. + */ + Duration getDefaultWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "get spec" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getSpecWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "check connection" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getCheckWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "discover" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getDiscoverWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "replication" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getReplicationWorkerStatusCheckInterval(); + // Logging/Monitoring/Tracking /** * Define either S3, Minio or GCS as a logging backend. Kubernetes only. Multiple variables are diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 985d40cb17eb..958d62447c58 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -15,6 +15,7 @@ import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig; import io.airbyte.config.storage.CloudStorageConfigs.S3Config; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -117,6 +118,12 @@ public class EnvConfigs implements Configs { private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST"; private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT"; + private static final String DEFAULT_WORKER_STATUS_CHECK_INTERVAL = "DEFAULT_WORKER_STATUS_CHECK_INTERVAL"; + private static final String SPEC_WORKER_STATUS_CHECK_INTERVAL = "SPEC_WORKER_STATUS_CHECK_INTERVAL"; + private static final String CHECK_WORKER_STATUS_CHECK_INTERVAL = "CHECK_WORKER_STATUS_CHECK_INTERVAL"; + private static final String DISCOVER_WORKER_STATUS_CHECK_INTERVAL = "DISCOVER_WORKER_STATUS_CHECK_INTERVAL"; + private static final String REPLICATION_WORKER_STATUS_CHECK_INTERVAL = "REPLICATION_WORKER_STATUS_CHECK_INTERVAL"; + // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; @@ -133,6 +140,12 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000; private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000; + private static final Duration DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); + private static final Duration DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); + public static final long DEFAULT_MAX_SPEC_WORKERS = 5; public static final long DEFAULT_MAX_CHECK_WORKERS = 5; public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5; @@ -532,6 +545,46 @@ public String getJobKubeNamespace() { return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE); } + @Override + public Duration getDefaultWorkerStatusCheckInterval() { + return getEnvOrDefault( + DEFAULT_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getSpecWorkerStatusCheckInterval() { + return getEnvOrDefault( + SPEC_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getCheckWorkerStatusCheckInterval() { + return getEnvOrDefault( + CHECK_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getDiscoverWorkerStatusCheckInterval() { + return getEnvOrDefault( + DISCOVER_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getReplicationWorkerStatusCheckInterval() { + return getEnvOrDefault( + REPLICATION_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + @Override public String getJobMainContainerCpuRequest() { return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java index 39b785957b23..10d85682f73e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java @@ -16,12 +16,6 @@ @AllArgsConstructor public class WorkerConfigs { - private static final Duration DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); - private static final Duration SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); - private final Configs.WorkerEnvironment workerEnvironment; private final ResourceRequirements resourceRequirements; private final List workerKubeTolerations; @@ -54,7 +48,7 @@ public WorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - DEFAULT_WORKER_STATUS_CHECK_INTERVAL); + configs.getDefaultWorkerStatusCheckInterval()); } /** @@ -80,7 +74,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - SPEC_WORKER_STATUS_CHECK_INTERVAL); + configs.getSpecWorkerStatusCheckInterval()); } /** @@ -106,7 +100,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - CHECK_WORKER_STATUS_CHECK_INTERVAL); + configs.getCheckWorkerStatusCheckInterval()); } /** @@ -132,7 +126,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - DISCOVER_WORKER_STATUS_CHECK_INTERVAL); + configs.getDiscoverWorkerStatusCheckInterval()); } public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) { @@ -151,7 +145,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - REPLICATION_WORKER_STATUS_CHECK_INTERVAL); + configs.getReplicationWorkerStatusCheckInterval()); } public Configs.WorkerEnvironment getWorkerEnvironment() { diff --git a/kube/overlays/dev-integration-test-schedulerv2/.env b/kube/overlays/dev-integration-test-schedulerv2/.env index c60ac581bd11..a2f097809667 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/.env +++ b/kube/overlays/dev-integration-test-schedulerv2/.env @@ -68,3 +68,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= NEW_SCHEDULER=true CONTAINER_ORCHESTRATOR_ENABLED=true +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 32a6bbff83b8..8e790435392e 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -13,3 +13,8 @@ spec: configMapKeyRef: name: airbyte-env key: CONTAINER_ORCHESTRATOR_ENABLED + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index a09e9b9ae23b..b7bf847ea753 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -67,3 +67,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=false +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git a/kube/overlays/dev-integration-test/kustomization.yaml b/kube/overlays/dev-integration-test/kustomization.yaml index 04f0725377a9..d66fa91f5075 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -32,3 +32,4 @@ secretGenerator: patchesStrategicMerge: - parallelize-worker.yaml + - speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test/speed-up-worker.yaml b/kube/overlays/dev-integration-test/speed-up-worker.yaml new file mode 100644 index 000000000000..ce9e6a00a32e --- /dev/null +++ b/kube/overlays/dev-integration-test/speed-up-worker.yaml @@ -0,0 +1,15 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-worker +spec: + template: + spec: + containers: + - name: airbyte-worker-container + env: + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL