在broker当选为controller之后,controller会在zk上注册一堆的handler, 处理broker/topic/partions等变化
private def onControllerFailover(): Unit = {
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
//...other code
}
BrokerChangeHandler
, 处理broker上线下线
主要更新controller中的cache,并且controller发送sendUpdateMetadata通知所有的borker更新metadata.