Skip to content

Commit

Permalink
KAFKA-15219: KRaft support for DelegationTokens (apache#14083)
Browse files Browse the repository at this point in the history
Reviewers: David Arthur <[email protected]>, Ron Dagostino <[email protected]>, Manikumar Reddy <[email protected]>, Viktor Somogyi <[email protected]>
  • Loading branch information
pprovenzano authored Aug 19, 2023
1 parent 05c329e commit c2759df
Show file tree
Hide file tree
Showing 63 changed files with 2,665 additions and 440 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="javax.crypto" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />

Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager).java"/>
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager|MetadataDelta).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="BooleanExpressionComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,27 @@ public TokenInformation(String tokenId, KafkaPrincipal owner, KafkaPrincipal tok
this.expiryTimestamp = expiryTimestamp;
}

// Convert record elements into a TokenInformation
public static TokenInformation fromRecord(String tokenId, KafkaPrincipal owner, KafkaPrincipal tokenRequester,
Collection<KafkaPrincipal> renewers, long issueTimestamp, long maxTimestamp, long expiryTimestamp) {
return new TokenInformation(
tokenId, owner, tokenRequester, renewers, issueTimestamp, maxTimestamp, expiryTimestamp);
}

public KafkaPrincipal owner() {
return owner;
}

public String ownerAsString() {
return owner.toString();
}

public KafkaPrincipal tokenRequester() {
return tokenRequester;
}

public String ownerAsString() {
return owner.toString();
public String tokenRequesterAsString() {
return tokenRequester.toString();
}

public Collection<KafkaPrincipal> renewers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 38,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "CreateDelegationTokenRequest",
// Version 1 is the same as version 0.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 41,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 40,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "ExpireDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 39,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "RenewDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support
Expand Down
15 changes: 9 additions & 6 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher, DelegationTokenPublisher}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.NetworkClient
import org.apache.kafka.common.config.ConfigException
Expand Down Expand Up @@ -279,11 +280,8 @@ class BrokerServer(
)

/* start token manager */
if (config.tokenAuthEnabled) {
throw new UnsupportedOperationException("Delegation tokens are not supported")
}
tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
tokenManager = new DelegationTokenManager(config, tokenCache, time)
tokenManager.startup()

groupCoordinator = createGroupCoordinator()

Expand Down Expand Up @@ -418,6 +416,11 @@ class BrokerServer(
sharedServer.metadataPublishingFaultHandler,
"broker",
credentialProvider),
new DelegationTokenPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"broker",
tokenManager),
new AclPublisher(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
Expand Down
122 changes: 120 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server

import java.{lang, util}
import java.nio.ByteBuffer
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
import java.util.concurrent.CompletableFuture
Expand All @@ -28,7 +29,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
Expand All @@ -44,12 +45,14 @@ import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, USER}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion

Expand Down Expand Up @@ -101,6 +104,9 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateDelegationTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewDelegationTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireDelegationTokenRequest(request)
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
Expand Down Expand Up @@ -842,6 +848,118 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}

// The principal is carried through in the forwarded case.
// The security protocol in the context is for the current connection (hop)
// We need to always disallow a tokenAuthenticated principal
// We need to allow special protocols but only in the forwarded case for testing.
def allowTokenRequests(request: RequestChannel.Request): Boolean = {
val protocol = request.context.securityProtocol
if (request.context.principal.tokenAuthenticated ||
// We allow forwarded requests to use PLAINTEXT for testing purposes
(request.isForwarded == false && protocol == SecurityProtocol.PLAINTEXT) ||
// disallow requests from 1-way SSL
(request.isForwarded == false && protocol == SecurityProtocol.SSL && request.context.principal == KafkaPrincipal.ANONYMOUS))
false
else
true
}

def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val createTokenRequest = request.body[CreateDelegationTokenRequest]

val requester = request.context.principal
val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
val ownerPrincipalType = createTokenRequest.data.ownerPrincipalType
val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
request.context.principal
} else {
new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
}

if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
CompletableFuture.completedFuture[Unit](())
} else if (!owner.equals(requester) &&
!authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
// Requester is always allowed to create token for self
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
CompletableFuture.completedFuture[Unit](())
} else {

val context = new ControllerRequestContext(request.context.header.data,
request.context.principal, OptionalLong.empty())

// Copy the response data to a new response so we can apply the request version
controller.createDelegationToken(context, createTokenRequest.data)
.thenApply[Unit] { response =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(
request.context.requestVersion,
requestThrottleMs,
Errors.forCode(response.errorCode()),
new KafkaPrincipal(response.principalType(), response.principalName()),
new KafkaPrincipal(response.tokenRequesterPrincipalType(), response.tokenRequesterPrincipalName()),
response.issueTimestampMs(),
response.expiryTimestampMs(),
response.maxTimestampMs(),
response.tokenId(),
ByteBuffer.wrap(response.hmac())))
}
}
}

def handleRenewDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val renewTokenRequest = request.body[RenewDelegationTokenRequest]

if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
CompletableFuture.completedFuture[Unit](())
} else {
val context = new ControllerRequestContext(
request.context.header.data,
request.context.principal,
OptionalLong.empty())
controller.renewDelegationToken(context, renewTokenRequest.data)
.thenApply[Unit] { response =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
}
}

def handleExpireDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val expireTokenRequest = request.body[ExpireDelegationTokenRequest]

if (!allowTokenRequests(request)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
CompletableFuture.completedFuture[Unit](())
} else {
val context = new ControllerRequestContext(
request.context.header.data,
request.context.principal,
OptionalLong.empty())
controller.expireDelegationToken(context, expireTokenRequest.data)
.thenApply[Unit] { response =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
}
}

def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
val listRequest = request.body[ListPartitionReassignmentsRequest]
authHelper.authorizeClusterOperation(request, DESCRIBE)
Expand Down
29 changes: 27 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher,
DynamicConfigPublisher, ScramPublisher, DelegationTokenPublisher}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
Expand Down Expand Up @@ -205,6 +206,14 @@ class ControllerServer(
QuorumFeatures.defaultFeatureMap(),
controllerNodes)

val delegationTokenKeyString = {
if (config.tokenAuthEnabled) {
config.delegationTokenSecretKey.value
} else {
null
}
}

val controllerBuilder = {
val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
OptionalLong.of(TimeUnit.NANOSECONDS.convert(config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS))
Expand Down Expand Up @@ -236,7 +245,12 @@ class ControllerServer(
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
setZkMigrationEnabled(config.migrationEnabled)
setZkMigrationEnabled(config.migrationEnabled).
setDelegationTokenCache(tokenCache).
setDelegationTokenSecretKey(delegationTokenKeyString).
setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs).
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs)
}
controller = controllerBuilder.build()

Expand Down Expand Up @@ -332,6 +346,17 @@ class ControllerServer(
credentialProvider
))


// Set up the DelegationToken publisher.
// We need a tokenManager for the Publisher
// The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
metadataPublishers.add(new DelegationTokenPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
new DelegationTokenManager(config, tokenCache, time)
))

// Set up the metrics publisher.
metadataPublishers.add(new ControllerMetadataMetricsPublisher(
sharedServer.controllerServerMetrics,
Expand Down
Loading

0 comments on commit c2759df

Please sign in to comment.