Skip to content

Commit

Permalink
KAFKA-8362: fix the old checkpoint won't be removed after alter log d…
Browse files Browse the repository at this point in the history
…ir (apache#9178)

In KIP-113, we support replicas movement between log directories. But while the directory change, we forgot to remove the topicPartition offset data in old directory, which will cause there are more than 1 checkpoint copy stayed in the logs for the altered topicPartition. And it'll let the LogCleaner get stuck due to it's possible to always get the old topicPartition offset data from the old checkpoint file.

I added one more parameter topicPartitionToBeRemoved in updateCheckpoints() method. So, if the update parameter is None (as before), we'll do the remove action to remove the topicPartitionToBeRemoved data in dir, otherwise, update the data as before.

Reviewers: Jun Rao <[email protected]>
  • Loading branch information
showuon authored Sep 14, 2020
1 parent 77f6175 commit 77a0bba
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 26 deletions.
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
}

/**
* Update checkpoint file, removing topics and partitions that no longer exist
* Update checkpoint file to remove partitions if necessary.
*/
def updateCheckpoints(dataDir: File): Unit = {
cleanerManager.updateCheckpoints(dataDir, update=None)
def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
}

/**
* alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir)
}

/**
* Stop cleaning logs in the provided directory
*
* @param dir the absolute path of the log dir
*/
def handleLogDirFailure(dir: String): Unit = {
cleanerManager.handleLogDirFailure(dir)
}
Expand Down
51 changes: 42 additions & 9 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint)
updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
updateCheckpoints(log.parentDirFile, partitionToUpdateOrAdd = Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs)

Expand Down Expand Up @@ -354,13 +354,32 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
/**
* Update checkpoint file, adding or removing partitions if necessary.
*
* @param dataDir The File object to be updated
* @param partitionToUpdateOrAdd The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
* @param topicPartitionToBeRemoved The TopicPartition to be removed
*/
def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Option[(TopicPartition, Long)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
if (checkpoint != null) {
try {
val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update
checkpoint.write(existing)
val currentCheckpoint = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap
// remove the partition offset if any
var updatedCheckpoint = partitionToRemove match {
case Some(topicPartion) => currentCheckpoint - topicPartion
case None => currentCheckpoint
}
// update or add the partition offset if any
updatedCheckpoint = partitionToUpdateOrAdd match {
case Some(updatedOffset) => updatedCheckpoint + updatedOffset
case None => updatedCheckpoint
}

checkpoint.write(updatedCheckpoint)
} catch {
case e: KafkaStorageException =>
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
Expand All @@ -369,15 +388,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

/**
* alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
*/
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
inLock(lock) {
try {
checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
case Some(offset) =>
// Remove this partition from the checkpoint file in the source log directory
updateCheckpoints(sourceLogDir, None)
// Add offset for this partition to the checkpoint file in the destination log directory
updateCheckpoints(destLogDir, Option(topicPartition, offset))
debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
s"from ${sourceLogDir.getAbsoluteFile} directory.")
updateCheckpoints(sourceLogDir, partitionToRemove = Option(topicPartition))

debug(s"Adding the partition offset data in checkpoint file for '${topicPartition}' " +
s"to ${destLogDir.getAbsoluteFile} directory.")
updateCheckpoints(destLogDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
case None =>
}
} catch {
Expand All @@ -393,13 +418,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

/**
* Stop cleaning logs in the provided directory
*
* @param dir the absolute path of the log dir
*/
def handleLogDirFailure(dir: String): Unit = {
warn(s"Stopping cleaning logs in dir $dir")
inLock(lock) {
checkpoints = checkpoints.filter { case (k, _) => k.getAbsolutePath != dir }
}
}

/**
* Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset
*/
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
inLock(lock) {
if (logs.get(topicPartition).config.compact) {
Expand All @@ -420,7 +453,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
inLock(lock) {
inProgress.get(topicPartition) match {
case Some(LogCleaningInProgress) =>
updateCheckpoints(dataDir, Option(topicPartition, endOffset))
updateCheckpoints(dataDir, partitionToUpdateOrAdd = Option(topicPartition, endOffset))
inProgress.remove(topicPartition)
case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused(1))
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File],
numRecoveryThreadsPerDataDir = newSize
}

// dir should be an absolute path
/**
* The log directory failure handler. It will stop log cleaning in that directory.
*
* @param dir the absolute path of the log directory
*/
def handleLogDirFailure(dir: String): Unit = {
warn(s"Stopping serving logs in dir $dir")
logCreationOrDeletionLock synchronized {
Expand Down Expand Up @@ -962,8 +966,9 @@ class LogManager(logDirs: Seq[File],
// We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null && !isFuture) {
cleaner.abortCleaning(topicPartition)
if (checkpoint)
cleaner.updateCheckpoints(removedLog.parentDirFile)
if (checkpoint) {
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
}
}
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
if (checkpoint) {
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1729,8 +1729,12 @@ class ReplicaManager(val config: KafkaConfig,
Partition.removeMetrics(tp)
}

// logDir should be an absolute path
// sendZkNotification is needed for unit test
/**
* The log directory failure handler for the replica
*
* @param dir the absolute path of the log directory
* @param sendZkNotification check if we need to send notification to zookeeper node (needed for unit test)
*/
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
if (!logManager.isLogDirOnline(dir))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
}

/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*/
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*
* The format in the LeaderEpoch checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- LeaderEpochCheckpointFile.currentVersion
* 2 <- following entries size
* 0 1 <- the format is: leader_epoch(int32) start_offset(int64)
* 1 2
* -----checkpoint file end----------
*/
class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
import LeaderEpochCheckpointFile._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ trait OffsetCheckpoint {
}

/**
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*/
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*
* The format in the offset checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- OffsetCheckpointFile.currentVersion
* 2 <- following entries size
* tp1 par1 1 <- the format is: TOPIC PARTITION OFFSET
* tp1 par2 2
* -----checkpoint file end----------
*/
class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
Expand Down
100 changes: 97 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@ import scala.collection.mutable
class LogCleanerManagerTest extends Logging {

val tmpDir = TestUtils.tempDir()
val tmpDir2 = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val logConfig = LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val offset = 999

val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()

Expand All @@ -55,8 +59,11 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.toMap
}

override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
val (tp, offset) = update.getOrElse(throw new IllegalArgumentException("update=None argument not yet handled"))
override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Option[(TopicPartition,Long)] = None,
partitionToRemove: Option[TopicPartition] = None): Unit = {
assert(partitionToRemove.isEmpty, "partitionToRemove argument with value not yet handled")
val (tp, offset) = partitionToUpdateOrAdd.getOrElse(
throw new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
cleanerCheckpoints.put(tp, offset)
}
}
Expand Down Expand Up @@ -361,6 +368,93 @@ class LogCleanerManagerTest extends Logging {
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
}

@Test
def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)

// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))

cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
// expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
}

@Test
def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)

// write some data into the cleaner-offset-checkpoint file
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))

// updateCheckpoints should remove the topicPartition data in the logDir
cleanerManager.updateCheckpoints(logDir, partitionToRemove = Option(topicPartition))
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}

@Test
def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)

// write some data into the cleaner-offset-checkpoint file in logDir and logDir2
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
cleanerManager.updateCheckpoints(logDir2, partitionToUpdateOrAdd = Option(topicPartition2, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))

cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
// verify the partition data in logDir is gone, and data in logDir2 is still there
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}

@Test
def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val lowerOffset = 1L
val higherOffset = 1000L

// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))

// we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, higherOffset)
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
// we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, lowerOffset)
assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints(topicPartition))
}

@Test
def testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir(): Unit = {
val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)

// write some data into the cleaner-offset-checkpoint file in logDir
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))

cleanerManager.alterCheckpointDir(topicPartition, logDir, logDir2)
// verify we still can get the partition offset after alterCheckpointDir
// This data should locate in logDir2, not logDir
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))

// force delete the logDir2 from checkpoints, so that the partition data should also be deleted
cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
}

/**
* log under cleanup should still be eligible for log truncation
*/
Expand Down Expand Up @@ -642,7 +736,7 @@ class LogCleanerManagerTest extends Logging {
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(topicPartition, log)
new LogCleanerManager(Seq(logDir), logs, null)
new LogCleanerManager(Seq(logDir, logDir2), logs, null)
}

private def createCleanerManagerMock(pool: Pool[TopicPartition, Log]): LogCleanerManagerMock = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
// force a checkpoint
// and make sure its gone from checkpoint file
cleaner.logs.remove(topicPartitions(0))
cleaner.updateCheckpoints(logDir)
cleaner.updateCheckpoints(logDir, partitionToRemove = Option(topicPartitions(0)))
val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
// we expect partition 0 to be gone
assertFalse(checkpoints.contains(topicPartitions(0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
* Test whether clients can producer and consume when there is log directory failure
* Test whether clients can produce and consume when there is log directory failure
*/
class LogDirFailureTest extends IntegrationTestHarness {

Expand Down

0 comments on commit 77a0bba

Please sign in to comment.