KAFKA-1337: Fix incorrect producer configs after config renaming.
jkreps committed Apr 18, 2014
1 parent b9351e0 commit 8b05215
Showing 4 changed files with 30 additions and 25 deletions.
core/src/main/scala/kafka/producer/ConsoleProducer.scala (21 additions, 16 deletions)

```diff
@@ -32,7 +32,10 @@ object ConsoleProducer {
 
     val config = new ProducerConfig(args)
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    reader.init(System.in, config.cmdLineProps)
+    val props = new Properties
+    props.put("topic", config.topic)
+    props.putAll(config.cmdLineProps)
+    reader.init(System.in, props)
 
     try {
       val producer =
@@ -201,7 +204,6 @@ object ConsoleProducer {
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    cmdLineProps.put("topic", topic)
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
@@ -262,22 +264,24 @@ object ConsoleProducer {
   }
 
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
+    import org.apache.kafka.clients.producer.ProducerConfig
     val props = new Properties()
-    props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.type", producerConfig.compressionCodec)
-    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
-    props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
-    props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
-    props.put("metadata.fetch.timeout.ms", producerConfig.metadataFetchTimeoutMs.toString)
-    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
-    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
-    props.put("request.retries", producerConfig.messageSendMaxRetries.toString)
-    props.put("linger.ms", producerConfig.sendTimeout.toString)
+    props.putAll(producerConfig.cmdLineProps)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString)
+    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString)
+    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString)
+    props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString)
+    props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString)
     if(producerConfig.queueEnqueueTimeoutMs != -1)
-      props.put("block.on.buffer.full", "false")
-    props.put("total.memory.bytes", producerConfig.maxMemoryBytes.toString)
-    props.put("max.partition.bytes", producerConfig.maxPartitionMemoryBytes.toString)
-    props.put("client.id", "console-producer")
+      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     val producer = new KafkaProducer(props)
 
     def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
@@ -294,6 +298,7 @@ object ConsoleProducer {
 
   class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
     val props = new Properties()
+    props.putAll(producerConfig.cmdLineProps)
    props.put("metadata.broker.list", producerConfig.brokerList)
     props.put("compression.codec", producerConfig.compressionCodec)
     props.put("producer.type", if(producerConfig.sync) "sync" else "async")
```
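The point of the ConsoleProducer hunks above is to build the new producer's configuration from the `ProducerConfig` constants instead of hand-typed strings, so a key that gets renamed upstream breaks the build rather than being silently ignored at runtime. A minimal standalone sketch of the same pattern (broker address, topic, and payload are illustrative, not from the commit):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ProducerConfigSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // was "metadata.broker.list"
    props.put(ProducerConfig.ACKS_CONFIG, "1")                           // was "request.required.acks"
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "config-sketch")
    val producer = new KafkaProducer(props)
    try {
      // The new producer API of this era takes raw byte arrays for key/value.
      producer.send(new ProducerRecord("sketch-topic", "hello".getBytes))
    } finally {
      producer.close()
    }
  }
}
```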
core/src/main/scala/kafka/tools/ReplayLogProducer.scala (2 additions, 2 deletions)

```diff
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.consumer._
 import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 object ReplayLogProducer extends Logging {
 
@@ -122,7 +122,7 @@ object ReplayLogProducer extends Logging {
     val isSync = options.has(syncOpt)
     import scala.collection.JavaConversions._
     val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    producerProps.put("metadata.broker.list", brokerList)
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
```
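One behavior worth noting in the ReplayLogProducer hunk: `Properties.put` replaces an existing value, and the `--broker-list` value is put after the parsed `--property` options, so it always wins over a `bootstrap.servers` supplied on the command line. A small sketch of that precedence (values are illustrative):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object PrecedenceSketch {
  def main(args: Array[String]) {
    val producerProps = new Properties()
    // Pretend this arrived via --property bootstrap.servers=other:9092 ...
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "other:9092")
    // ... and the tool then applies --broker-list, replacing the earlier value.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
    // Prints broker1:9092 — the later put wins.
    println(producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
  }
}
```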
core/src/main/scala/kafka/tools/TestEndToEndLatency.scala (4 additions, 4 deletions)

```diff
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.util.Properties
 import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
@@ -46,9 +46,9 @@ object TestEndToEndLatency {
     val iter = stream.iterator
 
     val producerProps = new Properties()
-    producerProps.put("metadata.broker.list", brokerList)
-    producerProps.put("linger.ms", "0")
-    producerProps.put("block.on.buffer.full", "true")
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     val producer = new KafkaProducer(producerProps)
 
     val message = "hello there beautiful".getBytes
```
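The two settings kept here matter for a latency test: `LINGER_MS_CONFIG` of 0 dispatches each record immediately instead of waiting to batch it, and `BLOCK_ON_BUFFER_FULL_CONFIG` of true makes the producer block rather than drop records when its buffer fills, so every send is actually measured. A rough sketch of the round-trip timing such a test performs (the helper and its parameters are assumptions, not the file's exact code):

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumes a producer and a consumer iterator over the same topic,
// wired up as in the diff above.
def timeRoundTrip(producer: KafkaProducer, iter: java.util.Iterator[_], topic: String): Long = {
  val begin = System.nanoTime
  producer.send(new ProducerRecord(topic, "hello there beautiful".getBytes))
  iter.next() // blocks until the message arrives back through the consumer
  System.nanoTime - begin // nanoseconds for one produce/consume round trip
}
```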
core/src/main/scala/kafka/tools/TestLogCleaning.scala (3 additions, 3 deletions)

```diff
@@ -26,7 +26,7 @@ import kafka.serializer._
 import kafka.utils._
 import kafka.log.FileMessageSet
 import kafka.log.Log
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
@@ -240,8 +240,8 @@ object TestLogCleaning {
                         dups: Int,
                         percentDeletes: Int): File = {
     val producerProps = new Properties
-    producerProps.setProperty("block.on.buffer.full", "true")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     val producer = new KafkaProducer(producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
```
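The visible tail of the TestLogCleaning hunk hints at how the torture test drives log compaction: with `keyCount = messages / dups`, keys are drawn from a deliberately small range so each key is written about `dups` times, and a compacted log should keep only the last value per key. A hedged sketch of that key-reuse idea (the names and loop are illustrative, not the file's code):

```scala
import java.util.Random

object DupKeySketch {
  def main(args: Array[String]) {
    val messages = 100
    val dups = 5
    val keyCount = messages / dups // 20 distinct keys, ~5 writes each
    val rand = new Random(1)       // fixed seed, reproducible like the test's Random(1)
    for (i <- 0 until messages) {
      val key = rand.nextInt(keyCount)
      // In the real test this key/value pair would go to the producer;
      // here we just show the duplicate-key distribution.
      println("key=" + key + " value=" + i)
    }
  }
}
```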
