KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime

1. The IllegalStateException is actually thrown from testCloseWithZeroTimeoutFromSenderThread() due to a bug: we call producer.close() in the callback, and once the first callback runs, producing more records from a callback hits the IllegalStateException. This only pollutes the test output and doesn't fail the test. I fixed this by only calling producer.send() in the first callback.
2. It's not clear which test throws the TimeoutException, and the failure is not reproducible locally. What is clear is that the existing TimeoutException message is misleading, since the timeout is not necessarily caused by waiting for metadata. I improved this by making the TimeoutException message state the actual reason the batch expired.
3. It's not clear what actually failed testSendNonCompressedMessageWithCreateTime(). One thing I found is that since the linger time is set to Long.MaxValue and the messages are small, the produced records won't be drained until we call producer.close(10000L, TimeUnit.MILLISECONDS). Normally, 10 seconds should be enough for the records to be sent. My only hypothesis is that since SSL is more expensive, occasionally 10 seconds is still not enough. So, I bumped the close timeout from 10 seconds to 20 seconds.
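The expiry reasons from point 2 can be sketched as a pure function. This is a simplified standalone model, not the actual Kafka class: `BatchExpiry` and `expiryReason` are hypothetical names, but the three conditions and message strings mirror the diff in this commit.

```java
// Simplified model of the patched expiry check: returns a descriptive
// reason string if the batch has expired, or null if it has not.
// (Hypothetical class; the real logic lives in the producer internals.)
public class BatchExpiry {
    static String expiryReason(boolean inRetry, boolean isFull, int requestTimeoutMs,
                               long retryBackoffMs, long now, long lingerMs,
                               long createdMs, long lastAppendTime, long lastAttemptMs) {
        if (!inRetry && isFull && requestTimeoutMs < (now - lastAppendTime))
            return (now - lastAppendTime) + " ms has passed since last append";
        else if (!inRetry && requestTimeoutMs < (now - (createdMs + lingerMs)))
            return (now - (createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
        else if (inRetry && requestTimeoutMs < (now - (lastAttemptMs + retryBackoffMs)))
            return (now - (lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
        return null; // not expired
    }

    public static void main(String[] args) {
        // A full, non-retrying batch: last append 31s ago, request timeout 30s.
        System.out.println(expiryReason(false, true, 30_000, 100L, 100_000L,
                                        0L, 60_000L, 69_000L, 0L));
        // prints: 31000 ms has passed since last append
    }
}
```

Each branch names the specific deadline that was exceeded, which is what replaces the old one-size-fits-all "requesting metadata" message.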

Author: Jun Rao <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]>

Closes apache#1703 from junrao/kafka-3875
junrao authored and ijuma committed Aug 4, 2016
1 parent 2e3722a commit 6fb33af
Showing 2 changed files with 17 additions and 8 deletions.
File 1 of 2:

@@ -142,17 +142,23 @@ public String toString() {
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
         boolean expire = false;
+        String errorMessage = null;
 
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) {
             expire = true;
+            errorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
-        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
+        } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) {
             expire = true;
+            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time";
-        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
+        } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) {
             expire = true;
+            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time";
+        }
 
         if (expire) {
             this.records.close();
-            this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing " + recordCount + " record(s) expired due to timeout while requesting metadata from brokers for " + topicPartition));
+            this.done(-1L, Record.NO_TIMESTAMP,
+                new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage));
         }
 
         return expire;
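Why the message text matters: the exception handed to done(...) is what the caller ultimately observes when a send future fails. Below is a minimal model of that propagation using CompletableFuture as a stand-in for the producer's internal pending-send future; `ExpiryMessageDemo` and `failPendingSend` are hypothetical names, not Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: fail a pending send with the new-style expiry message
// and show that the same text surfaces to the caller via get().
public class ExpiryMessageDemo {
    static String failPendingSend(String topicPartition, int recordCount, String reason) {
        CompletableFuture<Long> sendFuture = new CompletableFuture<>(); // stands in for a pending send
        sendFuture.completeExceptionally(new TimeoutException(
            "Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + reason));
        try {
            sendFuture.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // what the user actually sees
        } catch (InterruptedException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(failPendingSend("topic-0", 5, "30001 ms has passed since last append"));
        // prints: Expiring 5 record(s) for topic-0 due to 30001 ms has passed since last append
    }
}
```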
File 2 of 2 (BaseProducerSendTest):

@@ -224,7 +224,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
         producer.send(record, callback)
       }
-      producer.close(10000L, TimeUnit.MILLISECONDS)
+      producer.close(20000L, TimeUnit.MILLISECONDS)
       assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}", numRecords, callback.offset)
     } finally {
       producer.close()
@@ -408,11 +408,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from sender thread.
-    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception) {
         // Trigger another batch in accumulator before close the producer. These messages should
         // not be sent.
-        (0 until numRecords) map (i => producer.send(record))
+        if (sendRecords)
+          (0 until numRecords) foreach (i => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.
@@ -423,7 +424,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
     try {
       // send message to partition 0
-      val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
+      // Only send the records in the first callback since we close the producer in the callback and no records
+      // can be sent afterwards.
+      val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer, i == 0)))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       // flush the messages.
       producer.flush()
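The test above leans on two producer behaviors: close() is idempotent (every callback may call it), and send() after close throws IllegalStateException, which is why only the first callback may enqueue more records. A toy model of those two behaviors (`ToyProducer` is hypothetical, not Kafka's implementation):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the two behaviors the test relies on: repeated close() calls
// are harmless, and send() after close fails with IllegalStateException.
public class ToyProducer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int closeCalls = 0; // how many times close() was invoked

    void send(String record) {
        if (closed.get())
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // ...append record to the accumulator...
    }

    void close() {
        closeCalls++;                      // every callback may call this
        closed.compareAndSet(false, true); // only the first call changes state
    }

    public static void main(String[] args) {
        ToyProducer p = new ToyProducer();
        p.send("a");
        p.close();
        p.close(); // second close is a no-op on state, not an error
        try {
            p.send("b"); // would pollute output with IllegalStateException, as in the bug
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the fix passes `i == 0` into CloseCallback: once any callback has closed the producer, further send() calls from later callbacks can only throw.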
