diff --git a/az-core/src/main/java/azkaban/Constants.java b/az-core/src/main/java/azkaban/Constants.java index 32e4b50fe3..5a3e73de96 100644 --- a/az-core/src/main/java/azkaban/Constants.java +++ b/az-core/src/main/java/azkaban/Constants.java @@ -686,6 +686,9 @@ public static class ContainerizedDispatchManagerProperties { "azkaban-base.image.name"; public static final String KUBERNETES_POD_AZKABAN_CONFIG_IMAGE_NAME = AZKABAN_KUBERNETES_PREFIX + "azkaban-config.image.name"; + public static final String KUBERNETES_POD_AZKABAN_SECURITY_INIT_IMAGE_NAME = + AZKABAN_KUBERNETES_PREFIX + "azkaban-security-init.image.name"; + public static final String KUBERNETES_POD_SERVICE_ACCOUNT_TOKEN_AUTOMOUNT = KUBERNETES_POD_PREFIX + "service.account.token.automount"; @@ -733,6 +736,8 @@ public static class ContainerizedDispatchManagerProperties { public static final String KUBERNETES_INIT_MOUNT_PATH_FOR_JOBTYPES = KUBERNETES_FLOW_CONTAINER_PREFIX + "init.jobtypes.mount.path"; + public static final String PREFETCH_PROXY_USER_CERTIFICATES = + AZKABAN_CONTAINERIZED_PREFIX + "prefetch.certificates"; public static final String KUBERNETES_MOUNT_PATH_FOR_JOBTYPES = KUBERNETES_FLOW_CONTAINER_PREFIX + "jobtypes.mount.path"; public static final String KUBERNETES_POD_TEMPLATE_PATH = @@ -798,6 +803,7 @@ public static class ContainerizedDispatchManagerProperties { public static final String ENV_ENABLE_DEV_POD = "ENABLE_DEV_POD"; public static final String ENV_CPU_REQUEST = "CPU_REQUEST"; public static final String ENV_MEMORY_REQUEST = "MEMORY_REQUEST"; + } public static class LogConstants { diff --git a/azkaban-common/src/main/java/azkaban/container/models/AzKubernetesV1SpecBuilder.java b/azkaban-common/src/main/java/azkaban/container/models/AzKubernetesV1SpecBuilder.java index e9f3b9b6f7..bc48e3a44d 100644 --- a/azkaban-common/src/main/java/azkaban/container/models/AzKubernetesV1SpecBuilder.java +++ b/azkaban-common/src/main/java/azkaban/container/models/AzKubernetesV1SpecBuilder.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,7 +161,22 @@ public AzKubernetesV1SpecBuilder addInitContainerType(String name, String image, LOGGER.debug("Added init container to the pod spec"); return this; } - + public AzKubernetesV1SpecBuilder addSecurityInitContainer(String image, + ImagePullPolicy imagePullPolicy, + final InitContainerType initContainerType, Set proxyUserList ) { + V1EnvVar proxyUserEnv = new V1EnvVarBuilder() + .withName(initContainerType.mountPathKey) + .withValue(String.join(",", proxyUserList)) + .build(); + V1Container initContainer = new V1ContainerBuilder() + .withName(initContainerType.initPrefix) + .addToEnv(proxyUserEnv) + .withImagePullPolicy(imagePullPolicy.getPolicyVal()) + .withImage(image) + .build(); + this.initContainers.add(initContainer); + return this; + } /** * This method adds a HostPath volume to the pod-spec and also mounts the volume to the flow * container. diff --git a/azkaban-common/src/main/java/azkaban/container/models/InitContainerType.java b/azkaban-common/src/main/java/azkaban/container/models/InitContainerType.java index e5f16a650a..0d3fe4309b 100644 --- a/azkaban-common/src/main/java/azkaban/container/models/InitContainerType.java +++ b/azkaban-common/src/main/java/azkaban/container/models/InitContainerType.java @@ -23,12 +23,13 @@ * categories of init containers. */ public enum InitContainerType { + SECURITY("azkaban-security-init","security-volume", "PROXY_USER_LIST"), JOBTYPE("jobtype-init-", "jobtype-volume-", "JOBTYPE_MOUNT_PATH"), DEPENDENCY("dependency-init-", "dependency-volume-", "DEPENDENCY_MOUNT_PATH"); - final String initPrefix; - final String volumePrefix; - final String mountPathKey; + public final String initPrefix; + public final String volumePrefix; + public final String mountPathKey; private InitContainerType(final String initPrefix, final String volumePrefix, final String mountPathKey) { diff --git a/azkaban-common/src/main/java/azkaban/executor/KeyStoreManager.java b/azkaban-common/src/main/java/azkaban/executor/KeyStoreManager.java index 3fc1bbab21..55b689991f 100644 --- a/azkaban-common/src/main/java/azkaban/executor/KeyStoreManager.java +++ b/azkaban-common/src/main/java/azkaban/executor/KeyStoreManager.java @@ -17,6 +17,8 @@ package azkaban.executor; +import java.util.Map; +import javax.validation.constraints.NotEmpty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,7 @@ public class KeyStoreManager { private static final Logger logger = LoggerFactory.getLogger(KeyStoreManager.class); private KeyStore keyStore; - + private Map keyStoreMap; public static KeyStoreManager getInstance() { if (ksmInstance == null) { synchronized (KeyStoreManager.class) { @@ -63,4 +65,20 @@ public KeyStore getKeyStore() { public void setKeyStore(final @Nonnull KeyStore keyStore) { this.keyStore = keyStore; } + + /** + * Gets the cached KeyStoreMap object + * @return Cached KeyStoreMap object. + */ + public Map getKeyStoreMap() { + return this.keyStoreMap; + } + + /** + * + * @param keyStoreMap Must be non-null. + */ + public void setKeyStoreMap(final @NotEmpty Map keyStoreMap) { + this.keyStoreMap = keyStoreMap; + } } diff --git a/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java b/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java index b598c9797a..58f29b8e5e 100644 --- a/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java +++ b/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java @@ -152,6 +152,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain private static final VPARecommendation EMPTY_VPA_RECOMMENDATION = new VPARecommendation(null, null); + private static final String DEFAULT_AZKABAN_SECURITY_INIT_IMAGE_NAME = "azkaban-security-init"; private final String namespace; private final ApiClient client; @@ -189,6 +190,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain private final String initMountPathPrefixForDependencies; private final String appMountPathPrefixForDependencies; private final boolean saTokenAutoMount; + private final boolean prefetchAllCredentials; private static final Set INCLUDED_JOB_TYPES = new TreeSet<>( String.CASE_INSENSITIVE_ORDER); private final String secretName; @@ -205,6 +207,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain private static final Logger logger = LoggerFactory .getLogger(KubernetesContainerizedImpl.class); + private final String azkabanSecurityInitImageName; @Inject public KubernetesContainerizedImpl(final Props azkProps, @@ -344,6 +347,12 @@ public KubernetesContainerizedImpl(final Props azkProps, false); this.maxVpaRecommendation = new VPARecommendation(this.maxAllowedCPU, this.maxAllowedMemory); + this.prefetchAllCredentials = this.azkProps + .getBoolean(ContainerizedDispatchManagerProperties.PREFETCH_PROXY_USER_CERTIFICATES, + false); + this.azkabanSecurityInitImageName = this.azkProps + .getString(ContainerizedDispatchManagerProperties.KUBERNETES_POD_AZKABAN_SECURITY_INIT_IMAGE_NAME, + DEFAULT_AZKABAN_SECURITY_INIT_IMAGE_NAME); // Add all the job types that are readily available as part of azkaban base image. this.addIncludedJobTypes(); } @@ -740,7 +749,8 @@ V1PodSpec createPodSpec(final ExecutableFlow executableFlow, addEnvVariablesToSpecBuilder(v1SpecBuilder, envVariables); // Create init container yaml file for each jobType and dependency - addInitContainers(executionId, jobTypes, dependencyTypes, v1SpecBuilder, versionSet); + addInitContainers(executableFlow, jobTypes, dependencyTypes, v1SpecBuilder, + versionSet); // Add volume with secrets mounted @@ -1126,6 +1136,7 @@ private void createPod(final int executionId) allImageTypes.add(this.azkabanConfigImageName); allImageTypes.addAll(jobTypes); allImageTypes.addAll(this.dependencyTypes); + allImageTypes.add(this.azkabanSecurityInitImageName); final VersionSet versionSet = fetchVersionSet(executionId, flowParam, allImageTypes, flow); final V1PodSpec podSpec = createPodSpec(flow, flowResourceRecommendation, flowResourceRecommendationMap, versionSet, @@ -1311,18 +1322,20 @@ private ImmutableMap getAnnotationsForPod() { /** * TODO: Check if we need to turn everything into lower case? * - * @param executionId + * @param executableFlow * @param jobTypes * @param dependencyTypes * @param v1SpecBuilder * @param versionSet * @throws ExecutorManagerException */ - private void addInitContainers(final int executionId, + private void addInitContainers(final ExecutableFlow executableFlow, final Set jobTypes, final Set dependencyTypes, final AzKubernetesV1SpecBuilder v1SpecBuilder, final VersionSet versionSet) throws ExecutorManagerException { + final ExecutableFlow flow = executableFlow; + final Set proxyUserList = flow.getProxyUsers(); for (final String jobType : jobTypes) { // Skip all the job types that are available in the azkaban base image and create init // container for the remaining job types. @@ -1352,6 +1365,17 @@ private void addInitContainers(final int executionId, dependency + " in versionSet"); } } + if (this.prefetchAllCredentials) { + try { + final String imageFullPath = + versionSet.getVersion(this.azkabanSecurityInitImageName).get().pathWithVersion(); + v1SpecBuilder.addSecurityInitContainer(imageFullPath, ImagePullPolicy.IF_NOT_PRESENT, + InitContainerType.SECURITY, proxyUserList); + } catch (final Exception e) { + throw new ExecutorManagerException("Did not find security image. Failed Proxy User Init " + + "container"); + } + } } private void addSecretVolume(final AzKubernetesV1SpecBuilder v1SpecBuilder) { diff --git a/azkaban-exec-server/src/main/java/azkaban/container/FlowContainer.java b/azkaban-exec-server/src/main/java/azkaban/container/FlowContainer.java index ad2cc7f566..ab4f9e85fa 100644 --- a/azkaban-exec-server/src/main/java/azkaban/container/FlowContainer.java +++ b/azkaban-exec-server/src/main/java/azkaban/container/FlowContainer.java @@ -486,11 +486,20 @@ private void setupKeyStore() throws ExecutorManagerException { throw new RuntimeException("Failed to get hadoop security manager!" + e.getCause(), e); } - - final KeyStore keyStore = hadoopSecurityManager.getKeyStore(commonPluginLoadProps); - if (keyStore == null) { - logger.error("Failed to Prefetch KeyStore"); - throw new ExecutorManagerException("Failed to Prefetch KeyStore"); + if (commonPluginLoadProps.getBoolean("use.polp.keystores", false)){ + final Map keyStoreMap = + hadoopSecurityManager.getKeyStoreMap(commonPluginLoadProps); + if (keyStoreMap == null) { + logger.error("Failed to Prefetch KeyStore Map of Proxy Users"); + throw new ExecutorManagerException("Failed to Prefetch KeyStore Map of Proxy Users"); + } + } + else { + final KeyStore keyStore = hadoopSecurityManager.getKeyStore(commonPluginLoadProps); + if (keyStore == null) { + logger.error("Failed to Prefetch KeyStore"); + throw new ExecutorManagerException("Failed to Prefetch KeyStore"); + } } logger.info("In-memory Keystore is setup, delete the cert file"); // Delete the cert file from disk as the KeyStore is already cached above. diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/AbstractHadoopSecurityManager.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/AbstractHadoopSecurityManager.java index 982cc39259..3a7d85f1d8 100644 --- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/AbstractHadoopSecurityManager.java +++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/AbstractHadoopSecurityManager.java @@ -38,6 +38,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -359,6 +360,17 @@ public KeyStore getKeyStore(final Props props) { KeyStoreManager.getInstance().setKeyStore(keyStore); return keyStore; } + @Override + public Map getKeyStoreMap(final Props props) { + logger.info("Prefetching KeyStore for the flow"); + final Credentials cred = new Credentials(); + final CredentialProviderWithKeyStoreMap customCredential = (CredentialProviderWithKeyStoreMap) + getCustomCredentialProvider(props, cred, logger, + Constants.ConfigurationKeys.CUSTOM_CREDENTIAL_NAME); + final Map keyStoreMap = customCredential.getKeyStoreMap(); + KeyStoreManager.getInstance().setKeyStoreMap(keyStoreMap); + return keyStoreMap; + } /** * This method is used to verify whether Hadoop security is enabled or not. diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/CredentialProviderWithKeyStoreMap.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/CredentialProviderWithKeyStoreMap.java new file mode 100644 index 0000000000..d047734b3b --- /dev/null +++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/CredentialProviderWithKeyStoreMap.java @@ -0,0 +1,23 @@ +package azkaban.security; + +import java.security.KeyStore; +import java.util.Map; +import javax.annotation.Nonnull; + + +public interface CredentialProviderWithKeyStoreMap extends CredentialProvider { + /** + * get KeyStoreMap to be reused within an Azkaban Flow + * + * @return KeyStore + */ + public Map getKeyStoreMap(); + + /** + * Set KeyStoreMap to be reused within an Azkaban Flow + * + * @return KeyStore + */ + public void setKeyStoreMap(final @Nonnull Map keyStoreMap); + +} \ No newline at end of file diff --git a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java index 26bf20225f..64eb907237 100644 --- a/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java +++ b/azkaban-hadoop-security-plugin/src/main/java/azkaban/security/commons/HadoopSecurityManager.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.security.KeyStore; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.Credentials; @@ -109,4 +110,6 @@ public abstract void prefetchToken(File tokenFile, Props props, Logger logger) throws HadoopSecurityManagerException; public abstract KeyStore getKeyStore(final Props props); + + public abstract Map getKeyStoreMap(Props commonPluginLoadProps); }