Commit

IncrementalIndex: Fix multi-value dimensions returned from iterators. (apache#3344)

They had arrays as values, which MapBasedRow doesn't understand: it toStrings them rather than converting them to lists.
gianm authored and b-slim committed Aug 10, 2016
1 parent b21a98e commit a2bcd97
Showing 4 changed files with 129 additions and 16 deletions.
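
To illustrate the problem described in the commit message, the following standalone sketch (not code from this commit; the class name ArrayVsListValue is made up for illustration) shows what happens when a multi-value dimension is handed out as an array instead of a List: stringifying the array yields its identity hash rather than its elements, while a List prints and iterates as expected.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not Druid code.
public class ArrayVsListValue
{
  public static void main(String[] args)
  {
    Object arrayValue = new String[]{"foo", "bar"};   // what the iterator used to return for a multi-value dimension
    Object listValue = Arrays.asList("foo", "bar");   // what it returns after this fix

    // Prints something like "[Ljava.lang.String;@1b6d3586" -- the toString symptom described above.
    System.out.println(String.valueOf(arrayValue));
    // Prints "[foo, bar]".
    System.out.println(String.valueOf(listValue));

    // A List can also be consumed element by element, which is what row consumers expect.
    for (Object v : (List<?>) listValue) {
      System.out.println(v);
    }
  }
}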
==== Changed file 1 of 4 ====
@@ -25,9 +25,9 @@
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
@@ -46,6 +46,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -65,11 +66,9 @@ public void setUp() throws Exception
"website",
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "keywords")), null, null)
),
null
),
@@ -145,17 +144,19 @@ public void testMultipleRowsMerged() throws Exception

InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableList.<String>of("keywords"),
ImmutableMap.<String, Object>of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableList.<String>of("keywords"),
ImmutableMap.<String, Object>of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
@@ -176,10 +177,85 @@ public void testMultipleRowsMerged() throws Exception
context
);

EasyMock.verify(context);

Assert.assertTrue(captureKey.getValue() == key);

InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow.getDimensions());
Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords"));
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
}

@Test
public void testMultipleRowsNotMerged() throws Exception
{
long timestamp = System.currentTimeMillis();

Bucket bucket = new Bucket(0, new DateTime(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();

InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of("host", "keywords"),
ImmutableMap.<String, Object>of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of("host", "keywords"),
ImmutableMap.<String, Object>of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true))
);

Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey1 = Capture.newInstance();
Capture<BytesWritable> captureVal1 = Capture.newInstance();
Capture<BytesWritable> captureKey2 = Capture.newInstance();
Capture<BytesWritable> captureVal2 = Capture.newInstance();
context.write(EasyMock.capture(captureKey1), EasyMock.capture(captureVal1));
context.write(EasyMock.capture(captureKey2), EasyMock.capture(captureVal2));
EasyMock.replay(context);

combiner.reduce(
key,
rows,
context
);

EasyMock.verify(context);

Assert.assertTrue(captureKey1.getValue() == key);
Assert.assertTrue(captureKey2.getValue() == key);

InputRow capturedRow1 = InputRowSerde.fromBytes(captureVal1.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow1.getDimensions());
Assert.assertEquals(Arrays.asList("host1"), capturedRow1.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords"));
Assert.assertEquals(10, capturedRow1.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts")), 0.001);

InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions());
Assert.assertEquals(Arrays.asList("host2"), capturedRow2.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords"));
Assert.assertEquals(5, capturedRow2.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts")), 0.001);
}
}
==== Changed file 2 of 4 ====
@@ -939,13 +939,13 @@ public Row apply(final Map.Entry<TimeAndDims, Integer> input)
}
theVals.put(dimensionName, val);
} else {
Comparable[] dimVals = new Comparable[dim.length];
for (int j = 0; j < dimVals.length; j++) {
List<Comparable> dimVals = new ArrayList<Comparable>(dim.length);
for (int j = 0; j < dim.length; j++) {
Comparable val = dimensionDesc.getValues().getValue(dim[j]);
if (type == ValueType.STRING) {
val = Strings.nullToEmpty((String) val);
}
dimVals[j] = val;
dimVals.add(val);
}
theVals.put(dimensionName, dimVals);
}
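
Because this view interleaves the removed and added lines without +/- markers, here is the new side of the hunk above assembled for readability (context lines unchanged, taken verbatim from the diff):

List<Comparable> dimVals = new ArrayList<Comparable>(dim.length);
for (int j = 0; j < dim.length; j++) {
  Comparable val = dimensionDesc.getValues().getValue(dim[j]);
  if (type == ValueType.STRING) {
    val = Strings.nullToEmpty((String) val);
  }
  dimVals.add(val);
}
theVals.put(dimensionName, dimVals);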
==== Changed file 3 of 4 ====
@@ -307,14 +307,43 @@ public void testFilteredAggregators() throws Exception
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_bound_filtered"),
new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_multivaldim_filtered"),
new SelectorDimFilter("dim3", "b", null)
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_numeric_filtered"),
new SelectorDimFilter("met1", "11", null)
)
})
);

populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2", "dim3"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "dim3", Lists.newArrayList("b", "a"), "met1", 10)
)
);

index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2", "dim3"),
ImmutableMap.<String, Object>of("dim1", "3", "dim2", "4", "dim3", Lists.newArrayList("c", "d"), "met1", 11)
)
);

Assert.assertEquals(Arrays.asList("dim1", "dim2", "dim3"), index.getDimensionNames());
Assert.assertEquals(
Arrays.asList("count", "count_selector_filtered", "count_bound_filtered"),
Arrays.asList(
"count",
"count_selector_filtered",
"count_bound_filtered",
"count_multivaldim_filtered",
"count_numeric_filtered"
),
index.getMetricNames()
);
Assert.assertEquals(2, index.size());
@@ -324,15 +353,23 @@ public void testFilteredAggregators() throws Exception
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("1"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("2"), row.getDimension("dim2"));
Assert.assertEquals(Arrays.asList("a", "b"), row.getDimension("dim3"));
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(1L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_bound_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_multivaldim_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_numeric_filtered"));

row = rows.next();
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
Assert.assertEquals(Arrays.asList("c", "d"), row.getDimension("dim3"));
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(0L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_bound_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_multivaldim_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_numeric_filtered"));
}

@Test
==== Changed file 4 of 4 ====
@@ -215,9 +215,9 @@ public void testNullDimensionTransform() throws IndexSizeExceededException

Row row = index.iterator().next();

Assert.assertArrayEquals(new String[]{"", "", "A"}, (Object[]) row.getRaw("string"));
Assert.assertArrayEquals(new Float[]{null, null, Float.MAX_VALUE}, (Object[]) row.getRaw("float"));
Assert.assertArrayEquals(new Long[]{null, null, Long.MIN_VALUE}, (Object[]) row.getRaw("long"));
Assert.assertEquals(Arrays.asList(new String[]{"", "", "A"}), row.getRaw("string"));
Assert.assertEquals(Arrays.asList(new Float[]{null, null, Float.MAX_VALUE}), row.getRaw("float"));
Assert.assertEquals(Arrays.asList(new Long[]{null, null, Long.MIN_VALUE}), row.getRaw("long"));
}

@Test
