KAFKA-2821; fix deadlock in group metadata write callback
Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes apache#519 from hachikuji/KAFKA-2821
Jason Gustafson authored and junrao committed Nov 13, 2015
1 parent 4efe4ac commit 002ec9c
Showing 3 changed files with 87 additions and 45 deletions.
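The common thread across all three files is a single locking discipline: decide what needs to happen while holding a lock, but perform any follow-up that may itself take locks — completing delayed fetch/produce requests, or appending group metadata to the log — only after the lock has been released, because the append callbacks re-acquire the group lock (and the purgatory completions touch the partition's leaderIsrUpdateLock). A minimal, self-contained sketch of that discipline follows; the names (StateHolder, advance, completeDelayed) are illustrative stand-ins, not part of the patch.

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Illustrative sketch only. `StateHolder`, `advance`, and `completeDelayed`
    // are stand-ins, not names from the patch.
    class StateHolder {
      private val lock = new ReentrantReadWriteLock()
      private var highWatermark: Long = 0L

      private def inWriteLock[T](body: => T): T = {
        lock.writeLock().lock()
        try body finally lock.writeLock().unlock()
      }

      // Pre-fix shape: follow-up work runs while the lock is still held,
      // which risks deadlock if that work takes other locks.
      def advanceUnsafe(offset: Long): Unit = inWriteLock {
        if (offset > highWatermark) {
          highWatermark = offset
          completeDelayed()
        }
      }

      // Post-fix shape: the lock-protected block only reports a decision;
      // the follow-up runs after the lock is released.
      def advance(offset: Long): Unit = {
        val incremented = inWriteLock {
          if (offset > highWatermark) { highWatermark = offset; true }
          else false
        }
        if (incremented)
          completeDelayed()
      }

      // Stand-in for tryCompleteDelayedFetch / tryCompleteDelayedProduce,
      // or for the replicated-log append in the coordinator.
      private def completeDelayed(): Unit = ()
    }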
61 changes: 45 additions & 16 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -161,8 +161,8 @@ class Partition(val topic: String,
* and setting the new leader and ISR
*/
def makeLeader(controllerId: Int,
partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
partitionStateInfo: PartitionStateInfo, correlationId: Int) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -186,8 +186,11 @@ class Partition(val topic: String,
if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(newLeaderReplica)
true
}

// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
}

/**
@@ -255,7 +258,7 @@ class Partition(val topic: String,
* This function can be triggered when a replica's LEO has incremented
*/
def maybeExpandIsr(replicaId: Int) {
inWriteLock(leaderIsrUpdateLock) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
// check if this replica needs to be added to the ISR
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
@@ -277,9 +280,13 @@ class Partition(val topic: String,
// since the replica maybe now be in the ISR and its LEO has just incremented
maybeIncrementLeaderHW(leaderReplica)

case None => // nothing to do if no longer leader
case None => false // nothing to do if no longer leader
}
}

// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
}

/*
@@ -333,28 +340,36 @@
* 1. Partition ISR changed
* 2. Any replica's LEO changed
*
* Returns true if the HW was incremented, and false otherwise.
* Note There is no need to acquire the leaderIsrUpdate lock here
* since all callers of this private API acquire that lock
*/
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
if(oldHighWatermark.precedes(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
// some delayed operations may be unblocked after HW changed
val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
true
} else {
debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
.format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
false
}
}

/**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
private def tryCompleteDelayedRequests() {
val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
}

def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
inWriteLock(leaderIsrUpdateLock) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
@@ -366,12 +381,20 @@ class Partition(val topic: String,
// update ISR in zk and in cache
updateIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)

replicaManager.isrShrinkRate.mark()
maybeIncrementLeaderHW(leaderReplica)
} else {
false
}
case None => // do nothing if no longer leader

case None => false // do nothing if no longer leader
}
}

// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
}

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
@@ -397,7 +420,7 @@
}

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
inReadLock(leaderIsrUpdateLock) {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
@@ -415,13 +438,19 @@
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
(info, maybeIncrementLeaderHW(leaderReplica))

case None =>
throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
.format(topic, partitionId, localBrokerId))
}
}

// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()

info
}

private def updateIsr(newIsr: Set[Replica]) {
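Within Partition.scala, appendMessagesToLeader is the most involved of these changes: the read-lock block must now return both the append result and whether the high watermark advanced, so it yields a tuple, and tryCompleteDelayedRequests() runs only after inReadLock returns. A trimmed sketch of that shape, with the Kafka types replaced by simple stand-ins and the read lock simplified to a plain monitor:

    // Stand-ins for the real Kafka types; this is a sketch, not the patch itself.
    case class LogAppendInfo(firstOffset: Long, lastOffset: Long)

    class LeaderSketch {
      private val lock = new Object
      private var highWatermark: Long = 0L
      private var logEndOffset: Long = 0L

      // Simplification: a monitor stands in for inReadLock(leaderIsrUpdateLock).
      private def inReadLock[T](body: => T): T = lock.synchronized(body)

      def appendMessagesToLeader(numMessages: Int): LogAppendInfo = {
        // Compute the append result and the HW decision together inside the lock...
        val (info, leaderHWIncremented) = inReadLock {
          val firstOffset = logEndOffset
          logEndOffset += numMessages
          val incremented =
            if (logEndOffset > highWatermark) { highWatermark = logEndOffset; true }
            else false
          (LogAppendInfo(firstOffset, logEndOffset - 1), incremented)
        }

        // ...then poke the purgatories only after the lock has been released.
        if (leaderHWIncremented)
          tryCompleteDelayedRequests()

        info
      }

      private def tryCompleteDelayedRequests(): Unit = ()
    }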
25 changes: 18 additions & 7 deletions core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.log.LogConfig
import kafka.message.UncompressedCodec
import kafka.message.{Message, UncompressedCodec}
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.protocol.Errors
@@ -254,6 +254,8 @@ class GroupCoordinator(val brokerId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
var delayedGroupStore: Option[DelayedStore] = None

group synchronized {
if (!group.has(memberId)) {
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
@@ -279,9 +281,7 @@ class GroupCoordinator(val brokerId: Int,
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

// persist the group metadata and upon finish transition to stable and propagate the assignment
val generationId = group.generationId
groupManager.storeGroup(group, assignment, (errorCode: Short) => {
delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
@@ -296,7 +296,7 @@ class GroupCoordinator(val brokerId: Int,
}
}
}
})
}))
}

case Stable =>
@@ -307,6 +307,10 @@ class GroupCoordinator(val brokerId: Int,
}
}
}

// store the group metadata without holding the group lock to avoid the potential
// for deadlock when the callback is invoked
delayedGroupStore.foreach(groupManager.store)
}

def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
@@ -385,6 +389,8 @@ class GroupCoordinator(val brokerId: Int,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
var delayedOffsetStore: Option[DelayedStore] = None

if (!isActive.get) {
responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
} else if (!isCoordinatorForGroup(groupId)) {
@@ -396,7 +402,8 @@ class GroupCoordinator(val brokerId: Int,
if (group == null) {
if (generationId < 0)
// the group is not relying on Kafka for partition management, so allow the commit
groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
responseCallback))
else
// the group has failed over to this coordinator (which will be handled in KAFKA-2017),
// or this is a request coming from an older generation. either way, reject the commit
@@ -412,11 +419,15 @@ class GroupCoordinator(val brokerId: Int,
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else {
groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
offsetMetadata, responseCallback))
}
}
}
}

// store the offsets without holding the group lock
delayedOffsetStore.foreach(groupManager.store)
}

def handleFetchOffsets(groupId: String,
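In GroupCoordinator.scala the same idea is applied to the group lock: handleSyncGroup and handleCommitOffsets now only prepare a DelayedStore inside group synchronized and hand it to groupManager.store after the lock is released, because the write callback itself synchronizes on the group. A reduced sketch of that control flow, using hypothetical stand-in types rather than the real coordinator classes:

    // Hypothetical stand-ins; the real code uses GroupMetadataManager and DelayedStore.
    case class DelayedWrite(payload: Array[Byte], callback: Short => Unit)

    class ManagerSketch {
      // No I/O here, so this is safe to call while a lock is held.
      def prepareWrite(payload: Array[Byte], callback: Short => Unit): DelayedWrite =
        DelayedWrite(payload, callback)

      // In the real code this appends to the log via the ReplicaManager, and the
      // append path may invoke the callback (which re-acquires the group lock).
      def store(write: DelayedWrite): Unit =
        write.callback(0.toShort)
    }

    class CoordinatorSketch(manager: ManagerSketch) {
      private val group = new Object

      def handleSync(assignment: Array[Byte], respond: Short => Unit): Unit = {
        var delayedStore: Option[DelayedWrite] = None

        group.synchronized {
          // Validate state and build the write here, but do not perform it:
          // the callback below re-enters the group lock.
          delayedStore = Some(manager.prepareWrite(assignment, errorCode =>
            group.synchronized {
              respond(errorCode)
            }))
        }

        // Perform the append only after the group lock has been released.
        delayedStore.foreach(manager.store)
      }
    }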
46 changes: 24 additions & 22 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -46,6 +46,10 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import com.yammer.metrics.core.Gauge


case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
callback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)

class GroupMetadataManager(val brokerId: Int,
val config: OffsetConfig,
replicaManager: ReplicaManager,
@@ -165,9 +169,9 @@ class GroupMetadataManager(val brokerId: Int,
}
}

def storeGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Short => Unit) {
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Short => Unit): DelayedStore = {
// construct the message to append
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -179,6 +183,8 @@ class GroupMetadataManager(val brokerId: Int,
val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))

val generationId = group.generationId

// set the callback function to insert the created group into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
// the append response should only contain the topics partition
@@ -193,7 +199,7 @@ class GroupMetadataManager(val brokerId: Int,
var responseCode = Errors.NONE.code
if (status.error != ErrorMapping.NoError) {
debug("Metadata from group %s with generation %d failed when appending to log due to %s"
.format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
.format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))

// transform the log append error code to the corresponding the commit status error code
responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) {
@@ -205,13 +211,13 @@ class GroupMetadataManager(val brokerId: Int,
|| status.error == ErrorMapping.InvalidFetchSizeCode) {

error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
.format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
.format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))

Errors.UNKNOWN.code
} else {

error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
.format(group.groupId, group.generationId, status.error))
.format(group.groupId, generationId, status.error))

status.error
}
@@ -220,25 +226,27 @@ class GroupMetadataManager(val brokerId: Int,
responseCallback(responseCode)
}

DelayedStore(groupMetadataMessageSet, putCacheCallback)
}

def store(delayedAppend: DelayedStore) {
// call replica manager to append the group message
replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
groupMetadataMessageSet,
putCacheCallback)
delayedAppend.messageSet,
delayedAppend.callback)
}



/**
* Store offsets by appending it to the replicated log and then inserting to cache
*/
def storeOffsets(groupId: String,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
def prepareStoreOffsets(groupId: String,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
// first filter out partitions with offset metadata size exceeding limit
val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -304,13 +312,7 @@ class GroupMetadataManager(val brokerId: Int,
responseCallback(commitStatus)
}

// call replica manager to append the offset messages
replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
offsetsAndMetadataMessageSet,
putCacheCallback)
DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
}

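From the caller's side, GroupMetadataManager's storeGroup and storeOffsets have each been split into a prepare step (build the message set and the cache-update callback, with no I/O) and a store step (the actual replicaManager.appendMessages call). A hedged usage example, assuming a groupManager, group, assignment, and responseCallback already in scope as in the coordinator code above:

    // Build the message set and callback without doing any I/O; this part is
    // safe to run while holding the group lock.
    val delayedStore: DelayedStore =
      groupManager.prepareStoreGroup(group, assignment, responseCallback)

    // Append to the internal offsets topic via the ReplicaManager. This must run
    // after the group lock has been released, since the append callback may
    // synchronize on the group again.
    groupManager.store(delayedStore)

Keeping DelayedStore as a plain case class of the message set plus its callback means the prepare step has no side effects, so callers can build it under whatever lock they need and perform the append later, once that lock is gone.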
