From 4f55786a8a86fe228a0b10a2f28529f5128e5d6f Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Mon, 20 May 2024 15:41:52 -0700 Subject: [PATCH] KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ZkMetadataCache could theoretically return KRaft controller information from a call to ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider during ZK migration, where it allowed ZK brokers in migration mode to forward requests to KRaft controllers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply delegate to quorumControllerNodeProvider in this case. Reviewers: José Armando García Sancio --- .../main/scala/kafka/server/KafkaServer.scala | 17 +++++++-------- .../scala/kafka/server/MetadataCache.scala | 3 +-- .../NodeToControllerChannelManager.scala | 21 +++++-------------- .../server/metadata/ZkMetadataCache.scala | 9 +------- .../ReplicaFetcherThreadBenchmark.java | 2 +- .../metadata/MetadataRequestBenchmark.java | 2 +- .../kafka/jmh/server/CheckpointBench.java | 2 +- .../jmh/server/PartitionCreationBench.java | 2 +- 8 files changed, 18 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 106fd4422598..5531ab1dbc33 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -165,8 +165,10 @@ class KafkaServer( var kafkaScheduler: KafkaScheduler = _ - var kraftControllerNodes: Seq[Node] = _ @volatile var metadataCache: ZkMetadataCache = _ + + @volatile var quorumControllerNodeProvider: RaftControllerNodeProvider = _ + var quotaManagers: QuotaFactory.QuotaManagers = _ val zkClientConfig: ZKClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(config) @@ -324,20 +326,13 @@ class KafkaServer( remoteLogManagerOpt = createRemoteLogManager() - if (config.migrationEnabled) { - kraftControllerNodes = QuorumConfig.voterConnectionsToNodes( - QuorumConfig.parseVoterConnections(config.quorumVoters) - ).asScala - } else { - kraftControllerNodes = Seq.empty - } metadataCache = MetadataCache.zkMetadataCache( config.brokerId, config.interBrokerProtocolVersion, brokerFeatures, - kraftControllerNodes, config.migrationEnabled) - val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config) + val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config, + () => Option(quorumControllerNodeProvider).map(_.getControllerInfo())) /* initialize feature change listener */ _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient) @@ -1075,6 +1070,8 @@ class KafkaServer( } _brokerState = BrokerState.NOT_RUNNING + quorumControllerNodeProvider = null + startupComplete.set(false) isShuttingDown.set(false) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 015e46a76523..b8eda3fe4dc3 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -116,10 +116,9 @@ object MetadataCache { def zkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(), - kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node], zkMigrationEnabled: Boolean = false) : ZkMetadataCache = { - new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes, zkMigrationEnabled) + new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled) } def 
kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = { diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index 36997a4ea49e..0017a5876af1 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -55,22 +55,15 @@ trait ControllerNodeProvider { class MetadataCacheControllerNodeProvider( val metadataCache: ZkMetadataCache, - val config: KafkaConfig + val config: KafkaConfig, + val quorumControllerNodeProvider: () => Option[ControllerInformation] ) extends ControllerNodeProvider { private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol - private val kraftControllerListenerName = if (config.controllerListenerNames.nonEmpty) - new ListenerName(config.controllerListenerNames.head) else null - private val kraftControllerSecurityProtocol = Option(kraftControllerListenerName) - .map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse( - listener, SecurityProtocol.forName(kraftControllerListenerName.value()))) - .orNull - private val kraftControllerSaslMechanism = config.saslMechanismControllerProtocol - - private val emptyZkControllerInfo = ControllerInformation( + val emptyZkControllerInfo = ControllerInformation( None, zkControllerListenerName, zkControllerSecurityProtocol, @@ -85,12 +78,8 @@ class MetadataCacheControllerNodeProvider( zkControllerSecurityProtocol, zkControllerSaslMechanism, isZkController = true) - case KRaftCachedControllerId(id) => ControllerInformation( - metadataCache.getAliveBrokerNode(id, kraftControllerListenerName), - kraftControllerListenerName, - 
kraftControllerSecurityProtocol, - kraftControllerSaslMechanism, - isZkController = false) + case KRaftCachedControllerId(_) => + quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo) }.getOrElse(emptyZkControllerInfo) } } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d13f9e0b7f14..9616f059a0eb 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -158,7 +158,6 @@ class ZkMetadataCache( brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures, - kraftControllerNodes: Seq[Node] = Seq.empty, zkMigrationEnabled: Boolean = false) extends MetadataCache with ZkFinalizedFeatureCache with Logging { @@ -182,8 +181,6 @@ class ZkMetadataCache( private val featureLock = new ReentrantLock() private val featureCond = featureLock.newCondition() - private val kraftControllerNodeMap = kraftControllerNodes.map(node => node.id() -> node).toMap - // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. Relatedly, `brokers` is // `List[Integer]` instead of `List[Int]` to avoid a collection copy. 
@@ -350,11 +347,7 @@ class ZkMetadataCache( override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { val snapshot = metadataSnapshot - brokerId match { - case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) => - kraftControllerNodeMap.get(id) - case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) - } + snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) } override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ecd644b56dd3..48ff022a79a1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -217,7 +217,7 @@ public RecordsSend toSend() { // TODO: fix to support raft ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0, - config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null, false); + config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), false); metadataCache.updateMetadata(0, updateMetadataRequest); replicaManager = new ReplicaManagerBuilder(). 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 1c5e499145fc..38933e7e1d31 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -109,7 +109,7 @@ public class MetadataRequestBenchmark { private final Metrics metrics = new Metrics(); private final int brokerId = 1; private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, - MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), null, false); + MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), false); private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index fb76bab02cfa..3c4e84c6a6de 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -116,7 +116,7 @@ public void setup() { final MetadataCache metadataCache = MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), - BrokerFeatures.createEmpty(), null, false); + BrokerFeatures.createEmpty(), false); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 86db11d4e36b..c38e4b489e26 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -162,7 +162,7 @@ public Properties getEntityConfigs(String rootEntityType, String sanitizedEntity setBrokerTopicStats(brokerTopicStats). setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), - null, false)). + false)). setLogDirFailureChannel(failureChannel). setAlterPartitionManager(alterPartitionManager). build();