Skip to content

Commit

Permalink
KAFKA-14351: Controller Mutation Quota for KRaft (apache#13116)
Browse files Browse the repository at this point in the history
Implement KIP-599 controller mutation quotas for the KRaft controller. These quotas apply to create
topics, create partitions, and delete topic operations. They are specified in terms of number of
partitions.

The approach taken here is to reuse the ControllerMutationQuotaManager that is also used in ZK
mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas enforces the quota,
whereas Sensor.record notes that new partitions have been modified. While ControllerApis handles
fetching the Sensor objects, we must make the final callback to check the quotas from within
QuorumController. The reason is because only QuorumController knows the final number of partitions
that must be modified. (As one example, up-to-date information about the number of partitions that
will be deleted when a topic is deleted is really only available in QuorumController.)

For quota enforcement, the logic is already in place. The KRaft controller is expected to set the
throttle time in the response that is embedded in EnvelopeResponse, but it does not actually apply
the throttle because there is no client connection to throttle. Instead, the broker that forwarded
the request is expected to return the throttle value from the controller and to throttle the client
connection. It also applies its own request quota, so the enforced/returned quota is the maximum of
the two.

This PR also installs a DynamicConfigPublisher in ControllerServer. This allows dynamic
configurations to be published on the controller. Previously, they could be set, but they were not
applied. Note that we still don't have a good way to set node-level configurations for isolatied
controllers. However, this will allow us to set cluster configs (aka default node configs) and have
them take effect on the controllers.

In a similar vein, this PR separates out the dynamic client quota publisher logic used on the
broker into DynamicClientQuotaPublisher. We can now install this on both BrokerServer and
ControllerServer. This makes dynamically configuring quotas (such as controller mutation quotas)
possible.

Also add a ducktape test, controller_mutation_quota_test.py.

Reviewers: David Jacot <[email protected]>, Ismael Juma <[email protected]>, Colin P. McCabe <[email protected]>
  • Loading branch information
rondagostino authored Mar 7, 2023
1 parent 29a1a16 commit e3817ca
Show file tree
Hide file tree
Showing 15 changed files with 1,040 additions and 155 deletions.
27 changes: 20 additions & 7 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package kafka.server

import java.util
import java.{lang, util}
import java.util.{Collections, OptionalLong}
import java.util.Map.Entry
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
Expand Down Expand Up @@ -175,16 +176,18 @@ class ControllerApis(val requestChannel: RequestChannel,

def handleDeleteTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
val deleteTopicsRequest = request.body[DeleteTopicsRequest]
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs))
requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data.timeoutMs),
controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = deleteTopics(context,
deleteTopicsRequest.data,
request.context.apiVersion,
authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
future.handle[Unit] { (results, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, throttleTimeMs => {
if (exception != null) {
deleteTopicsRequest.getErrorResponse(throttleTimeMs, exception)
} else {
Expand Down Expand Up @@ -339,16 +342,18 @@ class ControllerApis(val requestChannel: RequestChannel,

def handleCreateTopics(request: RequestChannel.Request): CompletableFuture[Unit] = {
val createTopicsRequest = request.body[CreateTopicsRequest]
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs))
requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data.timeoutMs),
controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = createTopics(context,
createTopicsRequest.data,
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
names, logIfDenied = false)(identity))
future.handle[Unit] { (result, exception) =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, throttleTimeMs => {
if (exception != null) {
createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
} else {
Expand All @@ -359,6 +364,12 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}

private def controllerMutationQuotaRecorderFor(controllerMutationQuota: ControllerMutationQuota) = {
new Consumer[lang.Integer]() {
override def accept(permits: lang.Integer): Unit = controllerMutationQuota.record(permits.doubleValue())
}
}

def createTopics(
context: ControllerRequestContext,
request: CreateTopicsRequestData,
Expand Down Expand Up @@ -748,16 +759,18 @@ class ControllerApis(val requestChannel: RequestChannel,
authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
}
val createPartitionsRequest = request.body[CreatePartitionsRequest]
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data.timeoutMs))
requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data.timeoutMs),
controllerMutationQuotaRecorderFor(controllerMutationQuota))
val future = createPartitions(context,
createPartitionsRequest.data(),
filterAlterAuthorizedTopics)
future.handle[Unit] { (responses, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, requestThrottleMs => {
val responseData = new CreatePartitionsResponseData().
setResults(responses).
setThrottleTimeMs(requestThrottleMs)
Expand Down
39 changes: 26 additions & 13 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
Expand Down Expand Up @@ -102,9 +104,11 @@ class ControllerServer(
var alterConfigPolicy: Option[AlterConfigPolicy] = None
var controller: Controller = _
var quotaManagers: QuotaManagers = _
var clientQuotaMetadataManager: ClientQuotaMetadataManager = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
var migrationSupport: Option[ControllerMigrationSupport] = None
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE

private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
Expand All @@ -118,13 +122,6 @@ class ControllerServer(
true
}

private def doRemoteKraftSetup(): Unit = {
// Explicitly configure metric reporters on this remote controller.
// We do not yet support dynamic reconfiguration on remote controllers in general;
// remove this once that is implemented.
new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
}

def clusterId: String = sharedServer.metaProps.clusterId

def startup(): Unit = {
Expand Down Expand Up @@ -243,11 +240,6 @@ class ControllerServer(
}
controller = controllerBuilder.build()

// Perform any setup that is done only when this node is a controller-only node.
if (!config.processRoles.contains(BrokerRole)) {
doRemoteKraftSetup()
}

if (config.migrationEnabled) {
val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
val migrationClient = new ZkMigrationClient(zkClient)
Expand All @@ -272,6 +264,7 @@ class ControllerServer(
metrics,
time,
threadNamePrefix)
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
Expand Down Expand Up @@ -309,6 +302,26 @@ class ControllerServer(
FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
// We must install the below publisher and receive the changes when we are also running the broker role
// because we don't share a single KafkaConfig instance with the broker, and therefore
// the broker's DynamicConfigPublisher won't take care of the changes for us.
val dynamicConfigHandlers = immutable.Map[String, ConfigHandler](
// controllers don't host topics, so no need to do anything with dynamic topic config changes here
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
val dynamicConfigPublisher = new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
dynamicConfigHandlers,
"controller")
val dynamicClientQuotaPublisher = new DynamicClientQuotaPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
clientQuotaMetadataManager)
FutureUtils.waitWithLogging(logger.underlying, "all of the dynamic config and client quota publishers to be installed",
sharedServer.loader.installPublishers(List(dynamicConfigPublisher, dynamicClientQuotaPublisher).asJava), startupDeadline, time)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
Expand Down
99 changes: 71 additions & 28 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
Expand Down Expand Up @@ -262,15 +263,35 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))

addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
}

/**
* Add reconfigurables to be notified when a dynamic controller config is updated.
*/
def addReconfigurables(controller: ControllerServer): Unit = {
controller.authorizer match {
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
if (!kafkaConfig.processRoles.contains(BrokerRole)) {
// only add these if the controller isn't also running the broker role
// because these would already be added via the broker in that case
addReconfigurable(controller.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId, controller.config, controller.metrics, controller.clusterId))
}
addReconfigurable(new DynamicClientQuotaCallback(controller.quotaManagers, controller.config))
addBrokerReconfigurable(new ControllerDynamicThreadPool(controller))
// TODO: addBrokerReconfigurable(new DynamicListenerConfig(controller))
addBrokerReconfigurable(controller.socketServer)
}

def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
reconfigurables.add(reconfigurable)
Expand Down Expand Up @@ -704,19 +725,12 @@ object DynamicThreadPool {
KafkaConfig.NumReplicaFetchersProp,
KafkaConfig.NumRecoveryThreadsPerDataDirProp,
KafkaConfig.BackgroundThreadsProp)
}

class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {

override def reconfigurableConfigs: Set[String] = {
DynamicThreadPool.ReconfigurableConfigs
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
def validateReconfiguration(currentConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (DynamicThreadPool.ReconfigurableConfigs.contains(k)) {
if (ReconfigurableConfigs.contains(k)) {
val newValue = v.asInstanceOf[Int]
val oldValue = currentValue(k)
val oldValue = getValue(currentConfig, k)
if (newValue != oldValue) {
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
if (newValue <= 0)
Expand All @@ -730,6 +744,43 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
}
}

def getValue(config: KafkaConfig, name: String): Int = {
name match {
case KafkaConfig.NumIoThreadsProp => config.numIoThreads
case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => config.numRecoveryThreadsPerDataDir
case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
case n => throw new IllegalStateException(s"Unexpected config $n")
}
}
}

class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {

override def reconfigurableConfigs: Set[String] = {
Set(KafkaConfig.NumIoThreadsProp)
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
DynamicThreadPool.validateReconfiguration(controller.config, newConfig) // common validation
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
controller.controllerApisHandlerPool.resizeThreadPool(newConfig.numIoThreads)
}
}

class BrokerDynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {

override def reconfigurableConfigs: Set[String] = {
DynamicThreadPool.ReconfigurableConfigs
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
DynamicThreadPool.validateReconfiguration(server.config, newConfig)
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
Expand All @@ -740,16 +791,6 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}

private def currentValue(name: String): Int = {
name match {
case KafkaConfig.NumIoThreadsProp => server.config.numIoThreads
case KafkaConfig.NumReplicaFetchersProp => server.config.numReplicaFetchers
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => server.config.numRecoveryThreadsPerDataDir
case KafkaConfig.BackgroundThreadsProp => server.config.backgroundThreads
case n => throw new IllegalStateException(s"Unexpected config $n")
}
}
}

class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
Expand Down Expand Up @@ -911,31 +952,33 @@ object DynamicListenerConfig {
)
}

class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
class DynamicClientQuotaCallback(
quotaManagers: QuotaFactory.QuotaManagers,
serverConfig: KafkaConfig
) extends Reconfigurable {

override def configure(configs: util.Map[String, _]): Unit = {}

override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
server.quotaManagers.clientQuotaCallback.foreach {
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
case _ =>
}
configs
}

override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
server.quotaManagers.clientQuotaCallback.foreach {
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => callback.validateReconfiguration(configs)
case _ =>
}
}

override def reconfigure(configs: util.Map[String, _]): Unit = {
val config = server.config
server.quotaManagers.clientQuotaCallback.foreach {
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
config.dynamicConfig.maybeReconfigure(callback, config.dynamicConfig.currentKafkaConfig, configs)
serverConfig.dynamicConfig.maybeReconfigure(callback, serverConfig.dynamicConfig.currentKafkaConfig, configs)
true
case _ => false
}
Expand Down
Loading

0 comments on commit e3817ca

Please sign in to comment.