diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index db611213ca968..d4a8a230befac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -216,18 +216,15 @@ public List abortExpiredBatches(int requestTimeout, Cluster cluster Iterator batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); - Node leader = cluster.leaderFor(topicAndPartition); - if (leader == null) { - // check if the batch is expired - if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { - expiredBatches.add(batch); - count++; - batchIterator.remove(); - deallocate(batch); - } else { - if (!batch.inRetry()) { - break; - } + // check if the batch is expired + if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { + expiredBatches.add(batch); + count++; + batchIterator.remove(); + deallocate(batch); + } else { + if (!batch.inRetry()) { + break; } } }