Skip to content

Commit

Permalink
KAFKA-5093; Avoid loading full batch data when possible when iteratin…
Browse files Browse the repository at this point in the history
…g FileRecords

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#3160 from hachikuji/KAFKA-5093
  • Loading branch information
hachikuji committed May 31, 2017
1 parent da9a171 commit 81f0c1e
Show file tree
Hide file tree
Showing 13 changed files with 504 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;

Expand Down Expand Up @@ -510,4 +511,76 @@ public int hashCode() {
}
}

static class LegacyFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

LegacyFileChannelRecordBatch(long offset,
byte magic,
FileChannel channel,
int position,
int batchSize) {
super(offset, magic, channel, position, batchSize);
}

@Override
protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
return new ByteBufferLegacyRecordBatch(buffer);
}

@Override
public long baseOffset() {
return loadFullBatch().baseOffset();
}

@Override
public long lastOffset() {
return offset;
}

@Override
public long producerId() {
return RecordBatch.NO_PRODUCER_ID;
}

@Override
public short producerEpoch() {
return RecordBatch.NO_PRODUCER_EPOCH;
}

@Override
public int baseSequence() {
return RecordBatch.NO_SEQUENCE;
}

@Override
public int lastSequence() {
return RecordBatch.NO_SEQUENCE;
}

@Override
public Integer countOrNull() {
return null;
}

@Override
public boolean isTransactional() {
return false;
}

@Override
public boolean isControlBatch() {
return false;
}

@Override
public int partitionLeaderEpoch() {
return RecordBatch.NO_PARTITION_LEADER_EPOCH;
}

@Override
protected int headerSize() {
return LOG_OVERHEAD + LegacyRecord.headerSize(magic);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -498,4 +499,80 @@ public void remove() {

}

static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

DefaultFileChannelRecordBatch(long offset,
byte magic,
FileChannel channel,
int position,
int batchSize) {
super(offset, magic, channel, position, batchSize);
}

@Override
protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
return new DefaultRecordBatch(buffer);
}

@Override
public long baseOffset() {
return offset;
}

@Override
public long lastOffset() {
return loadBatchHeader().lastOffset();
}

@Override
public long producerId() {
return loadBatchHeader().producerId();
}

@Override
public short producerEpoch() {
return loadBatchHeader().producerEpoch();
}

@Override
public int baseSequence() {
return loadBatchHeader().baseSequence();
}

@Override
public int lastSequence() {
return loadBatchHeader().lastSequence();
}

@Override
public long checksum() {
return loadBatchHeader().checksum();
}

@Override
public Integer countOrNull() {
return loadBatchHeader().countOrNull();
}

@Override
public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}

@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
}

@Override
public int partitionLeaderEpoch() {
return loadBatchHeader().partitionLeaderEpoch();
}

@Override
protected int headerSize() {
return RECORD_BATCH_OVERHEAD;
}
}

}
Loading

0 comments on commit 81f0c1e

Please sign in to comment.