Skip to content

Commit

Permalink
MINOR: Fix handling of dummy record in EndToEndLatency tool
Browse files Browse the repository at this point in the history
EndToEndLatency tool produces a dummy record in case the topic does not exist. This behavior was introduced in this PR apache#5319  as part of updating the tool to use latest consumer API. However, if we run the tool with producer acks == 1, the high watermark may not be updated before we reset consumer offsets to latest. In rare cases when this happens, the tool will throw an exception in the for loop where the consumer will unexpectedly consume the dummy record. As a result, we occasionally see Benchmark.test_end_to_end_latency system test failures.

This PR checks if topic exists, and creates the topic using AdminClient if it does not exist.

Author: Anna Povzner <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#5950 from apovzner/fix-EndToEndLatency
  • Loading branch information
apovzner authored and ewencp committed Nov 30, 2018
1 parent 944f24c commit 3acebe6
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions core/src/main/scala/kafka/tools/EndToEndLatency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package kafka.tools

import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.{Arrays, Properties}
import java.util.{Collections, Arrays, Properties}

import kafka.utils.Exit
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.{admin, CommonClientConfigs}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
Expand All @@ -44,6 +46,8 @@ import scala.util.Random

object EndToEndLatency {
private val timeout: Long = 60000
private val defaultReplicationFactor: Short = 1
private val defaultNumPartitions: Int = 1

def main(args: Array[String]) {
if (args.length != 5 && args.length != 6) {
Expand All @@ -61,10 +65,13 @@ object EndToEndLatency {
if (!List("1", "all").contains(producerAcks))
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")

def loadProps: Properties = propsFile.map(Utils.loadProps).getOrElse(new Properties())
def loadPropsWithBootstrapServers: Properties = {
val props = propsFile.map(Utils.loadProps).getOrElse(new Properties())
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props
}

val consumerProps = loadProps
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val consumerProps = loadPropsWithBootstrapServers
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
Expand All @@ -73,24 +80,30 @@ object EndToEndLatency {
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)

val producerProps = loadProps
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val producerProps = loadPropsWithBootstrapServers
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

// sends a dummy message to create the topic if it doesn't exist
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()

def finalise() {
consumer.commitSync()
producer.close()
consumer.close()
}

// create topic if it does not exist
if (!consumer.listTopics().containsKey(topic)) {
try {
createTopic(topic, loadPropsWithBootstrapServers)
} catch {
case t: Throwable =>
finalise()
throw new RuntimeException(s"Failed to create topic $topic", t)
}
}

val topicPartitions = consumer.partitionsFor(topic).asScala
.map(p => new TopicPartition(p.topic(), p.partition())).asJava
Expand Down Expand Up @@ -153,4 +166,14 @@ object EndToEndLatency {
def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
Array.fill(len)((random.nextInt(26) + 65).toByte)
}

def createTopic(topic: String, props: Properties): Unit = {
println("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d"
.format(topic, defaultNumPartitions, defaultReplicationFactor))

val adminClient = admin.AdminClient.create(props)
val newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor)
try adminClient.createTopics(Collections.singleton(newTopic)).all().get()
finally Utils.closeQuietly(adminClient, "AdminClient")
}
}

0 comments on commit 3acebe6

Please sign in to comment.