KAFKA-5634; Do not allow segment deletion beyond high watermark
This patch changes the segment deletion behavior to take the high watermark of the partition into account. In particular, segments containing offsets equal to or larger than the high watermark are no longer eligible for deletion. This is needed to ensure that the log start offset reported in fetch responses does not get ahead of the high watermark.

Impact: segment deletion may be delayed compared to existing behavior since the broker must await advancement of the high watermark. For topics with heavy load, this may make the active segment effectively ineligible for deletion since the high watermark may never catch up to the log end offset.
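As a standalone illustration of the rule (a sketch with made-up names, not code from this patch): a segment is eligible for deletion only if its upper bound offset, i.e. the base offset of the following segment, or the log end offset for the active segment, is at or below the high watermark.

// Illustrative sketch only; names are not taken from the patch.
def segmentWhollyBelowHighWatermark(nextSegmentBaseOffset: Option[Long],
                                    logEndOffset: Long,
                                    highWatermark: Long): Boolean = {
  // A segment's upper bound is where the next segment starts; the active
  // (last) segment is bounded by the log end offset instead.
  val upperBoundOffset = nextSegmentBaseOffset.getOrElse(logEndOffset)
  // Deletable only if every offset in the segment is below the high watermark,
  // so the log start offset can never pass the high watermark.
  highWatermark >= upperBoundOffset
}

For example, with segments based at offsets 0, 100, and 200, a log end offset of 250, and a high watermark of 150, only the segment at 0 is deletable: the segment at 100 contains offsets up to 199, which reach past the high watermark.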

Author: Jason Gustafson <[email protected]>

Reviewers: Jiangjie Qin <[email protected]>, Apurva Mehta <[email protected]>, Ismael Juma <[email protected]>

Closes apache#3575 from hachikuji/KAFKA-5634
hachikuji committed Jul 27, 2017
1 parent 9d6020e commit 6bd7302
Showing 15 changed files with 299 additions and 81 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -131,8 +131,8 @@ class Partition(val topic: String,
       if (!offsetMap.contains(topicPartition))
         info(s"No checkpointed highwatermark is found for partition $topicPartition")
       val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
-      new Replica(replicaId, this, time, offset, Some(log))
-    } else new Replica(replicaId, this, time)
+      new Replica(replicaId, topicPartition, time, offset, Some(log))
+    } else new Replica(replicaId, topicPartition, time)
     })
   }

30 changes: 18 additions & 12 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,11 +21,12 @@ import kafka.log.Log
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Time

 class Replica(val brokerId: Int,
-              val partition: Partition,
+              val topicPartition: TopicPartition,
               time: Time = Time.SYSTEM,
               initialHighWatermarkValue: Long = 0L,
               val log: Option[Log] = None) extends Logging {
@@ -50,14 +51,15 @@ class Replica(val brokerId: Int,
   // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
   @volatile private[this] var _lastCaughtUpTimeMs = 0L

-  val topicPartition = partition.topicPartition
-
   def isLocal: Boolean = log.isDefined

   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs

   val epochs = log.map(_.leaderEpochCache)

+  info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
+  log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
+
   /*
    * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
    * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
@@ -97,18 +99,22 @@ class Replica(val brokerId: Int,
     }
   }

-  def logEndOffset =
+  def logEndOffset: LogOffsetMetadata =
     if (isLocal)
       log.get.logEndOffsetMetadata
     else
       logEndOffsetMetadata

-  def maybeIncrementLogStartOffset(offset: Long) {
+  /**
+   * Increment the log start offset if the new offset is greater than the previous log start offset. The replica
+   * must be local and the new log start offset must be lower than the current high watermark.
+   */
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
     if (isLocal) {
-      if (highWatermark.messageOffset < offset)
-        throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
-          s" ${highWatermark.messageOffset} of the partition $topicPartition")
-      log.get.maybeIncrementLogStartOffset(offset)
+      if (newLogStartOffset > highWatermark.messageOffset)
+        throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
+          s"since it is larger than the high watermark ${highWatermark.messageOffset}")
+      log.get.maybeIncrementLogStartOffset(newLogStartOffset)
     } else {
       throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
     }
@@ -124,7 +130,7 @@ class Replica(val brokerId: Int,
     }
   }

-  def logStartOffset =
+  def logStartOffset: Long =
     if (isLocal)
       log.get.logStartOffset
     else
@@ -177,8 +183,8 @@ class Replica(val brokerId: Int,
   override def toString: String = {
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
-    replicaString.append("; Topic: " + partition.topic)
-    replicaString.append("; Partition: " + partition.partitionId)
+    replicaString.append("; Topic: " + topicPartition.topic)
+    replicaString.append("; Partition: " + topicPartition.partition)
     replicaString.append("; isLocal: " + isLocal)
     replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
     if (isLocal) {
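A simplified standalone sketch of the guard introduced above in maybeIncrementLogStartOffset, assuming a plain Long high watermark in place of LogOffsetMetadata:

import org.apache.kafka.common.errors.OffsetOutOfRangeException

// The log start offset may never advance past the high watermark; a caller
// attempting that (e.g. via a DeleteRecords request) gets an exception.
def checkNewLogStartOffset(newLogStartOffset: Long, highWatermark: Long): Unit = {
  if (newLogStartOffset > highWatermark)
    throw new OffsetOutOfRangeException(
      s"Cannot increment the log start offset to $newLogStartOffset since it is " +
        s"larger than the high watermark $highWatermark")
}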
107 changes: 68 additions & 39 deletions core/src/main/scala/kafka/log/Log.scala
@@ -28,12 +28,12 @@ import kafka.common.{InvalidOffsetException, KafkaException, LongRef}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.errors.{KafkaStorageException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}

 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Seq, mutable}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -169,6 +169,13 @@ class Log(@volatile var dir: File,
   */
  @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None

+  /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
+   * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
+   * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
+   * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
+   */
+  @volatile private var replicaHighWatermark: Option[Long] = None
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

@@ -715,6 +722,7 @@ class Log(@volatile var dir: File,

   def onHighWatermarkIncremented(highWatermark: Long): Unit = {
     lock synchronized {
+      replicaHighWatermark = Some(highWatermark)
       producerStateManager.onHighWatermarkUpdated(highWatermark)
       updateFirstUnstableOffset()
     }
@@ -739,14 +747,15 @@ class Log(@volatile var dir: File,
   /**
    * Increment the log start offset if the provided offset is larger.
    */
-  def maybeIncrementLogStartOffset(offset: Long) {
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
-    maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $offset in dir ${dir.getParent}") {
+    maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
       lock synchronized {
-        if (offset > logStartOffset) {
-          logStartOffset = offset
+        if (newLogStartOffset > logStartOffset) {
+          info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset in dir ${dir.getParent}")
+          logStartOffset = newLogStartOffset
           leaderEpochCache.clearAndFlushEarliest(logStartOffset)
           producerStateManager.truncateHead(logStartOffset)
           updateFirstUnstableOffset()
@@ -1078,12 +1087,15 @@ class Log(@volatile var dir: File,
    * Delete any log segments matching the given predicate function,
    * starting with the oldest segment and moving forward until a segment doesn't match.
    *
-   * @param predicate A function that takes in a single log segment and returns true iff it is deletable
+   * @param predicate A function that takes in a candidate log segment and the next higher segment
+   *                  (if there is one) and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
+      if (deletable.nonEmpty)
+        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
       deleteSegments(deletable)
     }
   }
@@ -1106,60 +1118,77 @@ class Log(@volatile var dir: File,
   }

   /**
-   * Find segments starting from the oldest until the user-supplied predicate is false.
-   * A final segment that is empty will never be returned (since we would just end up re-creating it).
-   * @param predicate A function that takes in a single log segment and returns true iff it is deletable
-   * @return the segments ready to be deleted
-   */
-  private def deletableSegments(predicate: LogSegment => Boolean) = {
-    val lastEntry = segments.lastEntry
-    if (lastEntry == null) Seq.empty
-    else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
+   * Find segments starting from the oldest until the user-supplied predicate is false or the segment
+   * containing the current high watermark is reached. We do not delete segments with offsets at or beyond
+   * the high watermark to ensure that the log start offset can never exceed it. If the high watermark
+   * has not yet been initialized, no segments are eligible for deletion.
+   *
+   * A final segment that is empty will never be returned (since we would just end up re-creating it).
+   *
+   * @param predicate A function that takes in a candidate log segment and the next higher segment
+   *                  (if there is one) and returns true iff it is deletable
+   * @return the segments ready to be deleted
+   */
+  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    if (segments.isEmpty || replicaHighWatermark.isEmpty) {
+      Seq.empty
+    } else {
+      val highWatermark = replicaHighWatermark.get
+      val deletable = ArrayBuffer.empty[LogSegment]
+      var segmentEntry = segments.firstEntry
+      while (segmentEntry != null) {
+        val segment = segmentEntry.getValue
+        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
+        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
+          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
+        else
+          (null, logEndOffset, segment.size == 0)
+
+        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
+          deletable += segment
+          segmentEntry = nextSegmentEntry
+        } else {
+          segmentEntry = null
+        }
+      }
+      deletable
+    }
   }

   /**
-  * Delete any log segments that have either expired due to time based retention
-  * or because the log size is > retentionSize
-  */
+   * Delete any log segments that have either expired due to time based retention
+   * or because the log size is > retentionSize
+   */
   def deleteOldSegments(): Int = {
     if (!config.delete) return 0
     deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
   }

-  private def deleteRetentionMsBreachedSegments() : Int = {
+  private def deleteRetentionMsBreachedSegments(): Int = {
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
-    deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
+    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
+      reason = s"retention time ${config.retentionMs}ms breach")
   }

-  private def deleteRetentionSizeBreachedSegments() : Int = {
+  private def deleteRetentionSizeBreachedSegments(): Int = {
     if (config.retentionSize < 0 || size < config.retentionSize) return 0
     var diff = size - config.retentionSize
-    def shouldDelete(segment: LogSegment) = {
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
       if (diff - segment.size >= 0) {
         diff -= segment.size
         true
       } else {
         false
       }
     }
-    deleteOldSegments(shouldDelete)
+    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
   }

-  private def deleteLogStartOffsetBreachedSegments() : Int = {
-    // keep active segment to avoid frequent log rolling due to user's DeleteRecordsRequest
-    lock synchronized {
-      val deletable = {
-        if (segments.size() < 2)
-          Seq.empty
-        else
-          logSegments.sliding(2).takeWhile { iterable =>
-            val nextSegment = iterable.toSeq(1)
-            nextSegment.baseOffset <= logStartOffset
-          }.map(_.toSeq(0)).toSeq
-      }
-      deleteSegments(deletable)
-    }
+  private def deleteLogStartOffsetBreachedSegments(): Int = {
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
+      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
   }

   /**
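To make the shape of the new deletion pass easier to follow, here is a self-contained sketch of the bounded, oldest-first walk, using a simplified stand-in Segment type in place of kafka.log.LogSegment (illustrative only, not the patched code):

// Stand-in for kafka.log.LogSegment, for illustration.
case class Segment(baseOffset: Long, size: Int)

def deletableSegments(segments: Vector[Segment],
                      logEndOffset: Long,
                      highWatermark: Option[Long],
                      predicate: (Segment, Option[Segment]) => Boolean): Vector[Segment] = {
  highWatermark match {
    case None => Vector.empty // high watermark not yet initialized: delete nothing
    case Some(hw) =>
      // Pair each segment with its successor; the last (active) segment has none.
      val withNext = segments.zip(segments.drop(1).map(Option(_)) :+ None)
      withNext.takeWhile { case (segment, nextOpt) =>
        // The upper bound is the next segment's base offset, or the log end
        // offset for the active segment, which is also skipped while empty.
        val upperBoundOffset = nextOpt.map(_.baseOffset).getOrElse(logEndOffset)
        val isLastAndEmpty = nextOpt.isEmpty && segment.size == 0
        hw >= upperBoundOffset && predicate(segment, nextOpt) && !isLastAndEmpty
      }.map(_._1)
  }
}

Each retention policy then reduces to a two-argument predicate over a candidate segment and its successor, while the high watermark bound is enforced in one place no matter which policy triggered the pass.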
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1212,7 +1212,7 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId))
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
     for ((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
+      val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
         highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
       } catch {