Skip to content

Commit

Permalink
KAFKA-16509: CurrentControllerId metric is unreliable in ZK mode (apa…
Browse files Browse the repository at this point in the history
…che#15695)

The CurrentControllerId metric added by KIP-1001 is unreliable in ZK
mode. Sometimes when there is no active ZK-based controller, it still
shows the previous controller ID. Instead, it should show -1 in that
situation.

This PR fixes that by using the controller ID tracked in
KafkaController.scala, which is obtained directly from the controller
znode. It also adds a new test, ControllerIdMetricTest.scala.

Reviewers: David Arthur <[email protected]>
  • Loading branch information
cmccabe authored Apr 11, 2024
1 parent 4aba807 commit b67a3fa
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class KafkaController(val config: KafkaConfig,
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

// ID of the currently active controller; initialised to -1.
// Declared once and non-private so that KafkaServer can read it through
// `controller.activeControllerId` when reporting the CurrentControllerId metric.
@volatile var activeControllerId: Int = -1
@volatile private var offlinePartitionCount = 0
@volatile private var preferredReplicaImbalanceCount = 0
@volatile private var globalTopicCount = 0
Expand Down
24 changes: 14 additions & 10 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class KafkaServer(

override def logManager: LogManager = _logManager

// Accessor for the controller instance held in `_kafkaController`.
// `@volatile` is a field annotation and has no effect on a method, so it is not applied here.
// NOTE(review): if cross-thread visibility is needed, the backing `_kafkaController` field
// (declared outside this excerpt) is what must be marked @volatile - confirm.
def kafkaController: KafkaController = _kafkaController

var lifecycleManager: BrokerLifecycleManager = _
private var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
Expand Down Expand Up @@ -657,15 +657,19 @@ class KafkaServer(
}

/**
 * Registers the CurrentControllerId gauge in the default Yammer registry.
 *
 * The gauge is registered exactly once and is backed by
 * `getCurrentControllerIdFromOldController()` rather than by the metadata cache: in ZK mode
 * the metadata cache can keep showing the previous controller ID while there is no active
 * controller, whereas the metric is expected to show -1 in that situation.
 */
private def createCurrentControllerIdMetric(): Unit = {
  KafkaYammerMetrics.defaultRegistry().newGauge(
    MetadataLoaderMetrics.CURRENT_CONTROLLER_ID,
    () => getCurrentControllerIdFromOldController())
}

/**
 * Reports the controller ID as tracked by the old (ZK-based) controller code.
 * In ZK mode this is the most up-to-date controller ID available.
 *
 * @return the `activeControllerId` of the `KafkaController`, or -1 when
 *         `_kafkaController` is still null
 */
def getCurrentControllerIdFromOldController(): Int =
  Option(_kafkaController).fold(-1)(_.activeControllerId)

private def unregisterCurrentControllerIdMetric(): Unit = {
Expand Down
52 changes: 52 additions & 0 deletions core/src/test/scala/unit/kafka/server/ControllerIdMetricTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.integration.KafkaServerTestHarness
import kafka.utils.TestUtils
import kafka.zk.ZkVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

/**
 * Verifies the controller ID that a broker reports through
 * `KafkaServer.getCurrentControllerIdFromOldController()` when running in ZK mode.
 */
class ControllerIdMetricTest extends KafkaServerTestHarness {

  /** Configuration for a single broker with controlled shutdown disabled. */
  override def generateConfigs: Seq[KafkaConfig] = {
    val brokerProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false)
    brokerProps.map(props => KafkaConfig.fromProps(props)).toSeq
  }

  /** The only broker should end up reporting its own broker ID as the controller ID. */
  @ParameterizedTest
  @ValueSource(strings = Array("zk"))
  def testZkControllerId(): Unit = {
    val broker = servers.head
    TestUtils.retry(30000) {
      val expectedControllerId = broker.config.brokerId
      assertEquals(expectedControllerId, broker.getCurrentControllerIdFromOldController())
    }
  }

  /** Once the controller znode is deleted, the reported controller ID should be -1. */
  @ParameterizedTest
  @ValueSource(strings = Array("zk"))
  def testZkControllerIdWhenZnodeIsDeleted(): Unit = {
    val broker = servers.head
    TestUtils.retry(30000) {
      // The delete and the assertion both live inside the retry block, in this order.
      zkClient.deleteController(ZkVersion.MatchAnyVersion)
      val reportedControllerId = broker.getCurrentControllerIdFromOldController()
      assertEquals(-1, reportedControllerId)
    }
  }
}

0 comments on commit b67a3fa

Please sign in to comment.