diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index f677bc93c228..fd2755aff2f8 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Committer; @@ -182,7 +183,6 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object @Override public void start() throws Exception { - nextMessage(); } @Override @@ -224,6 +224,15 @@ public InputRow currRow() if (stopped) { return null; } + // currRow will be called before the first advance + if (row == null) { + try { + nextMessage(); + } + catch (ParseException e) { + return null; + } + } return row; }