Skip to content

Commit

Permalink
Adding Principle of Least Priviledge Security model for Keystore Mana… (
Browse files Browse the repository at this point in the history
azkaban#3216)

* Adding Principle of Least Priviledge Security model for Keystore Manager and for in-memory Security Certificates

* Reverting Kafka relevant changes from POLP change

* Addressing PR comment - > Added default value for use.polp.keystores and bter naming conventions along with saving extra DB call.

* Added security init image name using local variable
  • Loading branch information
utk-12 authored Jan 6, 2023
1 parent ff5440f commit 8a65e94
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 13 deletions.
6 changes: 6 additions & 0 deletions az-core/src/main/java/azkaban/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,9 @@ public static class ContainerizedDispatchManagerProperties {
"azkaban-base.image.name";
public static final String KUBERNETES_POD_AZKABAN_CONFIG_IMAGE_NAME =
AZKABAN_KUBERNETES_PREFIX + "azkaban-config.image.name";
public static final String KUBERNETES_POD_AZKABAN_SECURITY_INIT_IMAGE_NAME =
AZKABAN_KUBERNETES_PREFIX + "azkaban-security-init.image.name";

public static final String KUBERNETES_POD_SERVICE_ACCOUNT_TOKEN_AUTOMOUNT =
KUBERNETES_POD_PREFIX + "service.account.token.automount";

Expand Down Expand Up @@ -733,6 +736,8 @@ public static class ContainerizedDispatchManagerProperties {

public static final String KUBERNETES_INIT_MOUNT_PATH_FOR_JOBTYPES =
KUBERNETES_FLOW_CONTAINER_PREFIX + "init.jobtypes.mount.path";
public static final String PREFETCH_PROXY_USER_CERTIFICATES =
AZKABAN_CONTAINERIZED_PREFIX + "prefetch.certificates";
public static final String KUBERNETES_MOUNT_PATH_FOR_JOBTYPES =
KUBERNETES_FLOW_CONTAINER_PREFIX + "jobtypes.mount.path";
public static final String KUBERNETES_POD_TEMPLATE_PATH =
Expand Down Expand Up @@ -798,6 +803,7 @@ public static class ContainerizedDispatchManagerProperties {
public static final String ENV_ENABLE_DEV_POD = "ENABLE_DEV_POD";
public static final String ENV_CPU_REQUEST = "CPU_REQUEST";
public static final String ENV_MEMORY_REQUEST = "MEMORY_REQUEST";

}

public static class LogConstants {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,7 +161,22 @@ public AzKubernetesV1SpecBuilder addInitContainerType(String name, String image,
LOGGER.debug("Added init container to the pod spec");
return this;
}

public AzKubernetesV1SpecBuilder addSecurityInitContainer(String image,
ImagePullPolicy imagePullPolicy,
final InitContainerType initContainerType, Set<String> proxyUserList ) {
V1EnvVar proxyUserEnv = new V1EnvVarBuilder()
.withName(initContainerType.mountPathKey)
.withValue(String.join(",", proxyUserList))
.build();
V1Container initContainer = new V1ContainerBuilder()
.withName(initContainerType.initPrefix)
.addToEnv(proxyUserEnv)
.withImagePullPolicy(imagePullPolicy.getPolicyVal())
.withImage(image)
.build();
this.initContainers.add(initContainer);
return this;
}
/**
* This method adds a HostPath volume to the pod-spec and also mounts the volume to the flow
* container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
* categories of init containers.
*/
public enum InitContainerType {
SECURITY("azkaban-security-init","security-volume", "PROXY_USER_LIST"),
JOBTYPE("jobtype-init-", "jobtype-volume-", "JOBTYPE_MOUNT_PATH"),
DEPENDENCY("dependency-init-", "dependency-volume-", "DEPENDENCY_MOUNT_PATH");

final String initPrefix;
final String volumePrefix;
final String mountPathKey;
public final String initPrefix;
public final String volumePrefix;
public final String mountPathKey;

private InitContainerType(final String initPrefix, final String volumePrefix,
final String mountPathKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package azkaban.executor;


import java.util.Map;
import javax.validation.constraints.NotEmpty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +34,7 @@ public class KeyStoreManager {
private static final Logger logger = LoggerFactory.getLogger(KeyStoreManager.class);

private KeyStore keyStore;

private Map<String, KeyStore> keyStoreMap;
public static KeyStoreManager getInstance() {
if (ksmInstance == null) {
synchronized (KeyStoreManager.class) {
Expand Down Expand Up @@ -63,4 +65,20 @@ public KeyStore getKeyStore() {
public void setKeyStore(final @Nonnull KeyStore keyStore) {
this.keyStore = keyStore;
}

/**
* Gets the cached KeyStoreMap object
* @return Cached KeyStoreMap object.
*/
public Map<String, KeyStore> getKeyStoreMap() {
return this.keyStoreMap;
}

/**
*
* @param keyStoreMap Must be non-null.
*/
public void setKeyStoreMap(final @NotEmpty Map<String, KeyStore> keyStoreMap) {
this.keyStoreMap = keyStoreMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain

private static final VPARecommendation EMPTY_VPA_RECOMMENDATION = new VPARecommendation(null,
null);
private static final String DEFAULT_AZKABAN_SECURITY_INIT_IMAGE_NAME = "azkaban-security-init";

private final String namespace;
private final ApiClient client;
Expand Down Expand Up @@ -189,6 +190,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain
private final String initMountPathPrefixForDependencies;
private final String appMountPathPrefixForDependencies;
private final boolean saTokenAutoMount;
private final boolean prefetchAllCredentials;
private static final Set<String> INCLUDED_JOB_TYPES = new TreeSet<>(
String.CASE_INSENSITIVE_ORDER);
private final String secretName;
Expand All @@ -205,6 +207,7 @@ public class KubernetesContainerizedImpl extends EventHandler implements Contain

private static final Logger logger = LoggerFactory
.getLogger(KubernetesContainerizedImpl.class);
private final String azkabanSecurityInitImageName;

@Inject
public KubernetesContainerizedImpl(final Props azkProps,
Expand Down Expand Up @@ -344,6 +347,12 @@ public KubernetesContainerizedImpl(final Props azkProps,
false);
this.maxVpaRecommendation = new VPARecommendation(this.maxAllowedCPU,
this.maxAllowedMemory);
this.prefetchAllCredentials = this.azkProps
.getBoolean(ContainerizedDispatchManagerProperties.PREFETCH_PROXY_USER_CERTIFICATES,
false);
this.azkabanSecurityInitImageName = this.azkProps
.getString(ContainerizedDispatchManagerProperties.KUBERNETES_POD_AZKABAN_SECURITY_INIT_IMAGE_NAME,
DEFAULT_AZKABAN_SECURITY_INIT_IMAGE_NAME);
// Add all the job types that are readily available as part of azkaban base image.
this.addIncludedJobTypes();
}
Expand Down Expand Up @@ -740,7 +749,8 @@ V1PodSpec createPodSpec(final ExecutableFlow executableFlow,
addEnvVariablesToSpecBuilder(v1SpecBuilder, envVariables);

// Create init container yaml file for each jobType and dependency
addInitContainers(executionId, jobTypes, dependencyTypes, v1SpecBuilder, versionSet);
addInitContainers(executableFlow, jobTypes, dependencyTypes, v1SpecBuilder,
versionSet);


// Add volume with secrets mounted
Expand Down Expand Up @@ -1126,6 +1136,7 @@ private void createPod(final int executionId)
allImageTypes.add(this.azkabanConfigImageName);
allImageTypes.addAll(jobTypes);
allImageTypes.addAll(this.dependencyTypes);
allImageTypes.add(this.azkabanSecurityInitImageName);
final VersionSet versionSet = fetchVersionSet(executionId, flowParam, allImageTypes, flow);
final V1PodSpec podSpec = createPodSpec(flow, flowResourceRecommendation,
flowResourceRecommendationMap, versionSet,
Expand Down Expand Up @@ -1311,18 +1322,20 @@ private ImmutableMap getAnnotationsForPod() {
/**
* TODO: Check if we need to turn everything into lower case?
*
* @param executionId
* @param executableFlow
* @param jobTypes
* @param dependencyTypes
* @param v1SpecBuilder
* @param versionSet
* @throws ExecutorManagerException
*/
private void addInitContainers(final int executionId,
private void addInitContainers(final ExecutableFlow executableFlow,
final Set<String> jobTypes, final Set<String> dependencyTypes,
final AzKubernetesV1SpecBuilder v1SpecBuilder,
final VersionSet versionSet)
throws ExecutorManagerException {
final ExecutableFlow flow = executableFlow;
final Set<String> proxyUserList = flow.getProxyUsers();
for (final String jobType : jobTypes) {
// Skip all the job types that are available in the azkaban base image and create init
// container for the remaining job types.
Expand Down Expand Up @@ -1352,6 +1365,17 @@ private void addInitContainers(final int executionId,
dependency + " in versionSet");
}
}
if (this.prefetchAllCredentials) {
try {
final String imageFullPath =
versionSet.getVersion(this.azkabanSecurityInitImageName).get().pathWithVersion();
v1SpecBuilder.addSecurityInitContainer(imageFullPath, ImagePullPolicy.IF_NOT_PRESENT,
InitContainerType.SECURITY, proxyUserList);
} catch (final Exception e) {
throw new ExecutorManagerException("Did not find security image. Failed Proxy User Init "
+ "container");
}
}
}

private void addSecretVolume(final AzKubernetesV1SpecBuilder v1SpecBuilder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,20 @@ private void setupKeyStore() throws ExecutorManagerException {
throw new RuntimeException("Failed to get hadoop security manager!"
+ e.getCause(), e);
}

final KeyStore keyStore = hadoopSecurityManager.getKeyStore(commonPluginLoadProps);
if (keyStore == null) {
logger.error("Failed to Prefetch KeyStore");
throw new ExecutorManagerException("Failed to Prefetch KeyStore");
if (commonPluginLoadProps.getBoolean("use.polp.keystores", false)){
final Map<String, KeyStore> keyStoreMap =
hadoopSecurityManager.getKeyStoreMap(commonPluginLoadProps);
if (keyStoreMap == null) {
logger.error("Failed to Prefetch KeyStore Map of Proxy Users");
throw new ExecutorManagerException("Failed to Prefetch KeyStore Map of Proxy Users");
}
}
else {
final KeyStore keyStore = hadoopSecurityManager.getKeyStore(commonPluginLoadProps);
if (keyStore == null) {
logger.error("Failed to Prefetch KeyStore");
throw new ExecutorManagerException("Failed to Prefetch KeyStore");
}
}
logger.info("In-memory Keystore is setup, delete the cert file");
// Delete the cert file from disk as the KeyStore is already cached above.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -359,6 +360,17 @@ public KeyStore getKeyStore(final Props props) {
KeyStoreManager.getInstance().setKeyStore(keyStore);
return keyStore;
}
@Override
public Map<String,KeyStore> getKeyStoreMap(final Props props) {
logger.info("Prefetching KeyStore for the flow");
final Credentials cred = new Credentials();
final CredentialProviderWithKeyStoreMap customCredential = (CredentialProviderWithKeyStoreMap)
getCustomCredentialProvider(props, cred, logger,
Constants.ConfigurationKeys.CUSTOM_CREDENTIAL_NAME);
final Map<String,KeyStore> keyStoreMap = customCredential.getKeyStoreMap();
KeyStoreManager.getInstance().setKeyStoreMap(keyStoreMap);
return keyStoreMap;
}

/**
* This method is used to verify whether Hadoop security is enabled or not.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package azkaban.security;

import java.security.KeyStore;
import java.util.Map;
import javax.annotation.Nonnull;


public interface CredentialProviderWithKeyStoreMap extends CredentialProvider {
/**
* get KeyStoreMap to be reused within an Azkaban Flow
*
* @return KeyStore
*/
public Map<String, KeyStore> getKeyStoreMap();

/**
* Set KeyStoreMap to be reused within an Azkaban Flow
*
* @return KeyStore
*/
public void setKeyStoreMap(final @Nonnull Map<String, KeyStore> keyStoreMap);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
Expand Down Expand Up @@ -109,4 +110,6 @@ public abstract void prefetchToken(File tokenFile, Props props, Logger logger)
throws HadoopSecurityManagerException;

public abstract KeyStore getKeyStore(final Props props);

public abstract Map<String, KeyStore> getKeyStoreMap(Props commonPluginLoadProps);
}

0 comments on commit 8a65e94

Please sign in to comment.