KAFKA-10312; Fix error code returned in Metadata response when leader is not available (apache#9112)

MetadataCache#getPartitionMetadata returns an error when the topic's leader id
is present in the MetadataCache but no listener endpoint is present for that leader.
LEADER_NOT_AVAILABLE is returned for older metadata versions, while LISTENER_NOT_FOUND
is returned for newer ones.

The problem is that getPartitionMetadata was looking up the MetadataCache's own host
broker id rather than the topic's leader id when determining which error to return.
This could result in the call returning LISTENER_NOT_FOUND when it should
have returned LEADER_NOT_AVAILABLE. This commit corrects that behavior.
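
As a rough sketch of the corrected decision (not the actual MetadataCache code; the
errorForMissingLeaderEndpoint helper and its parameters are hypothetical, only the
Errors constants are Kafka's):

    import org.apache.kafka.common.protocol.Errors

    // Sketch: the error is derived from the leader's broker id, never from the id
    // of the broker hosting the MetadataCache.
    def errorForMissingLeaderEndpoint(aliveBrokerIds: Set[Int],
                                      leaderBrokerId: Int,
                                      errorUnavailableListeners: Boolean): Errors = {
      if (!aliveBrokerIds.contains(leaderBrokerId))
        Errors.LEADER_NOT_AVAILABLE   // leader not known to the cache at all
      else if (errorUnavailableListeners)
        Errors.LISTENER_NOT_FOUND     // leader known, listener missing: newer metadata versions
      else
        Errors.LEADER_NOT_AVAILABLE   // leader known, listener missing: older metadata versions
    }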

Unit tests already covered the error codes returned in these situations, but they
gave a false positive: they used the same broker id for both the MetadataCache's host
and the topic's leader, whereas the bug only manifests when the MetadataCache's host
id differs from the leader's. The tests have been improved accordingly.
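
A tiny, self-contained illustration of why the old choice of ids masked the bug
(the ids below are illustrative, not the exact values used by the test):

    // With the cache's own id equal to the leader's id, the buggy lookup and the
    // correct lookup always agree, so the old assertion could not fail.
    val aliveBrokerIds = Set(0)
    val metadataCacheBrokerId = 0
    val sameLeaderId = 0
    assert(aliveBrokerIds.contains(metadataCacheBrokerId) == aliveBrokerIds.contains(sameLeaderId))

    // Once the leader is a different broker, the two lookups can diverge and the bug becomes visible.
    val differentLeaderId = 1
    assert(aliveBrokerIds.contains(metadataCacheBrokerId) != aliveBrokerIds.contains(differentLeaderId))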

This commit also consolidates a couple of related tests to reduce code duplication.

Reviewers: Jason Gustafson <[email protected]>
RamanVerma authored Aug 24, 2020
1 parent 8af7b96 commit f19cd6c
Showing 2 changed files with 15 additions and 36 deletions.
core/src/main/scala/kafka/server/MetadataCache.scala (1 addition & 1 deletion)
@@ -97,7 +97,7 @@ class MetadataCache(brokerId: Int) extends Logging {

       maybeLeader match {
         case None =>
-          val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+          val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
             debug(s"Error while fetching metadata for $topicPartition: leader not available")
             Errors.LEADER_NOT_AVAILABLE
           } else {
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala (14 additions & 35 deletions)
@@ -154,14 +154,18 @@ class MetadataCacheTest {
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
         .setListener(listenerName.value)).asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+    val metadataCacheBrokerId = 0
+    // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata version.
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
   }
 
   @Test
   def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+    // when listener name is not present in the metadata cache for a broker, getTopicMetadata should
+    // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and new versions respectively.
     val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
     val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
     val broker0Endpoints = Seq(
@@ -187,49 +191,24 @@ class MetadataCacheTest {
       new UpdateMetadataBroker()
         .setId(1)
         .setEndpoints(broker1Endpoints.asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+    val metadataCacheBrokerId = 0
+    // leader available in cache but listener name not present. expect LISTENER_NOT_FOUND error for new metadata version
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
       leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
-  }
-
-  @Test
-  def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
-    val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
-    val broker0Endpoints = Seq(
-      new UpdateMetadataEndpoint()
-        .setHost("host0")
-        .setPort(9092)
-        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-        .setListener(plaintextListenerName.value),
-      new UpdateMetadataEndpoint()
-        .setHost("host0")
-        .setPort(9093)
-        .setSecurityProtocol(SecurityProtocol.SSL.id)
-        .setListener(sslListenerName.value))
-    val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
-      .setHost("host1")
-      .setPort(9092)
-      .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-      .setListener(plaintextListenerName.value))
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setEndpoints(broker0Endpoints.asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(broker1Endpoints.asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+    // leader available in cache but listener name not present. expect LEADER_NOT_AVAILABLE error for old metadata version
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
   }
 
-  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Seq[UpdateMetadataBroker],
+  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId: Int,
+                                                                       brokers: Seq[UpdateMetadataBroker],
                                                                        listenerName: ListenerName,
                                                                        leader: Int,
                                                                        expectedError: Errors,
                                                                        errorUnavailableListeners: Boolean): Unit = {
     val topic = "topic"
 
-    val cache = new MetadataCache(1)
+    val cache = new MetadataCache(metadataCacheBrokerId)
 
     val zkVersion = 3
     val controllerId = 2
