
Commit ca166ea

KAFKA-9189: Use MetadataCache instead of ZK during controlled shutdown to avoid hang (apache#10361)

This avoids hanging during shutdown if ZK is unavailable. We could instead add
timeouts to the ZK calls that fetch the controller id and the broker information,
but I think this approach is better.

The downside is that the metadata cache may be slightly out of date, but we will
retry as per the controlled shutdown configuration. If this broker is partitioned
away from the Controller and is not receiving metadata updates, then we want to
shut down as soon as possible anyway.
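To make the new shape concrete, here is a simplified sketch of the lookup-and-retry
loop. The `Node`, `MetadataBroker`, and `MetadataCache` definitions below are
illustrative stand-ins rather than the real `kafka.server` types; only the
`getControllerId`/`getAliveBroker` call pattern mirrors the diff below.

```scala
object ControlledShutdownSketch {
  // Illustrative stand-ins; the real types live in kafka.server and
  // kafka.server.metadata.
  case class Node(id: Int, host: String, port: Int)

  case class MetadataBroker(id: Int, endpoints: Map[String, Node]) {
    // Mirrors the MetadataBroker.node accessor added by this commit.
    def node(listenerName: String): Node =
      endpoints.getOrElse(listenerName, throw new IllegalStateException(
        s"End point with listener name $listenerName not found for broker $id"))
  }

  trait MetadataCache {
    // Local, in-memory reads: unlike the old zkClient calls, these cannot
    // block on an unreachable ZooKeeper.
    def getControllerId: Option[Int]
    def getAliveBroker(id: Int): Option[MetadataBroker]
  }

  // One cache read per attempt; if the controller is unknown, back off and
  // retry, as the controlled shutdown configuration dictates.
  def findController(cache: MetadataCache, listener: String,
                     retries: Int, backoffMs: Long): Option[Node] = {
    var remaining = retries
    while (remaining > 0) {
      remaining -= 1
      cache.getControllerId.flatMap(cache.getAliveBroker) match {
        case Some(controller) => return Some(controller.node(listener))
        case None             => if (remaining > 0) Thread.sleep(backoffMs)
      }
    }
    None
  }
}
```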

I added a test that timed out without this change, and included a couple of
clean-ups in `ServerShutdownTest`:
* Removed `testCleanShutdownWithDeleteTopicEnabled`, which is redundant
  since topic deletion is enabled by default.
* Removed redundant method arguments.

Reviewers: David Jacot <[email protected]>, Jun Rao <[email protected]>
ijuma authored Mar 19, 2021
1 parent 69eebbf commit ca166ea
Showing 4 changed files with 35 additions and 31 deletions.
18 changes: 7 additions & 11 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-
 import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
@@ -32,7 +31,7 @@ import kafka.log.LogManager
 import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
 import kafka.network.SocketServer
 import kafka.security.CredentialProvider
-import kafka.server.metadata.ZkConfigRepository
+import kafka.server.metadata.{MetadataBroker, ZkConfigRepository}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
@@ -498,7 +497,7 @@
    */
  private def controlledShutdown(): Unit = {

-    def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
+    def node(broker: MetadataBroker): Node = broker.node(config.interBrokerListenerName)

    val socketTimeoutMs = config.controllerSocketTimeoutMs

@@ -549,20 +548,17 @@
      try {

        var remainingRetries = retries
-        var prevController: Broker = null
+        var prevController: MetadataBroker = null
        var ioException = false

        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries = remainingRetries - 1

          // 1. Find the controller and establish a connection to it.
-
-          // Get the current controller info. This is to ensure we use the most recent info to issue the
-          // controlled shutdown request.
          // If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
-          zkClient.getControllerId match {
+          metadataCache.getControllerId match {
            case Some(controllerId) =>
-              zkClient.getBroker(controllerId) match {
+              metadataCache.getAliveBroker(controllerId) match {
                case Some(broker) =>
                  // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
                  // attempt, connect to the most recent controller
@@ -577,10 +573,10 @@
                    metadataUpdater.setNodes(Seq(node(prevController)).asJava)
                  }
                case None =>
-                  info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
+                  info(s"Broker registration for controller $controllerId is not available in the metadata cache")
              }
            case None =>
-              info("No controller registered in ZooKeeper")
+              info("No controller present in the metadata cache")
          }

          // 2. issue a controlled shutdown to the controller
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala

@@ -59,6 +59,11 @@ case class MetadataBroker(id: Int,
      case Some(node) => new BrokerEndPoint(node.id, node.host, node.port)
    }
  }
+
+  def node(listenerName: ListenerName): Node = {
+    endpoints.getOrElse(listenerName.value, throw new BrokerEndPointNotAvailableException(
+      s"End point with listener name ${listenerName.value} not found for broker $id"))
+  }
 }

 class MetadataBrokersBuilder(log: Logger, prevBrokers: MetadataBrokers) {
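A hedged usage sketch of the new accessor. The `broker` parameter stands in for
an assumed input; in `KafkaServer.controlledShutdown` it comes from
`metadataCache.getAliveBroker(controllerId)`, and the listener name here is an
arbitrary example.

```scala
import kafka.server.metadata.MetadataBroker
import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName

object NodeLookupSketch {
  // `broker` is assumed to be obtained from the metadata cache.
  def controllerNode(broker: MetadataBroker): Node = {
    val listener = new ListenerName("PLAINTEXT") // assumed listener name
    // Resolves the advertised host/port for that listener; throws
    // BrokerEndPointNotAvailableException if no endpoint is registered for
    // the listener, rather than returning null.
    broker.node(listener)
  }
}
```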
35 changes: 17 additions & 18 deletions core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,11 +19,11 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-
 import java.io.{DataInputStream, File}
 import java.net.ServerSocket
 import java.util.Collections
 import java.util.concurrent.{Executors, TimeUnit}
+
 import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.log.LogManager
@@ -40,12 +40,13 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
 import org.junit.jupiter.api.Assertions._

 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag

+@Timeout(60)
 class ServerShutdownTest extends ZooKeeperTestHarness {
   var config: KafkaConfig = null
   val host = "localhost"
@@ -83,7 +84,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     var producer = createProducer(server)

     // create topic
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    createTopic(zkClient, topic, servers = Seq(server))

     // send some messages
     sent1.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get)
@@ -124,19 +125,6 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     verifyNonDaemonThreadsStatus()
   }

-  @Test
-  def testCleanShutdownWithDeleteTopicEnabled(): Unit = {
-    val newProps = TestUtils.createBrokerConfig(0, zkConnect)
-    newProps.setProperty("delete.topic.enable", "true")
-    val newConfig = KafkaConfig.fromProps(newProps)
-    val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
-    server.startup()
-    server.shutdown()
-    server.awaitShutdown()
-    CoreUtils.delete(server.config.logDirs)
-    verifyNonDaemonThreadsStatus()
-  }
-
   @Test
   def testCleanShutdownAfterFailedStartup(): Unit = {
     val newProps = TestUtils.createBrokerConfig(0, zkConnect)
@@ -148,9 +136,9 @@

   @Test
   def testCleanShutdownAfterFailedStartupDueToCorruptLogs(): Unit = {
-    val server = new KafkaServer(config)
+    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    createTopic(zkClient, topic, servers = Seq(server))
     server.shutdown()
     server.awaitShutdown()
     config.logDirs.foreach { dirName =>
@@ -160,6 +148,17 @@
     verifyCleanShutdownAfterFailedStartup[KafkaStorageException](config)
   }

+  @Test
+  def testCleanShutdownWithZkUnavailable(): Unit = {
+    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
+    server.startup()
+    shutdownZooKeeper()
+    server.shutdown()
+    server.awaitShutdown()
+    CoreUtils.delete(server.config.logDirs)
+    verifyNonDaemonThreadsStatus()
+  }
+
   private def verifyCleanShutdownAfterFailedStartup[E <: Exception](config: KafkaConfig)(implicit exceptionClassTag: ClassTag[E]): Unit = {
     val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     try {
8 changes: 6 additions & 2 deletions core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -59,11 +59,15 @@ abstract class ZooKeeperTestHarness extends Logging {

   @AfterEach
   def tearDown(): Unit = {
+    shutdownZooKeeper()
+    Configuration.setConfiguration(null)
+  }
+
+  def shutdownZooKeeper(): Unit = {
     if (zkClient != null)
-    zkClient.close()
+      zkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
-    Configuration.setConfiguration(null)
   }

   // Trigger session expiry by reusing the session id in another client
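For completeness, a sketch of the pattern the extracted helper enables. This is a
hypothetical test class that mirrors `testCleanShutdownWithZkUnavailable` above;
the class and method names are placeholders.

```scala
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{CoreUtils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.jupiter.api.Test

// Hypothetical test; the shape mirrors testCleanShutdownWithZkUnavailable.
class ZkOutageShutdownSketch extends ZooKeeperTestHarness {
  @Test
  def testShutdownDoesNotHangWithoutZk(): Unit = {
    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))
    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
    server.startup()
    shutdownZooKeeper()  // take ZooKeeper away mid-test
    server.shutdown()    // must not hang: controlled shutdown now reads the metadata cache
    server.awaitShutdown()
    CoreUtils.delete(server.config.logDirs)
    // tearDown() will invoke shutdownZooKeeper() again; the null checks and
    // CoreUtils.swallow guards make the repeated call safe.
  }
}
```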
