Skip to content

Commit

Permalink
KAFKA-6768; Transactional producer may hang in close with pending req…
Browse files Browse the repository at this point in the history
…uests (apache#4842)

This patch fixes an edge case in producer shutdown which prevents `close()` from completing due to a pending request which will never be sent due to shutdown initiation. I have added a test case which reproduces the scenario.

Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
hachikuji authored Apr 9, 2018
1 parent e6b4d17 commit 0a8f35b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
return false;

AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
while (running) {
while (!forceClose) {
Node targetNode = null;
try {
if (nextRequestHandler.needsCoordinator()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ public void setup() {
client.setNode(brokerNode);
}

@Test
public void testSenderShutdownWithPendingAddPartitions() throws Exception {
long pid = 13131L;
short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();

transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

prepareAddPartitionsToTxn(tp0, Errors.NONE);
prepareProduceResponse(Errors.NONE, pid, epoch);

sender.initiateClose();
sender.run();

assertTrue(sendFuture.isDone());
}

@Test
public void testEndTxnNotSentIfIncompleteBatches() {
long pid = 13131L;
Expand Down

0 comments on commit 0a8f35b

Please sign in to comment.