Skip to content

Commit

Permalink
KAFKA-2819; catch NoSuchElementException in ConsoleConsumer
Browse files Browse the repository at this point in the history
Author: Guozhang Wang <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#516 from guozhangwang/K2819
  • Loading branch information
guozhangwang authored and junrao committed Nov 13, 2015
1 parent 969d0cb commit 4170847
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/common/StreamEndException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kafka.common

/**
* An exception that indicates KafkaStream has ended.
*/
class StreamEndException() extends RuntimeException {
}
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/consumer/BaseConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.consumer
import java.util.Properties
import java.util.regex.Pattern

import kafka.common.StreamEndException
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener

/**
Expand Down Expand Up @@ -84,7 +85,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
val iter = stream.iterator

override def receive(): BaseConsumerRecord = {
// we do not need to check hasNext for KafkaStream iterator
if (!iter.hasNext())
throw new StreamEndException

val messageAndMetadata = iter.next
BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.tools
import java.io.PrintStream
import java.util.{Properties, Random}
import joptsimple._
import kafka.common.StreamEndException
import kafka.consumer._
import kafka.message._
import kafka.metrics.KafkaMetricsReporter
Expand Down Expand Up @@ -100,11 +101,14 @@ object ConsoleConsumer extends Logging {
val msg: BaseConsumerRecord = try {
consumer.receive()
} catch {
case e: Throwable => {
case nse: StreamEndException =>
trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
// Consumer is already closed
return
case e: Throwable =>
error("Error processing message, terminating consumer process: ", e)
// Consumer will be closed
return
}
}
try {
formatter.writeTo(msg.key, msg.value, System.out)
Expand Down

0 comments on commit 4170847

Please sign in to comment.