Skip to content

Commit

Permalink
Re-enable env vars for rss config (#9304)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Oct 18, 2023
1 parent 27a23a3 commit 7eedfc7
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.workers.config;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.ResourceRequirements;
Expand Down Expand Up @@ -66,14 +67,11 @@ public WorkerConfigs(final WorkerEnvironment workerEnvironment,
* Constructs a job-type-agnostic WorkerConfigs. For WorkerConfigs customized for specific
* job-types, use static `build*JOBTYPE*WorkerConfigs` method if one exists.
*/
@VisibleForTesting
public WorkerConfigs(final Configs configs) {
this(
configs.getWorkerEnvironment(),
new ResourceRequirements()
.withCpuRequest(configs.getJobMainContainerCpuRequest())
.withCpuLimit(configs.getJobMainContainerCpuLimit())
.withMemoryRequest(configs.getJobMainContainerMemoryRequest())
.withMemoryLimit(configs.getJobMainContainerMemoryLimit()),
new ResourceRequirements(),
configs.getJobKubeTolerations(),
configs.getJobKubeNodeSelectors(),
configs.getUseCustomKubeNodeSelector() ? Optional.of(configs.getIsolatedJobKubeNodeSelectors()) : Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,14 @@ class WorkerConfigsTest {

private static final String JOB = "job";
private static final Map<String, String> DEFAULT_NODE_SELECTORS = ImmutableMap.of(JOB, "default");
private static final String DEFAULT_CPU_REQUEST = "0.1";
private static final String DEFAULT_CPU_LIMIT = "0.2";
private static final String DEFAULT_MEMORY_REQUEST = "100Mi";
private static final String DEFAULT_MEMORY_LIMIT = "200Mi";
private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new ResourceRequirements()
.withCpuRequest(DEFAULT_CPU_REQUEST)
.withCpuLimit(DEFAULT_CPU_LIMIT)
.withMemoryRequest(DEFAULT_MEMORY_REQUEST)
.withMemoryLimit(DEFAULT_MEMORY_LIMIT);
private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new ResourceRequirements();

private Configs configs;

@BeforeEach
void setup() {
configs = mock(EnvConfigs.class);
when(configs.getJobKubeNodeSelectors()).thenReturn(DEFAULT_NODE_SELECTORS);
when(configs.getJobMainContainerCpuRequest()).thenReturn(DEFAULT_CPU_REQUEST);
when(configs.getJobMainContainerCpuLimit()).thenReturn(DEFAULT_CPU_LIMIT);
when(configs.getJobMainContainerMemoryRequest()).thenReturn(DEFAULT_MEMORY_REQUEST);
when(configs.getJobMainContainerMemoryLimit()).thenReturn(DEFAULT_MEMORY_LIMIT);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,30 +295,6 @@ public interface Configs {
*/
boolean connectorSpecificResourceDefaultsEnabled();

/**
* Define the job container's minimum CPU usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
*/
String getJobMainContainerCpuRequest();

/**
* Define the job container's maximum CPU usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
*/
String getJobMainContainerCpuLimit();

/**
* Define the job container's minimum RAM usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
*/
String getJobMainContainerMemoryRequest();

/**
* Define the job container's maximum RAM usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
*/
String getJobMainContainerMemoryLimit();

/**
* Get datadog or OTEL metric client for Airbyte to emit metrics. Allows empty value
*/
Expand Down Expand Up @@ -365,54 +341,6 @@ public interface Configs {

// Jobs - Kube only

/**
* Define the check job container's minimum CPU request. Defaults to
* {@link #getJobMainContainerCpuRequest()} if not set. Internal-use only.
*/
String getCheckJobMainContainerCpuRequest();

/**
* Define the check job container's maximum CPU usage. Defaults to
* {@link #getJobMainContainerCpuLimit()} if not set. Internal-use only.
*/
String getCheckJobMainContainerCpuLimit();

/**
* Define the check job container's minimum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only.
*/
String getCheckJobMainContainerMemoryRequest();

/**
* Define the check job container's maximum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only.
*/
String getCheckJobMainContainerMemoryLimit();

/**
* Define the normalization job container's minimum CPU request. Defaults to
* {@link #getJobMainContainerCpuRequest()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerCpuRequest();

/**
* Define the normalization job container's maximum CPU usage. Defaults to
* {@link #getJobMainContainerCpuLimit()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerCpuLimit();

/**
* Define the normalization job container's minimum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerMemoryRequest();

/**
* Define the normalization job container's maximum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerMemoryLimit();

/**
* Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration
* contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,6 @@ public class EnvConfigs implements Configs {
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

static final String CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST = "CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST";
static final String CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT = "CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT";
static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST";
static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT";

static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT";

private static final String VAULT_ADDRESS = "VAULT_ADDRESS";
private static final String VAULT_PREFIX = "VAULT_PREFIX";
private static final String VAULT_AUTH_TOKEN = "VAULT_AUTH_TOKEN";
Expand Down Expand Up @@ -834,26 +824,6 @@ public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

@Override
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
}

@Override
public String getJobMainContainerCpuLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_LIMIT, DEFAULT_JOB_CPU_REQUIREMENT);
}

@Override
public String getJobMainContainerMemoryRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_REQUEST, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
public String getJobMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
public String getMetricClient() {
return getEnvOrDefault(METRIC_CLIENT, "");
Expand Down Expand Up @@ -905,46 +875,6 @@ public int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable() {
return getEnvOrDefault(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE);
}

@Override
public String getCheckJobMainContainerCpuRequest() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
}

@Override
public String getCheckJobMainContainerCpuLimit() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT, getJobMainContainerCpuLimit());
}

@Override
public String getCheckJobMainContainerMemoryRequest() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST, getJobMainContainerMemoryRequest());
}

@Override
public String getCheckJobMainContainerMemoryLimit() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit());
}

@Override
public String getNormalizationJobMainContainerCpuRequest() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
}

@Override
public String getNormalizationJobMainContainerCpuLimit() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT, getJobMainContainerCpuLimit());
}

@Override
public String getNormalizationJobMainContainerMemoryRequest() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST, getJobMainContainerMemoryRequest());
}

@Override
public String getNormalizationJobMainContainerMemoryLimit() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit());
}

@Override
public LogConfigs getLogConfigs() {
return logConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

@SuppressWarnings("PMD.NullAssignment")
Expand Down Expand Up @@ -371,76 +370,6 @@ void testDDConstantTags() {
assertEquals(2, config.getDDConstantTags().size());
}

@Nested
@DisplayName("CheckJobResourceSettings")
class CheckJobResourceSettings {

@Test
@DisplayName("should default to JobMainCpuRequest if not set")
void testCpuRequestDefaultToJobMainCpuRequest() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, null);
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_CPU_REQUEST, "1");
assertEquals("1", config.getCheckJobMainContainerCpuRequest());
}

@Test
@DisplayName("checkJobCpuRequest should take precedent if set")
void testCheckJobCpuRequestTakePrecedentIfSet() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, "1");
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_CPU_REQUEST, "2");
assertEquals("1", config.getCheckJobMainContainerCpuRequest());
}

@Test
@DisplayName("should default to JobMainCpuLimit if not set")
void testCpuLimitDefaultToJobMainCpuLimit() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT, null);
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_CPU_LIMIT, "1");
assertEquals("1", config.getCheckJobMainContainerCpuLimit());
}

@Test
@DisplayName("checkJobCpuLimit should take precedent if set")
void testCheckJobCpuLimitTakePrecedentIfSet() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT, "1");
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_CPU_LIMIT, "2");
assertEquals("1", config.getCheckJobMainContainerCpuLimit());
}

@Test
@DisplayName("should default to JobMainMemoryRequest if not set")
void testMemoryRequestDefaultToJobMainMemoryRequest() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST, null);
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_REQUEST, "1");
assertEquals("1", config.getCheckJobMainContainerMemoryRequest());
}

@Test
@DisplayName("checkJobMemoryRequest should take precedent if set")
void testCheckJobMemoryRequestTakePrecedentIfSet() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST, "1");
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_REQUEST, "2");
assertEquals("1", config.getCheckJobMainContainerMemoryRequest());
}

@Test
@DisplayName("should default to JobMainMemoryLimit if not set")
void testMemoryLimitDefaultToJobMainMemoryLimit() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT, null);
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT, "1");
assertEquals("1", config.getCheckJobMainContainerMemoryLimit());
}

@Test
@DisplayName("checkJobMemoryLimit should take precedent if set")
void testCheckJobMemoryLimitTakePrecedentIfSet() {
envMap.put(EnvConfigs.CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT, "1");
envMap.put(EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT, "2");
assertEquals("1", config.getCheckJobMainContainerMemoryLimit());
}

}

@Test
void testSharedJobEnvMapRetrieval() {
envMap.put(EnvConfigs.AIRBYTE_VERSION, DEV);
Expand Down
Loading

0 comments on commit 7eedfc7

Please sign in to comment.