Skip to content

Commit

Permalink
fix kafka8 unparsable message halt job issue (apache#4164)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijianding authored and fjy committed Apr 18, 2017
1 parent 0bcfd93 commit db656c5
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Committer;
Expand Down Expand Up @@ -182,7 +183,6 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object
@Override
public void start() throws Exception
{
nextMessage();
}

@Override
Expand Down Expand Up @@ -224,6 +224,15 @@ public InputRow currRow()
if (stopped) {
return null;
}
// currRow will be called before the first advance
if (row == null) {
try {
nextMessage();
}
catch (ParseException e) {
return null;
}
}
return row;
}

Expand Down

0 comments on commit db656c5

Please sign in to comment.