Skip to content

Commit

Permalink
Merge pull request apache#1506 from gianm/realtime-plumber-nulls
Browse files Browse the repository at this point in the history
Consider null inputRows and parse errors as unparseable during realtime ingestion.
  • Loading branch information
himanshug committed Jul 13, 2015
2 parents cac7229 + 9068bcd commit 725086c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

public class RealtimeIndexTask extends AbstractTask
Expand Down Expand Up @@ -293,35 +292,40 @@ public String getVersion(final Interval interval)
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;

try {
inputRow = firehose.nextRow();

if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}

int currCount = plumber.add(inputRow);
if (currCount == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);

if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
int currCount = plumber.add(inputRow);
if (currCount == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);

continue;
}

fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}

continue;
}
catch (ParseException e) {
log.warn(e, "unparseable line");
fireDepartment.getMetrics().incrementUnparseable();

fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,14 @@ public void run()
try {
try {
inputRow = firehose.nextRow();

if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (Exception e) {
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
Expand All @@ -40,10 +41,10 @@
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.utils.Runnables;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -64,8 +65,11 @@ public class RealtimeManagerTest
@Before
public void setUp() throws Exception
{
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
final List<TestInputRowHolder> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()),
makeRow(new ParseException("parse error")),
null,
makeRow(new DateTime().getMillis())
);

schema = new DataSchema(
Expand Down Expand Up @@ -137,71 +141,97 @@ public void testRun() throws Exception

Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}

private InputRow makeRow(final long timestamp)
/**
 * Builds a holder for the given timestamp with no exception attached, so
 * {@code getRow()} on the result returns a row instead of throwing.
 *
 * @param timestamp epoch millis reported by the row the holder produces
 * @return a holder wrapping {@code timestamp} and a null exception
 */
private TestInputRowHolder makeRow(final long timestamp)
{
  final RuntimeException noException = null;
  return new TestInputRowHolder(timestamp, noException);
}

/**
 * Builds a holder that carries the given exception; the timestamp is fixed at 0.
 * {@code getRow()} on the result throws {@code e} when {@code e} is non-null.
 *
 * @param e exception the holder should throw from {@code getRow()}
 * @return a holder wrapping timestamp 0 and {@code e}
 */
private TestInputRowHolder makeRow(final RuntimeException e)
{
  final long placeholderTimestamp = 0;
  return new TestInputRowHolder(placeholderTimestamp, e);
}

private static class TestInputRowHolder
{
return new InputRow()
private long timestamp;
private RuntimeException exception;

public TestInputRowHolder(long timestamp, RuntimeException exception)
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
this.timestamp = timestamp;
this.exception = exception;
}

@Override
public long getTimestampFromEpoch()
{
return timestamp;
public InputRow getRow()
{
if (exception != null) {
throw exception;
}

@Override
public DateTime getTimestamp()
return new InputRow()
{
return new DateTime(timestamp);
}
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}

@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}

@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timestamp);
}

@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}

@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}

@Override
public int compareTo(Row o)
{
return 0;
}
};
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}

@Override
public Object getRaw(String dimension)
{
return null;
}

@Override
public int compareTo(Row o)
{
return 0;
}
};
}
}

private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private final Iterator<TestInputRowHolder> rows;

private TestFirehose(Iterator<InputRow> rows)
private TestFirehose(Iterator<TestInputRowHolder> rows)
{
this.rows = rows;
}
Expand All @@ -215,7 +245,12 @@ public boolean hasMore()
/**
 * Returns the next row from the underlying holder iterator.
 *
 * A null holder is passed through as a null row; otherwise the holder's
 * {@code getRow()} result is returned, and any exception it throws
 * propagates to the caller.
 */
@Override
public InputRow nextRow()
{
  // Exactly one element is consumed per call; an earlier unconditional
  // "return rows.next();" made everything below it unreachable.
  final TestInputRowHolder holder = rows.next();
  if (holder == null) {
    return null;
  } else {
    return holder.getRow();
  }
}

@Override
Expand Down

0 comments on commit 725086c

Please sign in to comment.