Better handling for parseExceptions for Batch Ingestion (apache#3171)
* Better handling for parseExceptions

* make parseException handling consistent with Realtime

* change combiner default val to true

* review comments

* review comments
nishantmonu51 authored and fjy committed Jun 22, 2016
1 parent 24860a1 commit 6f330dc
Showing 7 changed files with 70 additions and 24 deletions.
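
In short: the batch-ingestion mapper now derives a single reportParseExceptions flag from the job's ignoreInvalidRows tuning option and catches only ParseException, either rethrowing it (strict) or counting and skipping the row (lenient). The following is a self-contained toy sketch of that policy, not the actual Druid/Hadoop code; ParsePolicySketch, its stand-in ParseException, parseRow, and mapRow are invented for illustration.

import java.util.concurrent.atomic.AtomicLong;

// Toy illustration of the rethrow-or-count policy introduced by this commit.
// All names here are hypothetical stand-ins for the Druid/Hadoop classes.
class ParsePolicySketch
{
  static class ParseException extends RuntimeException
  {
    ParseException(String message) { super(message); }
  }

  private final boolean reportParseExceptions;           // == !ignoreInvalidRows, as in the diff
  private final AtomicLong invalidRowCounter = new AtomicLong();

  ParsePolicySketch(boolean ignoreInvalidRows)
  {
    this.reportParseExceptions = !ignoreInvalidRows;
  }

  // Stand-in for parseInputRow(value, parser): parse or throw ParseException.
  private long parseRow(String value)
  {
    try {
      return Long.parseLong(value);
    }
    catch (NumberFormatException e) {
      throw new ParseException("Unparseable row: " + value);
    }
  }

  // Stand-in for map(): strict mode rethrows, lenient mode counts and skips the row.
  Long mapRow(String value)
  {
    try {
      return parseRow(value);
    }
    catch (ParseException e) {
      if (reportParseExceptions) {
        throw e;                                 // fail the task loudly
      }
      invalidRowCounter.incrementAndGet();       // INVALID_ROW_COUNTER equivalent
      return null;                               // skip the bad row
    }
  }

  public static void main(String[] args)
  {
    ParsePolicySketch lenient = new ParsePolicySketch(true);   // ignoreInvalidRows = true
    System.out.println(lenient.mapRow("42"));                  // 42
    System.out.println(lenient.mapRow("not-a-number"));        // null
    System.out.println(lenient.invalidRowCounter.get());       // 1
  }
}
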
@@ -241,7 +241,8 @@ protected void setup(Context context)
protected void innerMap(
InputRow inputRow,
Object value,
- Context context
+ Context context,
+ boolean reportParseExceptions
) throws IOException, InterruptedException
{

@@ -268,7 +268,8 @@ protected void setup(Context context)
protected void innerMap(
InputRow inputRow,
Object value,
- Context context
+ Context context,
+ boolean reportParseExceptions
) throws IOException, InterruptedException
{
final List<Object> groupKey = Rows.toGroupKey(
@@ -349,7 +350,8 @@ protected void setup(Context context)
protected void innerMap(
InputRow inputRow,
Object value,
- Context context
+ Context context,
+ boolean reportParseExceptions
) throws IOException, InterruptedException
{
final Map<String, Iterable<String>> dims = Maps.newHashMap();
@@ -21,6 +21,7 @@

import com.metamx.common.RE;
import com.metamx.common.logger.Logger;
+ import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
@@ -38,6 +39,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
+ private boolean reportParseExceptions;

@Override
protected void setup(Context context)
@@ -46,6 +48,7 @@ protected void setup(Context context)
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getParser();
granularitySpec = config.getGranularitySpec();
+ reportParseExceptions = !config.isIgnoreInvalidRows();
}

public HadoopDruidIndexerConfig getConfig()
@@ -68,20 +71,20 @@ protected void map(
try {
inputRow = parseInputRow(value, parser);
}
- catch (Exception e) {
- if (config.isIgnoreInvalidRows()) {
- log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
- context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
- return; // we're ignoring this invalid row
- } else {
+ catch (ParseException e) {
+ if (reportParseExceptions) {
throw e;
}
+ log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
+ context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+ return; // we're ignoring this invalid row

}

if (!granularitySpec.bucketIntervals().isPresent()
|| granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
.isPresent()) {
- innerMap(inputRow, value, context);
+ innerMap(inputRow, value, context, reportParseExceptions);
}
}
catch (RuntimeException e) {
@@ -103,7 +106,7 @@ public final static InputRow parseInputRow(Object value, InputRowParser parser)
}
}

- abstract protected void innerMap(InputRow inputRow, Object value, Context context)
+ abstract protected void innerMap(InputRow inputRow, Object value, Context context, boolean reportParseExceptions)
throws IOException, InterruptedException;

}
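
For concrete mappers, the visible change is the extra boolean on innerMap. A hypothetical subclass written against the new signature might look like the sketch below; ExampleDruidMapper, the "host" dimension check, and the output types are invented for illustration, and it assumes (as the hunks above suggest) that innerMap is the only method a subclass must override and that HadoopDruidIndexerMapper lives in io.druid.indexer like the other files in this commit.

import java.io.IOException;

import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Hypothetical subclass, for illustration only: emits each row's timestamp and
// treats a missing "host" dimension as a parse error only when the job is strict.
public class ExampleDruidMapper extends HadoopDruidIndexerMapper<Text, NullWritable>
{
  @Override
  protected void innerMap(
      InputRow inputRow,
      Object value,
      Context context,
      boolean reportParseExceptions
  ) throws IOException, InterruptedException
  {
    if (inputRow.getDimension("host").isEmpty()) {
      if (reportParseExceptions) {
        throw new ParseException("Row has no host dimension: %s", value);
      }
      return; // lenient mode: silently drop the row
    }
    context.write(new Text(Long.toString(inputRow.getTimestampFromEpoch())), NullWritable.get());
  }
}
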
@@ -263,7 +263,8 @@ protected void setup(Context context)
protected void innerMap(
InputRow inputRow,
Object value,
- Context context
+ Context context,
+ boolean reportParseExceptions
) throws IOException, InterruptedException
{
// Group by bucket, sort by timestamp
@@ -287,9 +288,9 @@ protected void innerMap(
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
- InputRowSerde.toBytes(inputRow, combiningAggs)
+ InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions)
:
- InputRowSerde.toBytes(inputRow, aggregators);
+ InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions);

context.write(
new SortableBytes(
@@ -369,9 +370,10 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, dimensions);
+ // reportParseExceptions is true as any unparseable data is already handled by the mapper.
context.write(
key,
- new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
+ new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true))
);
}
index.close();
14 changes: 12 additions & 2 deletions indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java
@@ -28,6 +28,7 @@
import com.google.common.io.ByteStreams;
import com.metamx.common.IAE;
import com.metamx.common.logger.Logger;
+ import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.aggregation.Aggregator;
@@ -50,7 +51,7 @@ public class InputRowSerde
{
private static final Logger log = new Logger(InputRowSerde.class);

- public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs)
+ public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions)
{
try {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
@@ -91,7 +92,16 @@ public InputRow get()
true
)
);
- agg.aggregate();
+ try {
+ agg.aggregate();
+ }
+ catch (ParseException e) {
+ // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
+ if (reportParseExceptions) {
+ throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
+ }
+ log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
+ }

String t = aggFactory.getTypeName();

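
Usage-wise, callers of InputRowSerde.toBytes now pick the error policy per call: the index-generator mapper forwards the configured flag, while the combiner/reducer passes true because its input was already validated on the mapper side. The sketch below condenses the assertions added to InputRowSerdeTest further down into a runnable example; the class name InputRowSerdeUsageSketch and the sample event values are invented for illustration.

import java.util.Arrays;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexer.InputRowSerde;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

// Illustrative only; mirrors the new InputRowSerdeTest assertions below.
public class InputRowSerdeUsageSketch
{
  public static void main(String[] args)
  {
    // "m3" holds a non-numeric String, so a long-sum aggregator over it cannot parse its input.
    Map<String, Object> event = ImmutableMap.<String, Object>of("d1", "host1", "m3", "not-a-number");
    InputRow row = new MapBasedInputRow(System.currentTimeMillis(), Arrays.asList("d1"), event);

    AggregatorFactory[] aggs = new AggregatorFactory[]{
        new LongSumAggregatorFactory("unparseable", "m3")
    };

    // Lenient (reportParseExceptions = false): the failing aggregator is skipped
    // and deserializes to its default value.
    InputRow out = InputRowSerde.fromBytes(InputRowSerde.toBytes(row, aggs, false), aggs);
    System.out.println(out.getLongMetric("unparseable"));   // 0

    // Strict (reportParseExceptions = true): the same input makes toBytes throw ParseException.
    InputRowSerde.toBytes(row, aggs, true);
  }
}
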
@@ -159,8 +159,8 @@ public void testMultipleRowsMerged() throws Exception
)
);
List<BytesWritable> rows = Lists.newArrayList(
- new BytesWritable(InputRowSerde.toBytes(row1, aggregators)),
- new BytesWritable(InputRowSerde.toBytes(row2, aggregators))
+ new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)),
+ new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true))
);

Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
@@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+ import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.AggregatorsModule;
@@ -44,6 +45,10 @@ public class InputRowSerdeTest
private List<String> dims;
private Map<String, Object> event;

+ {
+ new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
+ }

public InputRowSerdeTest()
{
this.timestamp = System.currentTimeMillis();
@@ -60,22 +65,23 @@ public InputRowSerdeTest()
@Test
public void testSerde()
{
- new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique


InputRow in = new MapBasedInputRow(
timestamp,
dims,
event
);

- AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
new DoubleSumAggregatorFactory("m1out", "m1"),
new LongSumAggregatorFactory("m2out", "m2"),
new HyperUniquesAggregatorFactory("m3out", "m3")
new HyperUniquesAggregatorFactory("m3out", "m3"),
new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
};

- byte[] data = InputRowSerde.toBytes(in, aggregatorFactories);
+ byte[] data = InputRowSerde.toBytes(in, aggregatorFactories, false); // Ignore Unparseable aggregator
InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories);

Assert.assertEquals(timestamp, out.getTimestampFromEpoch());
@@ -87,6 +93,28 @@ public void testSerde()
Assert.assertEquals(0.0f, out.getFloatMetric("agg_non_existing"), 0.00001);
Assert.assertEquals(5.0f, out.getFloatMetric("m1out"), 0.00001);
Assert.assertEquals(100L, out.getLongMetric("m2out"));
- Assert.assertEquals(1, ((HyperLogLogCollector)out.getRaw("m3out")).estimateCardinality(), 0.001);
+ Assert.assertEquals(1, ((HyperLogLogCollector) out.getRaw("m3out")).estimateCardinality(), 0.001);
+ Assert.assertEquals(0L, out.getLongMetric("unparseable"));

}

+ @Test(expected = ParseException.class)
+ public void testThrowParseExceptions()
+ {
+ InputRow in = new MapBasedInputRow(
+ timestamp,
+ dims,
+ event
+ );
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
+ new DoubleSumAggregatorFactory("m1out", "m1"),
+ new LongSumAggregatorFactory("m2out", "m2"),
+ new HyperUniquesAggregatorFactory("m3out", "m3"),
+ new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
+ };

+ InputRowSerde.toBytes(in, aggregatorFactories, true);

+ }
}
