KAFKA-1355; Avoid sending all topic metadata on state changes. Reviewed by Neha Narkhede and Timothy Chen
jjkoshy committed Apr 7, 2014
1 parent 640f3b0 commit 62fcaed
Showing 5 changed files with 62 additions and 54 deletions.
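
To summarize the change before the per-file diffs: the controller used to push metadata for every partition it knew about on each state change; with this patch, callers name the partitions a state change actually touched, and only partitions of topics queued for deletion get special handling. A minimal sketch of that selection rule, using stand-in types rather than the real controller classes:

// Minimal sketch of the selection rule introduced by this patch.
// TopicAndPartition and the map below are simplified stand-ins,
// not the actual Kafka controller data structures.
case class TopicAndPartition(topic: String, partition: Int)

object UpdateMetadataSelectionSketch {
  // partitions the controller currently has leadership info for
  val partitionLeadershipInfo: Map[TopicAndPartition, String] = Map(
    TopicAndPartition("a", 0) -> "leaderInfoA0",
    TopicAndPartition("b", 0) -> "leaderInfoB0")

  // Before: every state change re-sent metadata for every known partition.
  def partitionsToSendBefore: Set[TopicAndPartition] =
    partitionLeadershipInfo.keySet

  // After: the caller passes only the partitions the state change touched;
  // an empty argument falls back to the old send-everything behavior.
  def partitionsToSendAfter(changed: Set[TopicAndPartition]): Set[TopicAndPartition] =
    if (changed.isEmpty) partitionLeadershipInfo.keySet else changed

  def main(args: Array[String]): Unit = {
    println(partitionsToSendBefore)                                // both partitions
    println(partitionsToSendAfter(Set(TopicAndPartition("a", 0)))) // only the changed one
  }
}
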
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -32,6 +32,8 @@ import collection.Set
object LeaderAndIsr {
val initialLeaderEpoch: Int = 0
val initialZKVersion: Int = 0
val NoLeader = -1
val LeaderDuringDelete = -2
}

case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
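
The two new sentinel values are worth a note: NoLeader (-1) replaces the bare -1 used for a partition with no elected leader, and LeaderDuringDelete (-2) marks a partition whose topic is being deleted so that brokers evict it from their metadata caches (see the KafkaApis change below). A small hedged sketch of that broker-side interpretation, with a plain Map standing in for the real metadata cache:

// Sketch only: a plain Map stands in for the broker's metadata cache and an
// Int stands in for the full PartitionStateInfo carried by UpdateMetadataRequest.
object LeaderSentinelSketch {
  val NoLeader = -1           // no leader currently elected
  val LeaderDuringDelete = -2 // topic of this partition is being deleted

  def applyUpdate(cache: Map[String, Int], partition: String, leader: Int): Map[String, Int] =
    if (leader == LeaderDuringDelete)
      cache - partition              // stop serving metadata for the deleted partition
    else
      cache + (partition -> leader)  // cache or refresh the leader info

  def main(args: Array[String]): Unit = {
    val cached = applyUpdate(Map.empty, "t-0", 3)
    println(applyUpdate(cached, "t-0", LeaderDuringDelete)) // Map() -- evicted
  }
}
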
55 changes: 31 additions & 24 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -211,7 +211,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
leaderAndIsrRequestMap(brokerId).put((topic, partition),
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
}
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(TopicAndPartition(topic, partition)))
}

def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
@@ -232,34 +233,40 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
*
*/
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: (RequestOrResponse) => Unit = null) {
val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile(
p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(partitionList.size > 0) {
partitionList.foreach { partition =>
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
}
case None =>
info("Leader not assigned yet for partition %s. Skip sending udpate metadata request".format(partition))
}
}
} else {
if(controllerContext.partitionLeadershipInfo.keySet.size > 0) {
// last set of topics are being deleted
controllerContext.partitionLeadershipInfo.foreach { case(partition, leaderIsrAndControllerEpoch) =>
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
brokerIds.filter(b => b >= 0).foreach { brokerId =>
updateMetadataRequestMap.put(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
}
}
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
}
}

val filteredPartitions = {
val givenPartitions = if (partitions.isEmpty)
controllerContext.partitionLeadershipInfo.keySet
else
partitions
if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
givenPartitions
else
givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
}
filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
}

def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
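
The filteredPartitions logic above boils down to a small rule: default to every partition with leadership info when the caller passes none, keep partitions of to-be-deleted topics out of the normal path, and send those separately so they carry the LeaderDuringDelete marker. A simplified sketch of that rule with stand-in types:

// Simplified sketch of the partition split performed by
// addUpdateMetadataRequestForBrokers; the types are stand-ins, not the real batch.
object PartitionSplitSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  // Returns (partitions sent with normal leader info,
  //          partitions sent marked with LeaderDuringDelete).
  def split(requested: Set[TopicAndPartition],
            allWithLeaders: Set[TopicAndPartition],
            beingDeleted: Set[TopicAndPartition]): (Set[TopicAndPartition], Set[TopicAndPartition]) = {
    val given = if (requested.isEmpty) allWithLeaders else requested
    (given -- beingDeleted, beingDeleted)
  }
}
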
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -552,7 +552,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
@@ -933,9 +933,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* metadata requests
* @param brokers The brokers that the update metadata request should be sent to
*/
def sendUpdateMetadataRequest(brokers: Seq[Int]) {
def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers)
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
}

@@ -967,7 +967,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
if (leaderAndIsr.isr.contains(replicaId)) {
// if the replica to be removed from the ISR is also the leader, set the new leader value to -1
val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)

// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
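
On the caller side, the new optional partitions argument of sendUpdateMetadataRequest keeps existing call sites unchanged while letting state-change paths name just the affected partitions. A hedged usage sketch with a stub controller that only prints what it would send:

// Usage sketch only: ControllerStub is a stand-in that records what the real
// KafkaController.sendUpdateMetadataRequest would batch and send.
object SendUpdateMetadataUsage {
  case class TopicAndPartition(topic: String, partition: Int)

  class ControllerStub {
    def sendUpdateMetadataRequest(brokers: Seq[Int],
                                  partitions: Set[TopicAndPartition] = Set.empty): Unit = {
      val what = if (partitions.isEmpty) "<all known partitions>" else partitions.mkString(", ")
      println(s"to brokers ${brokers.mkString(",")}: $what")
    }
  }

  def main(args: Array[String]): Unit = {
    val controller = new ControllerStub
    controller.sendUpdateMetadataRequest(Seq(1, 2, 3))                                 // old behavior preserved
    controller.sendUpdateMetadataRequest(Seq(1, 2, 3), Set(TopicAndPartition("a", 0))) // targeted update
  }
}
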
17 changes: 11 additions & 6 deletions core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -72,12 +72,13 @@ class TopicDeletionManager(controller: KafkaController,
val controllerContext = controller.controllerContext
val partitionStateMachine = controller.partitionStateMachine
val replicaStateMachine = controller.replicaStateMachine
var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
val deleteLock = new ReentrantLock()
var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
(initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
val deleteTopicsCond = deleteLock.newCondition()
var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
var deleteTopicsThread: DeleteTopicsThread = null
val isDeleteTopicEnabled = controller.config.deleteTopicEnable

@@ -99,6 +100,7 @@ class TopicDeletionManager(controller: KafkaController,
if(isDeleteTopicEnabled) {
deleteTopicsThread.shutdown()
topicsToBeDeleted.clear()
partitionsToBeDeleted.clear()
topicsIneligibleForDeletion.clear()
}
}
@@ -112,6 +114,7 @@ class TopicDeletionManager(controller: KafkaController,
def enqueueTopicsForDeletion(topics: Set[String]) {
if(isDeleteTopicEnabled) {
topicsToBeDeleted ++= topics
partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
resumeTopicDeletionThread()
}
}
@@ -264,6 +267,7 @@ class TopicDeletionManager(controller: KafkaController,
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
@@ -277,7 +281,8 @@ class TopicDeletionManager(controller: KafkaController,
private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
// send update metadata so that brokers stop serving data for topics to be deleted
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
@@ -322,8 +327,8 @@ class TopicDeletionManager(controller: KafkaController,
/**
* This callback is invoked by the delete topic callback with the list of partitions for topics to be deleted
* It does the following -
* 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) with all partitions except those for
* which the topics are being deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
* 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
* deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
* 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
* and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
* it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
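
The TopicDeletionManager changes above keep a partitionsToBeDeleted set in lockstep with topicsToBeDeleted (extended on enqueue, cleared on shutdown, trimmed when a topic's deletion completes) so the request batch can cheaply exclude those partitions. A minimal sketch of that bookkeeping, with a hypothetical partitionsForTopic lookup standing in for the controller context:

// Sketch of the bookkeeping invariant; partitionsForTopic below is a
// hypothetical stand-in for controllerContext.partitionsForTopic.
import scala.collection.mutable

object DeletionBookkeepingSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  val topicsToBeDeleted = mutable.Set.empty[String]
  val partitionsToBeDeleted = mutable.Set.empty[TopicAndPartition]

  def partitionsForTopic(topic: String): Set[TopicAndPartition] =
    Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1)) // illustrative fixed layout

  def enqueueTopicsForDeletion(topics: Set[String]): Unit = {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(partitionsForTopic) // keep the sets in sync
  }

  def completeDeleteTopic(topic: String): Unit = {
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic) // drop its partitions too
  }
}
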
34 changes: 14 additions & 20 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -137,26 +137,20 @@ class KafkaApis(val requestChannel: RequestChannel,
// cache the list of alive brokers in the cluster
updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
metadataCache.put(partitionState._1, partitionState._2)
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
// remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
// currently being deleted by the controller
val topicsKnownToThisBroker = metadataCache.map {
case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
val partitionsToBeDeleted = metadataCache.filter {
case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic)
}.keySet
partitionsToBeDeleted.foreach { partition =>
metadataCache.remove(partition)
stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
if (partitionState._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
val partition = partitionState._1
metadataCache.remove(partition)
stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d")
.format(brokerId, partition, updateMetadataRequest.controllerId,
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
} else {
metadataCache.put(partitionState._1, partitionState._2)
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
"sent by controller %d epoch %d with correlation id %d")
.format(brokerId, partitionState._2, partitionState._1, updateMetadataRequest.controllerId,
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
}
}
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
