Skip to content

Commit

Permalink
🎉 Configurable job pull image policy in k8s (airbytehq#6827)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmolimar authored Oct 6, 2021
1 parent 75aee68 commit febeb52
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface Configs {

WorkspaceRetentionConfig getWorkspaceRetentionConfig();

String getJobImagePullPolicy();

List<WorkerPodToleration> getWorkerPodTolerations();

Map<String, String> getWorkerNodeSelectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_DATABASE_URL = "CONFIG_DATABASE_URL";
public static final String RUN_DATABASE_MIGRATION_ON_STARTUP = "RUN_DATABASE_MIGRATION_ON_STARTUP";
public static final String WEBAPP_URL = "WEBAPP_URL";
public static final String JOB_IMAGE_PULL_POLICY = "JOB_IMAGE_PULL_POLICY";
public static final String WORKER_POD_TOLERATIONS = "WORKER_POD_TOLERATIONS";
public static final String WORKER_POD_NODE_SELECTORS = "WORKER_POD_NODE_SELECTORS";
public static final String MAX_SYNC_JOB_ATTEMPTS = "MAX_SYNC_JOB_ATTEMPTS";
Expand Down Expand Up @@ -76,6 +77,7 @@ public class EnvConfigs implements Configs {
private static final String DEFAULT_KUBE_NAMESPACE = "default";
private static final String DEFAULT_RESOURCE_REQUIREMENT_CPU = null;
private static final String DEFAULT_RESOURCE_REQUIREMENT_MEMORY = null;
private static final String DEFAULT_JOB_IMAGE_PULL_POLICY = "IfNotPresent";
private static final String SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID";
private static final String SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
Expand Down Expand Up @@ -278,6 +280,11 @@ private WorkerPodToleration workerPodToleration(final String tolerationStr) {
}
}

@Override
public String getJobImagePullPolicy() {
return getEnvOrDefault(JOB_IMAGE_PULL_POLICY, DEFAULT_JOB_IMAGE_PULL_POLICY);
}

/**
* Returns worker pod tolerations parsed from its own environment variable. The value of the env is
* a string that represents one or more tolerations.
Expand Down
39 changes: 16 additions & 23 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -28,19 +29,22 @@
// TODO:(Issue-4824): Figure out how to log Docker process information.
public class WorkerUtils {

public static final List<WorkerPodToleration> DEFAULT_WORKER_POD_TOLERATIONS = initWorkerPodTolerations();
public static final Map<String, String> DEFAULT_WORKER_POD_NODE_SELECTORS = initWorkerPodNodeSelectors();
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class);
private static final Configs CONFIGS = new EnvConfigs();

public static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = initResourceRequirements();
public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = new EnvConfigs().getJobsImagePullSecret();
public static final List<WorkerPodToleration> DEFAULT_WORKER_POD_TOLERATIONS = CONFIGS.getWorkerPodTolerations();
public static final Map<String, String> DEFAULT_WORKER_POD_NODE_SELECTORS = CONFIGS.getWorkerNodeSelectors();
public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = CONFIGS.getJobsImagePullSecret();
public static final String DEFAULT_JOB_IMAGE_PULL_POLICY = CONFIGS.getJobImagePullPolicy();

private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class);

public static void gentleClose(final Process process, final long timeout, final TimeUnit timeUnit) {
if (process == null) {
return;
}

if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
LOGGER.debug("Gently closing process {}", process.info().commandLine().get());
}

Expand Down Expand Up @@ -96,7 +100,7 @@ static void gentleCloseWithHeartbeat(final Process process,
final BiConsumer<Process, Duration> forceShutdown) {
while (process.isAlive() && heartbeatMonitor.isBeating()) {
try {
if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
LOGGER.debug("Gently closing process {} with heartbeat..", process.info().commandLine().get());
}

Expand All @@ -108,7 +112,7 @@ static void gentleCloseWithHeartbeat(final Process process,

if (process.isAlive()) {
try {
if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
LOGGER.debug("Gently closing process {} without heartbeat..", process.info().commandLine().get());
}

Expand All @@ -120,7 +124,7 @@ static void gentleCloseWithHeartbeat(final Process process,

// if we were unable to exist gracefully, force shutdown...
if (process.isAlive()) {
if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) {
LOGGER.debug("Force shutdown process {}..", process.info().commandLine().get());
}

Expand Down Expand Up @@ -211,23 +215,12 @@ public static Path getJobRoot(Path workspaceRoot, String jobId, int attemptId) {
.resolve(String.valueOf(attemptId));
}

private static List<WorkerPodToleration> initWorkerPodTolerations() {
final EnvConfigs configs = new EnvConfigs();
return configs.getWorkerPodTolerations();
}

private static Map<String, String> initWorkerPodNodeSelectors() {
final EnvConfigs configs = new EnvConfigs();
return configs.getWorkerNodeSelectors();
}

private static ResourceRequirements initResourceRequirements() {
final EnvConfigs configs = new EnvConfigs();
return new ResourceRequirements()
.withCpuRequest(configs.getCpuRequest())
.withCpuLimit(configs.getCpuLimit())
.withMemoryRequest(configs.getMemoryRequest())
.withMemoryLimit(configs.getMemoryLimit());
.withCpuRequest(CONFIGS.getCpuRequest())
.withCpuLimit(CONFIGS.getCpuLimit())
.withMemoryRequest(CONFIGS.getMemoryRequest())
.withMemoryLimit(CONFIGS.getMemoryLimit());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ private static Container getInit(boolean usesStdin, List<VolumeMount> mainVolume
}

private static Container getMain(String image,
String imagePullPolicy,
boolean usesStdin,
String entrypointOverride,
List<VolumeMount> mainVolumeMounts,
Expand All @@ -177,6 +178,7 @@ private static Container getMain(String image,
final ContainerBuilder containerBuilder = new ContainerBuilder()
.withName("main")
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withCommand("sh", "-c", mainCommand)
.withWorkingDir(CONFIG_DIR)
.withVolumeMounts(mainVolumeMounts);
Expand Down Expand Up @@ -246,6 +248,7 @@ public KubePodProcess(String processRunnerHost,
String podName,
String namespace,
String image,
String imagePullPolicy,
int stdoutLocalPort,
int stderrLocalPort,
String kubeHeartbeatUrl,
Expand Down Expand Up @@ -309,6 +312,7 @@ public KubePodProcess(String processRunnerHost,
Container init = getInit(usesStdin, List.of(pipeVolumeMount, configVolumeMount));
Container main = getMain(
image,
imagePullPolicy,
usesStdin,
entrypointOverride,
List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public Process create(String jobId,
podName,
namespace,
imageName,
WorkerUtils.DEFAULT_JOB_IMAGE_PULL_POLICY,
stdoutLocalPort,
stderrLocalPort,
kubeHeartbeatUrl,
Expand Down
3 changes: 3 additions & 0 deletions kube/overlays/dev-integration-test/.env
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ RESOURCE_MEMORY_LIMIT=
# Worker pod tolerations and node selectors
WORKER_POD_TOLERATIONS=
WORKER_POD_NODE_SELECTORS=

# Job image pull policy
JOB_IMAGE_PULL_POLICY=
3 changes: 3 additions & 0 deletions kube/overlays/dev/.env
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT=
# Worker pod tolerations and node selectors
WORKER_POD_TOLERATIONS=
WORKER_POD_NODE_SELECTORS=

# Job image pull policy
JOB_IMAGE_PULL_POLICY=
3 changes: 3 additions & 0 deletions kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT=
# Worker pod tolerations and node selectors
WORKER_POD_TOLERATIONS=
WORKER_POD_NODE_SELECTORS=

# Job image pull policy
JOB_IMAGE_PULL_POLICY=
3 changes: 3 additions & 0 deletions kube/overlays/stable/.env
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT=
# Worker pod tolerations and node selectors
WORKER_POD_TOLERATIONS=
WORKER_POD_NODE_SELECTORS=

# Job image pull policy
JOB_IMAGE_PULL_POLICY=
5 changes: 5 additions & 0 deletions kube/resources/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ spec:
configMapKeyRef:
name: airbyte-env
key: WORKER_POD_NODE_SELECTORS
- name: JOB_IMAGE_PULL_POLICY
valueFrom:
configMapKeyRef:
name: airbyte-env
key: JOB_IMAGE_PULL_POLICY
ports:
- containerPort: 9000 # for heartbeat server
- containerPort: 9001 # start temporal worker port pool
Expand Down

0 comments on commit febeb52

Please sign in to comment.