Skip to content

Commit

Permalink
KAFKA-13078: Fix a bug where we were closing the RawSnapshotWriter to…
Browse files Browse the repository at this point in the history
… early (apache#11040)

Reviewers: David Arthur <[email protected]>
  • Loading branch information
jsancio authored Jul 13, 2021
1 parent d33a874 commit 0f00f36
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/FollowerState.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ public Optional<RawSnapshotWriter> fetchingSnapshot() {
return fetchingSnapshot;
}

public void setFetchingSnapshot(Optional<RawSnapshotWriter> fetchingSnapshot) {
public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
if (fetchingSnapshot.isPresent()) {
fetchingSnapshot.get().close();
}
this.fetchingSnapshot = fetchingSnapshot;
fetchingSnapshot = newSnapshot;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public long sizeInBytes() {
return channel.size();
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error calculating snapshot size.temp path = %s, snapshotId = %s.",
this.tempSnapshotPath, this.snapshotId), e);
String.format(
"Error calculating snapshot size. temp path = %s, snapshotId = %s.",
tempSnapshotPath,
snapshotId),
e
);
}
}

Expand Down Expand Up @@ -177,14 +181,17 @@ public static FileRawSnapshotWriter create(
try {
return new FileRawSnapshotWriter(
path,
FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)),
FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.APPEND),
snapshotId,
replicatedLog
);
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error creating snapshot writer, " +
"temp path = %s, snapshotId %s.", path, snapshotId),
String.format(
"Error creating snapshot writer. path = %s, snapshotId %s.",
path,
snapshotId
),
e
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class MockRawSnapshotWriter implements RawSnapshotWriter {
private final Consumer<ByteBuffer> frozenHandler;

private boolean frozen = false;
private boolean closed = false;

public MockRawSnapshotWriter(
OffsetAndEpoch snapshotId,
Expand All @@ -45,19 +46,19 @@ public OffsetAndEpoch snapshotId() {

@Override
public long sizeInBytes() {
ensureNotFrozen();
ensureNotFrozenOrClosed();
return data.position();
}

@Override
public void append(UnalignedMemoryRecords records) {
ensureNotFrozen();
ensureNotFrozenOrClosed();
data.write(records.buffer());
}

@Override
public void append(MemoryRecords records) {
ensureNotFrozen();
ensureNotFrozenOrClosed();
data.write(records.buffer());
}

Expand All @@ -68,7 +69,7 @@ public boolean isFrozen() {

@Override
public void freeze() {
ensureNotFrozen();
ensureNotFrozenOrClosed();

frozen = true;
ByteBuffer buffer = data.buffer();
Expand All @@ -78,16 +79,26 @@ public void freeze() {
}

@Override
public void close() {}
public void close() {
ensureOpen();
closed = true;
}

@Override
public String toString() {
return String.format("MockRawSnapshotWriter(snapshotId=%s, data=%s)", snapshotId, data.buffer());
}

private void ensureNotFrozen() {
private void ensureNotFrozenOrClosed() {
if (frozen) {
throw new IllegalStateException("Snapshot is already frozen " + snapshotId);
}
ensureOpen();
}

private void ensureOpen() {
if (closed) {
throw new IllegalStateException("Snapshot is already closed " + snapshotId);
}
}
}

0 comments on commit 0f00f36

Please sign in to comment.