MINOR: Close the producer batch append stream when the batch gets full to free up resources

Of particular importance are compression buffers (64 KB for LZ4, for example).

Author: Apurva Mehta <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#2796 from apurvam/idempotent-producer-close-data-stream
Apurva Mehta authored and ijuma committed Apr 3, 2017
1 parent 040fde8 commit b9b2cfc
Showing 5 changed files with 61 additions and 12 deletions.
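
To make the motivation in the commit message concrete, here is a minimal, hypothetical sketch of the idea (illustrative names, not Kafka's API): a builder that compresses records on the fly can close its append-side stream as soon as the batch is full, releasing the compressor's internal buffers while the underlying buffer stays available for finalizing the batch later. The JDK's GZIPOutputStream stands in for Kafka's LZ4 stream to keep the sketch dependency-free.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch, not the Kafka implementation.
public class EarlyCloseSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream appendStream;   // wraps the compressor
    private boolean appendStreamIsClosed = false;

    public EarlyCloseSketch() throws IOException {
        this.appendStream = new DataOutputStream(new GZIPOutputStream(buffer));
    }

    public void append(byte[] record) throws IOException {
        if (appendStreamIsClosed)
            throw new IllegalStateException("Closed for record appends");
        appendStream.writeInt(record.length);
        appendStream.write(record);
    }

    // Called once the batch is full: flush and drop the compressor (and its
    // internal buffers) while the batch waits to be sent.
    public void closeForRecordAppends() throws IOException {
        if (!appendStreamIsClosed) {
            appendStream.close();                   // frees the compression buffers
            appendStreamIsClosed = true;
        }
    }

    // Finalize the batch; appends are no longer possible, but the buffered
    // bytes (and, in Kafka, the batch header) can still be written out.
    public byte[] build() throws IOException {
        closeForRecordAppends();
        return buffer.toByteArray();
    }
}

In the actual commit, ProducerBatch.closeForRecordAppends() delegates to MemoryRecordsBuilder.closeForRecordAppends(), which closes the real appendStream, as shown in the diffs below.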
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -234,6 +234,14 @@ public void setProducerState(TransactionState.PidAndEpoch pidAndEpoch, int baseS
recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
}

/**
* Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
* possible to update the RecordBatch header.
*/
public void closeForRecordAppends() {
recordsBuilder.closeForRecordAppends();
}

public void close() {
recordsBuilder.close();
}
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -237,16 +237,22 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag
}

/**
* Try to append to a ProducerBatch. If it is full, we return null and a new batch is created. If the existing batch is
* full, it will be closed right before send, or if it is expired, or when the producer is closed, whichever
* comes first.
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
* resources like compression buffers. The batch will be fully closed (i.e. the record batch headers will be written
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);

}
return null;
}
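
As a rough, hypothetical illustration of the flow described in the javadoc above (simplified; not the actual RecordAccumulator code): when the last batch in the deque rejects an append because it is full, it is immediately closed for record appends and the caller falls through to allocate a fresh batch.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical, simplified accumulator illustrating the append flow.
class AccumulatorSketch {
    interface Batch {
        boolean tryAppend(byte[] record);   // false when the batch is full
        void closeForRecordAppends();       // frees append-side resources early
    }

    private final Deque<Batch> deque = new ArrayDeque<>();

    void append(byte[] record, Supplier<Batch> newBatch) {
        Batch last = deque.peekLast();
        if (last != null) {
            if (last.tryAppend(record))
                return;                       // fit into the existing batch
            last.closeForRecordAppends();     // full: release compression buffers now
        }
        Batch fresh = newBatch.get();         // a new batch is created
        fresh.tryAppend(record);
        deque.addLast(fresh);
    }
}

The full close (writing the batch header and building the memory records) still happens later, right before send, on expiry, or when the producer is closed, exactly as the javadoc states.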
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -30,6 +30,10 @@
* This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
* It transparently handles compression and exposes methods for appending new records, possibly with message
* format conversion.
*
* In cases where keeping memory retention low is important and there's a gap between the time that record appends stop
* and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the former happens.
* This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
*/
public class MemoryRecordsBuilder {
private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
@@ -50,7 +54,11 @@ public class MemoryRecordsBuilder {

private final TimestampType timestampType;
private final CompressionType compressionType;
// Used to append records, may compress data on the fly
private final DataOutputStream appendStream;
// Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
// the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
// so it's not safe to hold a direct reference to the underlying ByteBuffer.
private final ByteBufferOutputStream bufferStream;
private final byte magic;
private final int initPos;
@@ -61,6 +69,7 @@ public class MemoryRecordsBuilder {
private final int writeLimit;
private final int initialCapacity;

private boolean appendStreamIsClosed = false;
private long producerId;
private short producerEpoch;
private int baseSequence;
@@ -206,15 +215,26 @@ public void setProducerState(long pid, short epoch, int baseSequence) {
this.baseSequence = baseSequence;
}

/**
* Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
* possible to update the RecordBatch header.
*/
public void closeForRecordAppends() {
if (!appendStreamIsClosed) {
try {
appendStream.close();
appendStreamIsClosed = true;
} catch (IOException e) {
throw new KafkaException(e);
}
}
}

public void close() {
if (builtRecords != null)
return;

try {
appendStream.close();
} catch (IOException e) {
throw new KafkaException(e);
}
closeForRecordAppends();

if (numRecords == 0L) {
buffer().position(initPos);
@@ -233,6 +253,7 @@ else if (compressionType != CompressionType.NONE)
}

private void writeDefaultBatchHeader() {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(initPos);
@@ -257,6 +278,7 @@ private void writeDefaultBatchHeader() {
}

private void writeLegacyCompressedWrapperHeader() {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
int pos = buffer.position();
buffer.position(initPos);
@@ -416,6 +438,7 @@ public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuff
* @param record The record to add
*/
public void appendUncheckedWithOffset(long offset, LegacyRecord record) {
ensureOpenForRecordAppend();
try {
int size = record.sizeInBytes();
AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
@@ -469,6 +492,7 @@ public void append(LegacyRecord record) {

private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
@@ -478,6 +502,7 @@ private long appendDefaultRecord(long offset, boolean isControlRecord, long time
}

private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
ensureOpenForRecordAppend();
if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
timestamp = logAppendTime;

@@ -515,6 +540,18 @@ private void recordWritten(long offset, long timestamp, int size) {
}
}

private void ensureOpenForRecordAppend() {
if (appendStreamIsClosed)
throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
if (isClosed())
throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed");
}

private void ensureOpenForRecordBatchWrite() {
if (isClosed())
throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is closed");
}

/**
* Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}).
* @return The estimated number of bytes written
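
A hedged usage sketch of the two-phase lifecycle these guards enforce, reusing the hypothetical EarlyCloseSketch class from the top of the page (not Kafka's MemoryRecordsBuilder): appends are rejected once closeForRecordAppends() has run, while finalizing the batch remains legal until the builder is fully closed.

import java.nio.charset.StandardCharsets;

public class LifecycleExample {
    public static void main(String[] args) throws Exception {
        EarlyCloseSketch builder = new EarlyCloseSketch();
        builder.append("key=value".getBytes(StandardCharsets.UTF_8));

        builder.closeForRecordAppends();    // compression buffers released here

        try {
            builder.append("too-late".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalStateException expected) {
            // mirrors ensureOpenForRecordAppend() in the diff above
        }

        byte[] batch = builder.build();     // finalization is still allowed
        System.out.println("batch size: " + batch.length + " bytes");
    }
}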
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
@@ -578,8 +578,6 @@ class Log(@volatile var dir: File,
val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()

for (batch <- records.batches.asScala) {
if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
throw new InvalidRecordException("Client produce requests should not have more than one batch")
// update the first offset if on the first message. For magic versions older than 2, we use the last offset
// to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
// For magic version 2, we can get the first offset directly from the batch header.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,7 +499,7 @@ private[log] class Cleaner(val id: Int,
if (record.isControlRecord)
return true

// retain the entry if it is the last one produced by an active idempotent producer to ensure that
// retain the record if it is the last one produced by an active idempotent producer to ensure that
// the PID is not removed from the log before it has been expired
if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
return true
