Skip to content

Commit

Permalink
KAFKA-2791; removed deprecated properties
Browse files Browse the repository at this point in the history
Removed support for BLOCK_ON_BUFFER_FULL_CONFIG (block.on.buffer.full)
Removed support for METADATA_FETCH_TIMEOUT_CONFIG
Removed support for TIMEOUT_CONFIG (aka timeout.ms)

Added support for MAX_BLOCK_MS_CONFIG
Added support for REQUEST_TIMEOUT_MS_CONFIG

Author: Ben Stopford <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#525 from benstopford/KAFKA-2791
  • Loading branch information
benstopford authored and junrao committed Nov 13, 2015
1 parent 397306c commit 4efe4ac
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,11 @@ object ConsoleProducer {
props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.maxBlockMs.toString)
props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.requestTimeoutMs.toString)
props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
if(config.queueEnqueueTimeoutMs != -1)
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
Expand Down Expand Up @@ -190,10 +188,10 @@ object ConsoleProducer {
.describedAs("metadata expiration interval")
.ofType(classOf[java.lang.Long])
.defaultsTo(5*60*1000L)
val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
"The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
val maxBlockMsOpt = parser.accepts("max-block-ms",
"The max time that the producer will block for during a send request")
.withRequiredArg
.describedAs("metadata fetch timeout")
.describedAs("max block on send")
.ofType(classOf[java.lang.Long])
.defaultsTo(60*1000L)
val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
Expand Down Expand Up @@ -276,7 +274,7 @@ object ConsoleProducer {
val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
val maxBlockMs = options.valueOf(maxBlockMsOpt)
}

trait MessageReader {
Expand Down

0 comments on commit 4efe4ac

Please sign in to comment.