Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
separate replication orchestrator resource requests (airbytehq#10227)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor authored Feb 9, 2022
1 parent 02f382d commit efbb624
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 1 deletion.
20 changes: 20 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,26 @@ public interface Configs {
*/
String getContainerOrchestratorImage();

/**
* Define the replication orchestrator's minimum CPU usage. Defaults to none.
*/
String getReplicationOrchestratorCpuRequest();

/**
* Define the replication orchestrator's maximum CPU usage. Defaults to none.
*/
String getReplicationOrchestratorCpuLimit();

/**
* Define the replication orchestrator's minimum RAM usage. Defaults to none.
*/
String getReplicationOrchestratorMemoryRequest();

/**
* Define the replication orchestrator's maximum RAM usage. Defaults to none.
*/
String getReplicationOrchestratorMemoryLimit();

/**
* Get the longest duration of non long running activity
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public class EnvConfigs implements Configs {
public static final long DEFAULT_MAX_SYNC_WORKERS = 5;

public static final String DEFAULT_NETWORK = "host";
private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
Expand Down Expand Up @@ -594,6 +598,26 @@ public String getContainerOrchestratorImage() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_IMAGE, "airbyte/container-orchestrator:" + getAirbyteVersion().serialize());
}

@Override
public String getReplicationOrchestratorCpuRequest() {
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_CPU_REQUEST, null);
}

@Override
public String getReplicationOrchestratorCpuLimit() {
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_CPU_LIMIT, null);
}

@Override
public String getReplicationOrchestratorMemoryRequest() {
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_MEMORY_REQUEST, null);
}

@Override
public String getReplicationOrchestratorMemoryLimit() {
return getEnvOrDefault(REPLICATION_ORCHESTRATOR_MEMORY_LIMIT, null);
}

@Override
public int getMaxActivityTimeoutSecond() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "120"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
public class WorkerConfigs {

private final Configs.WorkerEnvironment workerEnvironment;

// for running source, destination, normalization, dbt, normalization orchestrator, and dbt
// orchestrator pods
private final ResourceRequirements resourceRequirements;
// for running replication orchestrator pods
private final ResourceRequirements replicationOrchestratorResourceRequirements;

private final List<TolerationPOJO> workerKubeTolerations;
private final Map<String, String> workerKubeNodeSelectors;
private final String jobImagePullSecret;
Expand All @@ -30,6 +36,11 @@ public WorkerConfigs(final Configs configs) {
.withCpuLimit(configs.getJobMainContainerCpuLimit())
.withMemoryRequest(configs.getJobMainContainerMemoryRequest())
.withMemoryLimit(configs.getJobMainContainerMemoryLimit());
this.replicationOrchestratorResourceRequirements = new ResourceRequirements()
.withCpuRequest(configs.getReplicationOrchestratorCpuRequest())
.withCpuLimit(configs.getReplicationOrchestratorCpuLimit())
.withMemoryRequest(configs.getReplicationOrchestratorMemoryRequest())
.withMemoryLimit(configs.getReplicationOrchestratorMemoryLimit());
this.workerKubeTolerations = configs.getJobKubeTolerations();
this.workerKubeNodeSelectors = configs.getJobKubeNodeSelectors();
this.jobImagePullSecret = configs.getJobKubeMainContainerImagePullSecret();
Expand Down Expand Up @@ -80,4 +91,8 @@ public Map<String, String> getEnvMap() {
return envMap;
}

public ResourceRequirements getReplicationOrchestratorResourceRequirements() {
return replicationOrchestratorResourceRequirements;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ReplicationLauncherWorker(final UUID connectionId,
INIT_FILE_SOURCE_LAUNCHER_CONFIG, Jsons.serialize(sourceLauncherConfig),
INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)),
containerOrchestratorConfig,
workerConfigs.getResourceRequirements(),
workerConfigs.getReplicationOrchestratorResourceRequirements(),
ReplicationOutput.class);
}

Expand Down

0 comments on commit efbb624

Please sign in to comment.