Skip to content

Commit

Permalink
KAFKA-5283; Handle producer epoch/sequence overflow
Browse files Browse the repository at this point in the history
- Producer sequence numbers should wrap around
- Generate a new producerId if the producer epoch would overflow

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Apurva Mehta <[email protected]>, Guozhang Wang <[email protected]>

Closes apache#3183 from hachikuji/KAFKA-5283
  • Loading branch information
hachikuji committed Jun 2, 2017
1 parent 0c3e466 commit 1c882ee
Show file tree
Hide file tree
Showing 16 changed files with 605 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ private static DefaultRecord readFrom(ByteBuffer buffer,

int offsetDelta = ByteUtils.readVarint(buffer);
long offset = baseOffset + offsetDelta;
int sequence = baseSequence >= 0 ? baseSequence + offsetDelta : RecordBatch.NO_SEQUENCE;
int sequence = baseSequence >= 0 ?
DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
RecordBatch.NO_SEQUENCE;

ByteBuffer key = null;
int keySize = ByteUtils.readVarint(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ public int lastSequence() {
int baseSequence = baseSequence();
if (baseSequence == RecordBatch.NO_SEQUENCE)
return RecordBatch.NO_SEQUENCE;
return baseSequence() + lastOffsetDelta();

int delta = lastOffsetDelta();
return incrementSequence(baseSequence, delta);
}

@Override
Expand Down Expand Up @@ -462,6 +464,12 @@ static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] header
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}

static int incrementSequence(int baseSequence, int increment) {
if (baseSequence > Integer.MAX_VALUE - increment)
return increment - (Integer.MAX_VALUE - baseSequence) - 1;
return baseSequence + increment;
}

private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,35 @@ public void buildDefaultRecordBatchWithProducerId() {
}
}

@Test
public void buildDefaultRecordBatchWithSequenceWrapAround() {
long pid = 23423L;
short epoch = 145;
int baseSequence = Integer.MAX_VALUE - 1;
ByteBuffer buffer = ByteBuffer.allocate(2048);

MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
TimestampType.CREATE_TIME, 1234567L, RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence);
builder.appendWithOffset(1234567, 1L, "a".getBytes(), "v".getBytes());
builder.appendWithOffset(1234568, 2L, "b".getBytes(), "v".getBytes());
builder.appendWithOffset(1234569, 3L, "c".getBytes(), "v".getBytes());

MemoryRecords records = builder.build();
List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);

assertEquals(pid, batch.producerId());
assertEquals(epoch, batch.producerEpoch());
assertEquals(baseSequence, batch.baseSequence());
assertEquals(0, batch.lastSequence());
List<Record> allRecords = TestUtils.toList(batch);
assertEquals(3, allRecords.size());
assertEquals(Integer.MAX_VALUE - 1, allRecords.get(0).sequence());
assertEquals(Integer.MAX_VALUE, allRecords.get(1).sequence());
assertEquals(0, allRecords.get(2).sequence());
}

@Test
public void testSizeInBytes() {
Header[] headers = new Header[] {
Expand Down Expand Up @@ -265,4 +294,11 @@ public void testStreamingIteratorConsistency() {
}
}

@Test
public void testIncrementSequence() {
assertEquals(10, DefaultRecordBatch.incrementSequence(5, 5));
assertEquals(0, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE, 1));
assertEquals(4, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE - 5, 10));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,34 +110,31 @@ class TransactionCoordinator(brokerId: Int,
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
} else {
val producerId = producerIdManager.generateProducerId()
val now = time.milliseconds()
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerId,
producerEpoch = 0,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
txnLastUpdateTimestamp = now)

// only try to get a new producerId and update the cache if the transactional id is unknown
val result: Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId, Some(createdMetadata)) match {
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId) match {
case Right(None) =>
val producerId = producerIdManager.generateProducerId()
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerId,
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
txnLastUpdateTimestamp = time.milliseconds())
txnManager.putTransactionStateIfNotExists(transactionalId, createdMetadata)

case other => other
}

val result: Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata match {
case Left(err) =>
Left(initTransactionError(err))

case Right(Some(existingEpochAndMetadata)) =>
val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
val txnMetadata = existingEpochAndMetadata.transactionMetadata

// there might be a concurrent thread that has just updated the mapping
// with the transactional id at the same time (hence reference equality will fail);
// in this case we will treat it as the metadata has existed already
txnMetadata synchronized {
if (!txnMetadata.eq(createdMetadata)) {
initProducerIdWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
} else {
Right(coordinatorEpoch, txnMetadata.prepareNewProducerId(time.milliseconds()))
}
prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
}

case Right(None) =>
Expand Down Expand Up @@ -182,26 +179,36 @@ class TransactionCoordinator(brokerId: Int,
}
}

private def initProducerIdWithExistingMetadata(transactionalId: String,
transactionTimeoutMs: Int,
coordinatorEpoch: Int,
txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = {
private def prepareInitProduceIdTransit(transactionalId: String,
transactionTimeoutMs: Int,
coordinatorEpoch: Int,
txnMetadata: TransactionMetadata): Either[InitProducerIdResult, (Int, TxnTransitMetadata)] = {
if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
} else {
// caller should have synchronized on txnMetadata already
txnMetadata.state match {
case PrepareAbort | PrepareCommit =>
// reply to client and let client backoff and retry
// reply to client and let it backoff and retry
Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))

case CompleteAbort | CompleteCommit | Empty =>
// try to append and then update
Right(coordinatorEpoch, txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds()))
val transitMetadata = if (txnMetadata.isProducerEpochExhausted) {
val newProducerId = producerIdManager.generateProducerId()
txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds())
} else {
txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds())
}

Right(coordinatorEpoch, transitMetadata)

case Ongoing =>
// indicate to abort the current ongoing txn first
// indicate to abort the current ongoing txn first. Note that this epoch is never returned to the
// user. We will abort the ongoing transaction and return CONCURRENT_TRANSACTIONS to the client.
// This forces the client to retry, which will ensure that the epoch is bumped a second time. In
// particular, if fencing the current producer exhausts the available epochs for the current producerId,
// then when the client retries, we will generate a new producerId.
Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
case Dead =>
throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " +
Expand All @@ -220,7 +227,7 @@ class TransactionCoordinator(brokerId: Int,
} else {
// try to update the transaction metadata and append the updated metadata to txn log;
// if there is no such metadata treat it as invalid producerId mapping error.
val result: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
val result: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
case Left(err) =>
Left(err)

Expand Down Expand Up @@ -286,7 +293,7 @@ class TransactionCoordinator(brokerId: Int,
if (transactionalId == null || transactionalId.isEmpty)
responseCallback(Errors.INVALID_REQUEST)
else {
val preAppendResult: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
val preAppendResult: Either[Errors, (Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
case Left(err) =>
Left(err)

Expand All @@ -296,7 +303,6 @@ class TransactionCoordinator(brokerId: Int,
case Right(Some(epochAndTxnMetadata)) =>
val txnMetadata = epochAndTxnMetadata.transactionMetadata
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
val now = time.milliseconds()

txnMetadata synchronized {
if (txnMetadata.producerId != producerId)
Expand Down Expand Up @@ -349,7 +355,7 @@ class TransactionCoordinator(brokerId: Int,
case Right((coordinatorEpoch, newMetadata)) =>
def sendTxnMarkersCallback(error: Errors): Unit = {
if (error == Errors.NONE) {
val preSendResult: Either[Errors, (TransactionMetadata, TxnTransitMetadata)] = txnManager.getAndMaybeAddTransactionState(transactionalId) match {
val preSendResult: Either[Errors, (TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId) match {
case Left(err) =>
Left(err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
case Errors.NONE =>
trace(s"Completed sending transaction markers for $transactionalId as $txnResult")

txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
txnStateManager.getTransactionState(transactionalId) match {
case Left(Errors.NOT_COORDINATOR) =>
info(s"I am no longer the coordinator for $transactionalId with coordinator epoch $coordinatorEpoch; cancel appending $newMetadata to transaction log")

Expand Down Expand Up @@ -291,7 +291,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}

case None =>
txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
txnStateManager.getTransactionState(transactionalId) match {
case Left(error) =>
info(s"Encountered $error trying to fetch transaction metadata for $transactionalId with coordinator epoch $coordinatorEpoch; cancel sending markers to its partition leaders")
txnMarkerPurgatory.cancelForKey(transactionalId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry

txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
txnStateManager.getTransactionState(transactionalId) match {

case Left(Errors.NOT_COORDINATOR) =>
info(s"I am no longer the coordinator for $transactionalId; cancel sending transaction markers $txnMarker to the brokers")
Expand Down Expand Up @@ -93,7 +93,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
if (errors == null)
throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for producer id ${txnMarker.producerId}")

txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
txnStateManager.getTransactionState(transactionalId) match {
case Left(Errors.NOT_COORDINATOR) =>
info(s"I am no longer the coordinator for $transactionalId; cancel sending transaction markers $txnMarker to the brokers")

Expand Down
Loading

0 comments on commit 1c882ee

Please sign in to comment.