Skip to content

Commit

Permalink
KAFKA2805; RecordAccumulator request timeout not enforced when all br…
Browse files Browse the repository at this point in the history
…okers are gone

Removed the check for expiring only those batches whose metadata is unavailable. Now the batches will be expired irrespective of whether the leader is available or not, as soon as it reaches the requestimeout threshold.

Author: Mayuresh Gharat <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#503 from MayureshGharat/kafka-2805
  • Loading branch information
Mayuresh Gharat authored and junrao committed Nov 12, 2015
1 parent 124f73b commit 1cd22ed
Showing 1 changed file with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,15 @@ public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster
Iterator<RecordBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
RecordBatch batch = batchIterator.next();
Node leader = cluster.leaderFor(topicAndPartition);
if (leader == null) {
// check if the batch is expired
if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
expiredBatches.add(batch);
count++;
batchIterator.remove();
deallocate(batch);
} else {
if (!batch.inRetry()) {
break;
}
// check if the batch is expired
if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
expiredBatches.add(batch);
count++;
batchIterator.remove();
deallocate(batch);
} else {
if (!batch.inRetry()) {
break;
}
}
}
Expand Down

0 comments on commit 1cd22ed

Please sign in to comment.