From 9068bcd06292e6aa4623995175af207b5b0824b6 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 9 Jul 2015 12:11:38 -0700 Subject: [PATCH] Consider null inputRows and parse errors as unparseable during realtime ingestion. Also, harmonize exception handling between the RealtimeIndexTask and the RealtimeManager. Conditions other than null inputRows and parse errors bubble up in both. --- .../common/task/RealtimeIndexTask.java | 42 +++--- .../segment/realtime/RealtimeManager.java | 8 +- .../segment/realtime/RealtimeManagerTest.java | 131 +++++++++++------- 3 files changed, 113 insertions(+), 68 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 4cef65fcc5e6..a56da1a700d1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -58,7 +58,6 @@ import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Random; public class RealtimeIndexTask extends AbstractTask @@ -293,35 +292,40 @@ public String getVersion(final Interval interval) long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { final InputRow inputRow; + try { inputRow = firehose.nextRow(); + if (inputRow == null) { + log.debug("thrown away null input row, considering unparseable"); + fireDepartment.getMetrics().incrementUnparseable(); continue; } + } + catch (ParseException e) { + log.debug(e, "thrown away line due to exception, considering unparseable"); + fireDepartment.getMetrics().incrementUnparseable(); + continue; + } - int currCount = plumber.add(inputRow); - if (currCount == -1) { - fireDepartment.getMetrics().incrementThrownAway(); - log.debug("Throwing away event[%s]", inputRow); - - if (System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } + int currCount = plumber.add(inputRow); + if (currCount == -1) { + fireDepartment.getMetrics().incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); - continue; - } - - fireDepartment.getMetrics().incrementProcessed(); - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { + if (System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } + + continue; } - catch (ParseException e) { - log.warn(e, "unparseable line"); - fireDepartment.getMetrics().incrementUnparseable(); + + fireDepartment.getMetrics().incrementProcessed(); + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 5a68e516f3aa..efcba1704742 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -252,8 +252,14 @@ public void run() try { try { inputRow = firehose.nextRow(); + + if (inputRow == null) { + log.debug("thrown away null input row, considering unparseable"); + metrics.incrementUnparseable(); + continue; + } } - catch (Exception e) { + catch (ParseException e) { log.debug(e, "thrown away line due to exception, considering unparseable"); metrics.incrementUnparseable(); continue; diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 99f1aca44638..d2e124d1205a 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.metamx.common.Granularity; import com.metamx.common.ISE; +import com.metamx.common.parsers.ParseException; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -40,10 +41,10 @@ import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; import io.druid.utils.Runnables; -import junit.framework.Assert; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -64,8 +65,11 @@ public class RealtimeManagerTest @Before public void setUp() throws Exception { - final List rows = Arrays.asList( - makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis()) + final List rows = Arrays.asList( + makeRow(new DateTime("9000-01-01").getMillis()), + makeRow(new ParseException("parse error")), + null, + makeRow(new DateTime().getMillis()) ); schema = new DataSchema( @@ -137,71 +141,97 @@ public void testRun() throws Exception Assert.assertEquals(1, realtimeManager.getMetrics("test").processed()); Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway()); + Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable()); Assert.assertTrue(plumber.isStartedJob()); Assert.assertTrue(plumber.isFinishedJob()); Assert.assertEquals(1, plumber.getPersistCount()); } - private InputRow makeRow(final long timestamp) + private TestInputRowHolder makeRow(final long timestamp) + { + return new TestInputRowHolder(timestamp, null); + } + + private TestInputRowHolder makeRow(final RuntimeException e) + { + return new TestInputRowHolder(0, e); + } + + private static class TestInputRowHolder { - return new InputRow() + private long timestamp; + private RuntimeException exception; + + public TestInputRowHolder(long timestamp, RuntimeException exception) { - @Override - public List getDimensions() - { - return Arrays.asList("testDim"); - } + this.timestamp = timestamp; + this.exception = exception; + } - @Override - public long getTimestampFromEpoch() - { - return timestamp; + public InputRow getRow() + { + if (exception != null) { + throw exception; } - @Override - public DateTime getTimestamp() + return new InputRow() { - return new DateTime(timestamp); - } + @Override + public List getDimensions() + { + return Arrays.asList("testDim"); + } - @Override - public List getDimension(String dimension) - { - return Lists.newArrayList(); - } + @Override + public long getTimestampFromEpoch() + { + return timestamp; + } - @Override - public float getFloatMetric(String metric) - { - return 0; - } + @Override + public DateTime getTimestamp() + { + return new DateTime(timestamp); + } - @Override - public long getLongMetric(String metric) - { - return 0L; - } + @Override + public List getDimension(String dimension) + { + return Lists.newArrayList(); + } - @Override - public Object getRaw(String dimension) - { - return null; - } + @Override + public float getFloatMetric(String metric) + { + return 0; + } - @Override - public int compareTo(Row o) - { - return 0; - } - }; - } + @Override + public long getLongMetric(String metric) + { + return 0L; + } + @Override + public Object getRaw(String dimension) + { + return null; + } + + @Override + public int compareTo(Row o) + { + return 0; + } + }; + } + } private static class TestFirehose implements Firehose { - private final Iterator rows; + private final Iterator rows; - private TestFirehose(Iterator rows) + private TestFirehose(Iterator rows) { this.rows = rows; } @@ -215,7 +245,12 @@ public boolean hasMore() @Override public InputRow nextRow() { - return rows.next(); + final TestInputRowHolder holder = rows.next(); + if (holder == null) { + return null; + } else { + return holder.getRow(); + } } @Override