Skip to content

Commit

Permalink
KAFKA-13098: Fix NoSuchFileException during snapshot recovery (apache…
Browse files Browse the repository at this point in the history
…#11071)

Fix a bug where if a snapshot file is deleted while we're running snapshot recovery,
a NoSuchFileException will be thrown and snapshot recovery will fail.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
jsancio authored Jul 16, 2021
1 parent 13b2df7 commit 8e48212
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots}

import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
Expand Down Expand Up @@ -604,17 +604,11 @@ object KafkaMetadataLog {
val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
// Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
// from logStartOffset
Files
.walk(log.dir.toPath, 1)
.map[Optional[SnapshotPath]] { path =>
if (path != log.dir.toPath) {
Snapshots.parse(path)
} else {
Optional.empty()
}
}
.forEach { path =>
path.ifPresent { snapshotPath =>
val filesInDir = Files.newDirectoryStream(log.dir.toPath)

try {
filesInDir.forEach { path =>
Snapshots.parse(path).ifPresent { snapshotPath =>
if (snapshotPath.partial ||
snapshotPath.deleted ||
snapshotPath.snapshotId.offset < log.logStartOffset) {
Expand All @@ -625,6 +619,10 @@ object KafkaMetadataLog {
}
}
}
} finally {
filesInDir.close()
}

snapshots
}

Expand Down

0 comments on commit 8e48212

Please sign in to comment.