Skip to content

Commit

Permalink
add secret mounting for orchestrator (airbytehq#10168)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor authored Feb 8, 2022
1 parent 5b4b3a2 commit 464954d
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,21 @@ public interface Configs {

// Container Orchestrator
/**
* Define if Airbyte should use Scheduler V2. Internal-use only.
* Define if Airbyte should use the container orchestrator. Internal-use only.
*/
boolean getContainerOrchestratorEnabled();

/**
* Get the name of the container orchestrator secret. Internal-use only.
*/
String getContainerOrchestratorSecretName();

/**
* Get the mount path for a secret that should be loaded onto container orchestrator pods.
* Internal-use only.
*/
String getContainerOrchestratorSecretMountPath();

/**
* Get the longest duration of non long running activity
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class EnvConfigs implements Configs {
private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS = "JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String CONTAINER_ORCHESTRATOR_ENABLED = "CONTAINER_ORCHESTRATOR_ENABLED";
private static final String CONTAINER_ORCHESTRATOR_SECRET_NAME = "CONTAINER_ORCHESTRATOR_SECRET_NAME";
private static final String CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH = "CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH";

public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
Expand Down Expand Up @@ -571,6 +573,16 @@ public boolean getContainerOrchestratorEnabled() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_ENABLED, false, Boolean::valueOf);
}

@Override
public String getContainerOrchestratorSecretName() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_SECRET_NAME, null);
}

@Override
public String getContainerOrchestratorSecretMountPath() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH, null);
}

@Override
public int getMaxActivityTimeoutSecond() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "120"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ private static WorkerOptions getWorkerOptions(final int max) {
public static record ContainerOrchestratorConfig(
String namespace,
DocumentStoreClient documentStoreClient,
KubernetesClient kubernetesClient) {}
KubernetesClient kubernetesClient,
String secretName,
String secretMountPath) {}

static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Configs configs) {
if (configs.getContainerOrchestratorEnabled()) {
Expand All @@ -279,7 +281,9 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Conf
return Optional.of(new ContainerOrchestratorConfig(
configs.getJobKubeNamespace(),
documentStoreClient,
kubernetesClient));
kubernetesClient,
configs.getContainerOrchestratorSecretName(),
configs.getContainerOrchestratorSecretMountPath()));
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
Expand Down Expand Up @@ -52,15 +53,21 @@ public class AsyncOrchestratorPodProcess implements KubePod {
private final KubePodInfo kubePodInfo;
private final DocumentStoreClient documentStoreClient;
private final KubernetesClient kubernetesClient;
private final String secretName;
private final String secretMountPath;
private final AtomicReference<Optional<Integer>> cachedExitValue;

public AsyncOrchestratorPodProcess(
final KubePodInfo kubePodInfo,
final DocumentStoreClient documentStoreClient,
final KubernetesClient kubernetesClient) {
final KubernetesClient kubernetesClient,
final String secretName,
final String secretMountPath) {
this.kubePodInfo = kubePodInfo;
this.documentStoreClient = documentStoreClient;
this.kubernetesClient = kubernetesClient;
this.secretName = secretName;
this.secretMountPath = secretMountPath;
this.cachedExitValue = new AtomicReference<>(Optional.empty());
}

Expand Down Expand Up @@ -230,17 +237,35 @@ public void create(final String airbyteVersion,
final ResourceRequirements resourceRequirements,
final Map<String, String> fileMap,
final Map<Integer, Integer> portMap) {
final Volume configVolume = new VolumeBuilder()
final List<Volume> volumes = new ArrayList<>();
final List<VolumeMount> volumeMounts = new ArrayList<>();

volumes.add(new VolumeBuilder()
.withName("airbyte-config")
.withNewEmptyDir()
.withMedium("Memory")
.endEmptyDir()
.build();
.build());

final VolumeMount configVolumeMount = new VolumeMountBuilder()
volumeMounts.add(new VolumeMountBuilder()
.withName("airbyte-config")
.withMountPath(KubePodProcess.CONFIG_DIR)
.build();
.build());

if (secretName != null && secretMountPath != null) {
volumes.add(new VolumeBuilder()
.withName("airbyte-secret")
.withSecret(new SecretVolumeSourceBuilder()
.withSecretName(secretName)
.withDefaultMode(420)
.build())
.build());

volumeMounts.add(new VolumeMountBuilder()
.withName("airbyte-secret")
.withMountPath(secretMountPath)
.build());
}

final List<ContainerPort> containerPorts = KubePodProcess.createContainerPortList(portMap);

Expand All @@ -250,7 +275,7 @@ public void create(final String airbyteVersion,
.withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build())
.withPorts(containerPorts)
.withPorts(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null))
.withVolumeMounts(configVolumeMount)
.withVolumeMounts(volumeMounts)
.build();

final Pod pod = new PodBuilder()
Expand All @@ -264,7 +289,7 @@ public void create(final String airbyteVersion,
.withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true)
.withRestartPolicy("Never")
.withContainers(mainContainer)
.withVolumes(configVolume)
.withVolumes(volumes)
.endSpec()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException {
process = new AsyncOrchestratorPodProcess(
kubePodInfo,
containerOrchestratorConfig.documentStoreClient(),
containerOrchestratorConfig.kubernetesClient());
containerOrchestratorConfig.kubernetesClient(),
containerOrchestratorConfig.secretName(),
containerOrchestratorConfig.secretMountPath());

if (process.getDocStoreStatus().equals(AsyncKubePodStatus.NOT_STARTED)) {
process.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public void test() throws InterruptedException {
final var asyncProcess = new AsyncOrchestratorPodProcess(
kubePodInfo,
documentStoreClient,
kubernetesClient);
kubernetesClient,
null,
null);

final Map<Integer, Integer> portMap = Map.of(
WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT,
Expand Down

0 comments on commit 464954d

Please sign in to comment.