KAFKA-664 RequestPurgatory should clean up satisfied requests from watchers map. Also, simplify the purge logic - purge based on an incoming request interval.
jjkoshy committed Dec 18, 2012
1 parent 4e5a6fc commit 04b5274
Showing 5 changed files with 58 additions and 44 deletions.
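In outline, the new purge logic drops the old per-watcher live-count heuristics and instead counts incoming requests: once every purgeInterval calls to watch(), the purgatory sweeps already-satisfied requests out of its bookkeeping. A minimal, self-contained sketch of that trigger, with hypothetical names (the committed code is in the diffs below):

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Simplified stand-in for a delayed request; only the satisfied flag matters here.
class DelayedReq {
  val satisfied = new AtomicBoolean(false)
}

// Illustration of the interval-based trigger: every purgeInterval incoming
// requests, already-satisfied entries are dropped so the watch list cannot
// grow without bound between expirations.
class IntervalPurge(purgeInterval: Int) {
  private val requestCounter = new AtomicInteger(0)
  private var watched = List.empty[DelayedReq]

  def watch(d: DelayedReq): Unit = synchronized {
    if (requestCounter.getAndIncrement() >= purgeInterval) {
      requestCounter.set(0)
      val (done, remaining) = watched.partition(_.satisfied.get())
      watched = remaining
      println(s"Purged ${done.size} satisfied requests")
    }
    watched = d :: watched
  }

  def size: Int = synchronized(watched.size)
}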
12 changes: 8 additions & 4 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,8 +40,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
 
-  private val producerRequestPurgatory = new ProducerRequestPurgatory
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val producerRequestPurgatory =
+    new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+  private val fetchRequestPurgatory =
+    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -496,7 +498,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, MessageSet](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
+    extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
@@ -633,7 +636,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey] {
+  private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
+    extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
     this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -161,5 +161,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the frequency with which the highwater mark is saved out to disk */
   val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
-
+
+  /* the purge interval (in number of requests) of the fetch request purgatory */
+  val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+
+  /* the purge interval (in number of requests) of the producer request purgatory */
+  val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+
 }
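Both settings are plain broker properties that default to 10000 requests. A hedged sketch of how an override is read, mirroring the props.getInt calls above (constructing the VerifiableProperties directly is an assumption for illustration, not part of this diff):

import java.util.Properties
import kafka.utils.VerifiableProperties

// Hypothetical override: each key falls back to 10000 when absent, as above.
val raw = new Properties()
raw.put("fetch.purgatory.purge.interval", "1000")

val props = new VerifiableProperties(raw)
val fetchPurge    = props.getInt("fetch.purgatory.purge.interval", 10000)    // 1000, overridden
val producerPurge = props.getInt("producer.purgatory.purge.interval", 10000) // 10000, default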
76 changes: 40 additions & 36 deletions core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -18,13 +18,13 @@
 package kafka.server
 
 import scala.collection._
-import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.network._
 import kafka.utils._
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import java.util
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -61,11 +61,21 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+  extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
+  private val requestCounter = new AtomicInteger(0)
+
+  newGauge(
+    "PurgatorySize",
+    new Gauge[Int] {
+      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+    }
+  )
+
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
@@ -78,10 +88,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
 
+  def purgeSatisfied() {
+    expiredRequestReaper.forcePurge()
+  }
+
   /**
    * Add a new delayed request watching the contained keys
    */
   def watch(delayedRequest: T) {
+    if (requestCounter.getAndIncrement() >= purgeInterval) {
+      requestCounter.set(0)
+      purgeSatisfied()
+    }
+
     for(key <- delayedRequest.keys) {
       var lst = watchersFor(key)
       lst.add(delayedRequest)
@@ -125,37 +144,29 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class Watchers {
 
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
-    private val requests = new LinkedList[T]
+    private val requests = new util.ArrayList[T]
 
-    /* you can only change this if you have added something or marked something satisfied */
-    var liveCount = 0.0
+    def numRequests = requests.size
 
     def add(t: T) {
       synchronized {
         requests.add(t)
-        liveCount += 1
-        maybePurge()
       }
     }
 
-    private def maybePurge() {
-      if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
+    def purgeSatisfied(): Int = {
+      synchronized {
         val iter = requests.iterator()
+        var purged = 0
         while(iter.hasNext) {
           val curr = iter.next
-          if(curr.satisfied.get())
+          if(curr.satisfied.get()) {
             iter.remove()
+            purged += 1
+          }
         }
-      }
-    }
-
-    def decLiveCount() {
-      synchronized {
-        liveCount -= 1
+        purged
       }
     }
 
@@ -177,7 +188,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated == true) {
           response += curr
-          liveCount -= 1
           expiredRequestReaper.satisfyRequest()
         }
       }
@@ -193,17 +203,16 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
    */
   private class ExpiredRequestReaper extends Runnable with Logging {
     this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-    /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
-    private val CleanupThresholdSize = 100
-    private val CleanupThresholdPrct = 0.5
 
     private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
-    private val needsPurge = new AtomicBoolean(false)
+
     /* The count of elements in the delay queue that are unsatisfied */
     private [kafka] val unsatisfied = new AtomicInteger(0)
 
+    def numRequests = delayed.size()
+
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
@@ -214,10 +223,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
           }
         } catch {
           case ie: InterruptedException =>
-            if(needsPurge.getAndSet(false)) {
-              val purged = purgeSatisfied()
-              debug("Forced purge of " + purged + " requests from delay queue.")
-            }
+            val purged = purgeSatisfied()
+            debug("Purged %d requests from delay queue.".format(purged))
+            val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
+            debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
           case e: Exception =>
             error("Error in long poll expiry thread: ", e)
         }
@@ -229,12 +238,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     def enqueue(t: T) {
       delayed.add(t)
       unsatisfied.incrementAndGet()
-      if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
-        forcePurge()
     }
 
-    private def forcePurge() {
-      needsPurge.set(true)
+    def forcePurge() {
       expirationThread.interrupt()
     }
 
@@ -259,8 +265,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
           unsatisfied.getAndDecrement()
-          for(key <- curr.keys)
-            watchersFor(key).decLiveCount()
           return curr
         }
       }
@@ -284,4 +288,4 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     }
   }
 
-}
+}
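Taken together, the new flow in RequestPurgatory is: watch() calls purgeSatisfied() every purgeInterval incoming requests, purgeSatisfied() interrupts the expiry thread via forcePurge(), and the interrupted thread then drops satisfied entries from both its delay queue and every per-key watcher list, which is the accumulation KAFKA-664 fixes. A self-contained sketch of the interrupt-driven sweep on the delay-queue side, using illustrative names rather than the committed class:

import java.util.concurrent.{CountDownLatch, DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Minimal delayed item: a satisfied flag plus the Delayed contract.
class Item(timeoutMs: Long) extends Delayed {
  val satisfied = new AtomicBoolean(false)
  private val dueAt = System.currentTimeMillis + timeoutMs
  def getDelay(unit: TimeUnit): Long =
    unit.convert(dueAt - System.currentTimeMillis, TimeUnit.MILLISECONDS)
  def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

// Illustrative reaper: blocks on the delay queue and, when interrupted (the
// forcePurge path), removes every already-satisfied item instead of waiting
// for it to expire.
class Reaper extends Runnable {
  val delayed = new DelayQueue[Item]()
  private val running = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  def run(): Unit = {
    while (running.get) {
      try {
        val expired = delayed.poll(200, TimeUnit.MILLISECONDS)
        if (expired != null) { /* the expiration callback would run here */ }
      } catch {
        case _: InterruptedException =>
          val it = delayed.iterator()
          var purged = 0
          while (it.hasNext) {
            if (it.next().satisfied.get()) { it.remove(); purged += 1 }
          }
          println(s"Purged $purged satisfied requests from the delay queue")
      }
    }
    shutdownLatch.countDown()
  }

  def shutdown(thread: Thread): Unit = {
    running.set(false)
    thread.interrupt()
    shutdownLatch.await()
  }
}

In the committed code the interrupt comes from ExpiredRequestReaper.forcePurge(), and the same catch block also calls purgeSatisfied() on every Watchers instance so that the watchers map is cleaned at the same time.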
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -97,7 +97,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
     // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).times(2)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.replay(replicaManager)
 
     // create a topic metadata request
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -81,7 +81,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -175,7 +175,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
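The test changes follow from the constructor changes above: the purgatories now read the purge intervals from replicaManager.config while KafkaApis is being constructed, so the mocks stop pinning an exact call count and allow any number of config reads instead. A small hedged sketch of the EasyMock pattern, against a hypothetical trait rather than the Kafka test fixtures:

import org.easymock.EasyMock

// Hypothetical stand-in for ReplicaManager's config accessor.
trait HasConfig { def config: Int }

val rm = EasyMock.createMock(classOf[HasConfig])
// anyTimes() tolerates however many reads construction happens to perform.
EasyMock.expect(rm.config).andReturn(42).anyTimes()
EasyMock.replay(rm)
assert(rm.config == 42)
EasyMock.verify(rm)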
