Skip to content

Commit

Permalink
KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and …
Browse files Browse the repository at this point in the history
…controllers

ZkMetadataCache could theoretically return KRaft controller information from a call to
ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the
set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider
during ZK migration, where it allowed ZK brokers in migration mode to forward requests to
kcontrollers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply
delegate to quorumControllerNodeProvider in this case.

Reviewers: José Armando García Sancio <[email protected]>
  • Loading branch information
cmccabe committed May 24, 2024
1 parent 90892ae commit 4f55786
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 40 deletions.
17 changes: 7 additions & 10 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ class KafkaServer(

var kafkaScheduler: KafkaScheduler = _

var kraftControllerNodes: Seq[Node] = _
@volatile var metadataCache: ZkMetadataCache = _

@volatile var quorumControllerNodeProvider: RaftControllerNodeProvider = _

var quotaManagers: QuotaFactory.QuotaManagers = _

val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
Expand Down Expand Up @@ -324,20 +326,13 @@ class KafkaServer(

remoteLogManagerOpt = createRemoteLogManager()

if (config.migrationEnabled) {
kraftControllerNodes = QuorumConfig.voterConnectionsToNodes(
QuorumConfig.parseVoterConnections(config.quorumVoters)
).asScala
} else {
kraftControllerNodes = Seq.empty
}
metadataCache = MetadataCache.zkMetadataCache(
config.brokerId,
config.interBrokerProtocolVersion,
brokerFeatures,
kraftControllerNodes,
config.migrationEnabled)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config,
() => Option(quorumControllerNodeProvider).map(_.getControllerInfo()))

/* initialize feature change listener */
_featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
Expand Down Expand Up @@ -1075,6 +1070,8 @@ class KafkaServer(
}
_brokerState = BrokerState.NOT_RUNNING

quorumControllerNodeProvider = null

startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node],
zkMigrationEnabled: Boolean = false)
: ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes, zkMigrationEnabled)
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled)
}

def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,15 @@ trait ControllerNodeProvider {

class MetadataCacheControllerNodeProvider(
val metadataCache: ZkMetadataCache,
val config: KafkaConfig
val config: KafkaConfig,
val quorumControllerNodeProvider: () => Option[ControllerInformation]
) extends ControllerNodeProvider {

private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol

private val kraftControllerListenerName = if (config.controllerListenerNames.nonEmpty)
new ListenerName(config.controllerListenerNames.head) else null
private val kraftControllerSecurityProtocol = Option(kraftControllerListenerName)
.map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse(
listener, SecurityProtocol.forName(kraftControllerListenerName.value())))
.orNull
private val kraftControllerSaslMechanism = config.saslMechanismControllerProtocol

private val emptyZkControllerInfo = ControllerInformation(
val emptyZkControllerInfo = ControllerInformation(
None,
zkControllerListenerName,
zkControllerSecurityProtocol,
Expand All @@ -85,12 +78,8 @@ class MetadataCacheControllerNodeProvider(
zkControllerSecurityProtocol,
zkControllerSaslMechanism,
isZkController = true)
case KRaftCachedControllerId(id) => ControllerInformation(
metadataCache.getAliveBrokerNode(id, kraftControllerListenerName),
kraftControllerListenerName,
kraftControllerSecurityProtocol,
kraftControllerSaslMechanism,
isZkController = false)
case KRaftCachedControllerId(_) =>
quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo)
}.getOrElse(emptyZkControllerInfo)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures,
kraftControllerNodes: Seq[Node] = Seq.empty,
zkMigrationEnabled: Boolean = false)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {

Expand All @@ -182,8 +181,6 @@ class ZkMetadataCache(
private val featureLock = new ReentrantLock()
private val featureCond = featureLock.newCondition()

private val kraftControllerNodeMap = kraftControllerNodes.map(node => node.id() -> node).toMap

// This method is the main hotspot when it comes to the performance of metadata requests,
// we should be careful about adding additional logic here. Relatedly, `brokers` is
// `List[Integer]` instead of `List[Int]` to avoid a collection copy.
Expand Down Expand Up @@ -350,11 +347,7 @@ class ZkMetadataCache(

override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
val snapshot = metadataSnapshot
brokerId match {
case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) =>
kraftControllerNodeMap.get(id)
case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
}
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
}

override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public RecordsSend<? extends BaseRecords> toSend() {

// TODO: fix to support raft
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null, false);
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), false);
metadataCache.updateMetadata(0, updateMetadataRequest);

replicaManager = new ReplicaManagerBuilder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class MetadataRequestBenchmark {
private final Metrics metrics = new Metrics();
private final int brokerId = 1;
private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), null, false);
MetadataVersion.latestTesting(), BrokerFeatures.createEmpty(), false);
private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void setup() {
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),
BrokerFeatures.createEmpty(), null, false);
BrokerFeatures.createEmpty(), false);
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Properties getEntityConfigs(String rootEntityType, String sanitizedEntity
setBrokerTopicStats(brokerTopicStats).
setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(),
null, false)).
false)).
setLogDirFailureChannel(failureChannel).
setAlterPartitionManager(alterPartitionManager).
build();
Expand Down

0 comments on commit 4f55786

Please sign in to comment.