Skip to content

Commit

Permalink
add ability to inject environment variables globally into launched pr…
Browse files Browse the repository at this point in the history
…ocesses (airbytehq#9329)

* add global process factory  env var injection support

* add envconfig support

* fix commented out piece

* use prefix instead of json map

* clean up constructor and add javadocs
  • Loading branch information
jrhizor authored Jan 10, 2022
1 parent 22ef236 commit e556697
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ public interface Configs {
*/
String getJobMainContainerMemoryLimit();

/**
* Defines a default map of environment variables to use for any launched job containers. The
* expected format is a JSON encoded String -> String map. Make sure to escape properly. Defaults to
* an empty map.
*/
Map<String, String> getJobDefaultEnvMap();

// Jobs - Kube only
/**
* Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class EnvConfigs implements Configs {
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
public static final String JOB_MAIN_CONTAINER_MEMORY_LIMIT = "JOB_MAIN_CONTAINER_MEMORY_LIMIT";
public static final String JOB_DEFAULT_ENV_MAP = "JOB_DEFAULT_ENV_MAP";
public static final String JOB_DEFAULT_ENV_PREFIX = "JOB_DEFAULT_ENV_";
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
Expand Down Expand Up @@ -121,15 +124,24 @@ public class EnvConfigs implements Configs {
public static final String DEFAULT_NETWORK = "host";

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
private final CloudStorageConfigs stateStorageCloudConfigs;

/**
* Constructs {@link EnvConfigs} from actual environment variables.
*/
public EnvConfigs() {
this(System::getenv);
this(System.getenv());
}

public EnvConfigs(final Function<String, String> getEnv) {
this.getEnv = getEnv;
/**
* Constructs {@link EnvConfigs} from a provided map. This can be used for testing or getting
* variables from a non-envvar source.
*/
public EnvConfigs(final Map<String, String> envMap) {
this.getEnv = envMap::get;
this.getAllEnvKeys = envMap::keySet;
this.logConfigs = new LogConfigs(getLogConfiguration().orElse(null));
this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null);
}
Expand Down Expand Up @@ -481,6 +493,13 @@ public String getJobMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
public Map<String, String> getJobDefaultEnvMap() {
return getAllEnvKeys.get().stream()
.filter(key -> key.startsWith(JOB_DEFAULT_ENV_PREFIX))
.collect(Collectors.toMap(key -> key.replace(JOB_DEFAULT_ENV_PREFIX, ""), getEnv));
}

@Override
public LogConfigs getLogConfigs() {
return logConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,24 @@

package io.airbyte.config;

import static org.mockito.Mockito.when;

import io.airbyte.commons.version.AirbyteVersion;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class EnvConfigsTest {

private Function<String, String> function;
private Map<String, String> envMap;
private EnvConfigs config;

@SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
function = Mockito.mock(Function.class);
config = new EnvConfigs(function);
envMap = new HashMap<>();
config = new EnvConfigs(envMap);
}

@Test
Expand All @@ -35,173 +31,187 @@ void ensureGetEnvBehavior() {

@Test
void testAirbyteRole() {
when(function.apply(EnvConfigs.AIRBYTE_ROLE)).thenReturn(null);
envMap.put(EnvConfigs.AIRBYTE_ROLE, null);
Assertions.assertNull(config.getAirbyteRole());

when(function.apply(EnvConfigs.AIRBYTE_ROLE)).thenReturn("dev");
envMap.put(EnvConfigs.AIRBYTE_ROLE, "dev");
Assertions.assertEquals("dev", config.getAirbyteRole());
}

@Test
void testAirbyteVersion() {
when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn(null);
envMap.put(EnvConfigs.AIRBYTE_VERSION, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getAirbyteVersion());

when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn("dev");
envMap.put(EnvConfigs.AIRBYTE_VERSION, "dev");
Assertions.assertEquals(new AirbyteVersion("dev"), config.getAirbyteVersion());
}

@Test
void testWorkspaceRoot() {
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceRoot());

when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getWorkspaceRoot());
}

@Test
void testLocalRoot() {
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.LOCAL_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getLocalRoot());

when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getLocalRoot());
}

@Test
void testConfigRoot() {
when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.CONFIG_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getConfigRoot());

when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn("a/b");
envMap.put(EnvConfigs.CONFIG_ROOT, "a/b");
Assertions.assertEquals(Paths.get("a/b"), config.getConfigRoot());
}

@Test
void testGetDatabaseUser() {
when(function.apply(EnvConfigs.DATABASE_USER)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_USER, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabaseUser());

when(function.apply(EnvConfigs.DATABASE_USER)).thenReturn("user");
envMap.put(EnvConfigs.DATABASE_USER, "user");
Assertions.assertEquals("user", config.getDatabaseUser());
}

@Test
void testGetDatabasePassword() {
when(function.apply(EnvConfigs.DATABASE_PASSWORD)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_PASSWORD, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabasePassword());

when(function.apply(EnvConfigs.DATABASE_PASSWORD)).thenReturn("password");
envMap.put(EnvConfigs.DATABASE_PASSWORD, "password");
Assertions.assertEquals("password", config.getDatabasePassword());
}

@Test
void testGetDatabaseUrl() {
when(function.apply(EnvConfigs.DATABASE_URL)).thenReturn(null);
envMap.put(EnvConfigs.DATABASE_URL, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getDatabaseUrl());

when(function.apply(EnvConfigs.DATABASE_URL)).thenReturn("url");
envMap.put(EnvConfigs.DATABASE_URL, "url");
Assertions.assertEquals("url", config.getDatabaseUrl());
}

@Test
void testGetWorkspaceDockerMount() {
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals("abc/def", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, "root");
envMap.put(EnvConfigs.WORKSPACE_ROOT, "abc/def");
Assertions.assertEquals("root", config.getWorkspaceDockerMount());

when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.WORKSPACE_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.WORKSPACE_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceDockerMount());
}

@Test
void testGetLocalDockerMount() {
when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals("abc/def", config.getLocalDockerMount());

when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn("abc/def");
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, "root");
envMap.put(EnvConfigs.LOCAL_ROOT, "abc/def");
Assertions.assertEquals("root", config.getLocalDockerMount());

when(function.apply(EnvConfigs.LOCAL_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.LOCAL_ROOT)).thenReturn(null);
envMap.put(EnvConfigs.LOCAL_DOCKER_MOUNT, null);
envMap.put(EnvConfigs.LOCAL_ROOT, null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getLocalDockerMount());
}

@Test
void testDockerNetwork() {
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn(null);
envMap.put(EnvConfigs.DOCKER_NETWORK, null);
Assertions.assertEquals("host", config.getDockerNetwork());

when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc");
envMap.put(EnvConfigs.DOCKER_NETWORK, "abc");
Assertions.assertEquals("abc", config.getDockerNetwork());
}

@Test
void testTrackingStrategy() {
when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn(null);
envMap.put(EnvConfigs.TRACKING_STRATEGY, null);
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("abc");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "abc");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("logging");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "logging");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("segment");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "segment");
Assertions.assertEquals(Configs.TrackingStrategy.SEGMENT, config.getTrackingStrategy());

when(function.apply(EnvConfigs.TRACKING_STRATEGY)).thenReturn("LOGGING");
envMap.put(EnvConfigs.TRACKING_STRATEGY, "LOGGING");
Assertions.assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
}

@Test
void testworkerKubeTolerations() {
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(null);
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, null);
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(";;;");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, ";;;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=k,value=v;");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=k,value=v;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=airbyte-server,operator=Exists,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists")));

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS, "key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));

when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS))
.thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
envMap.put(EnvConfigs.JOB_KUBE_TOLERATIONS,
"key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(
new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists"),
new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
}

@Test
void testworkerKubeNodeSelectors() {
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(null);
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(",,,");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("key=k,,;$%&^#");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("one=two");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));

when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("airbyte=server,something=nothing");
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testEmptyEnvMapRetrieval() {
Assertions.assertEquals(Map.of(), config.getJobDefaultEnvMap());
}

@Test
void testEnvMapRetrieval() {
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV1", "VAL1");
envMap.put(EnvConfigs.JOB_DEFAULT_ENV_PREFIX + "ENV2", "VAL\"2WithQuotesand$ymbols");

final var expected = Map.of("ENV1", "VAL1", "ENV2", "VAL\"2WithQuotesand$ymbols");
Assertions.assertEquals(expected, config.getJobDefaultEnvMap());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void main(final String[] args) throws Exception {
final Map<String, String> envMap =
(Map<String, String>) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class);

final Configs configs = new EnvConfigs(envMap::get);
final Configs configs = new EnvConfigs(envMap);

heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class WorkerConfigs {
private final String jobSocatImage;
private final String jobBusyboxImage;
private final String jobCurlImage;
private final Map<String, String> envMap;

public WorkerConfigs(final Configs configs) {
this.workerEnvironment = configs.getWorkerEnvironment();
Expand All @@ -36,6 +37,7 @@ public WorkerConfigs(final Configs configs) {
this.jobSocatImage = configs.getJobKubeSocatImage();
this.jobBusyboxImage = configs.getJobKubeBusyboxImage();
this.jobCurlImage = configs.getJobKubeCurlImage();
this.envMap = configs.getJobDefaultEnvMap();
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down Expand Up @@ -74,4 +76,8 @@ public String getJobCurlImage() {
return jobCurlImage;
}

public Map<String, String> getEnvMap() {
return envMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public Process create(final String jobId,
"--log-driver",
"none");
}

for (final var envEntry : workerConfigs.getEnvMap().entrySet()) {
cmd.add("-e");
cmd.add(envEntry.getKey() + "=" + envEntry.getValue());
}

if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
Expand Down
Loading

0 comments on commit e556697

Please sign in to comment.