Skip to content

Commit

Permalink
Adds support for empty merge metrics. fixes apache#2256
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelschiff authored and michael.schiff committed Jan 21, 2016
1 parent 5a932d2 commit 1e44445
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 6 deletions.
25 changes: 19 additions & 6 deletions processing/src/main/java/io/druid/segment/IndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ public Iterable<String> apply(@Nullable IndexableAdapter input)
}
}
)
.concat(Arrays.<Iterable<String>>asList(new AggFactoryStringIndexed(metricAggs)))
)
),
new Function<String, String>()
Expand All @@ -362,22 +361,36 @@ public String apply(@Nullable String input)
}
}
);
if (mergedMetrics.size() != metricAggs.length) {
throw new IAE("Bad number of metrics[%d], expected [%d]", mergedMetrics.size(), metricAggs.length);
}

final AggregatorFactory[] sortedMetricAggs = new AggregatorFactory[mergedMetrics.size()];
for (int i = 0; i < metricAggs.length; i++) {
AggregatorFactory metricAgg = metricAggs[i];
sortedMetricAggs[mergedMetrics.indexOf(metricAgg.getName())] = metricAgg;
int metricIndex = mergedMetrics.indexOf(metricAgg.getName());
/*
If metricIndex is negative, one of the metricAggs was not present in the union of metrics from the indices
we are merging
*/
if (metricIndex > -1) {
sortedMetricAggs[metricIndex] = metricAgg;
}
}

/*
If there is nothing at sortedMetricAggs[i], then we did not have a metricAgg whose name matched the name
of the ith element of mergedMetrics. I.e. There was a metric in the indices to merge that we did not ask for.
*/
for (int i = 0; i < sortedMetricAggs.length; i++) {
if (sortedMetricAggs[i] == null) {
throw new IAE("Indices to merge contained metric[%s], but requested metrics did not", mergedMetrics.get(i));
}
}

for (int i = 0; i < mergedMetrics.size(); i++) {
if (!sortedMetricAggs[i].getName().equals(mergedMetrics.get(i))) {
throw new IAE(
"Metric mismatch, index[%d] [%s] != [%s]",
i,
metricAggs[i].getName(),
sortedMetricAggs[i].getName(),
mergedMetrics.get(i)
);
}
Expand Down
102 changes: 102 additions & 0 deletions processing/src/test/java/io/druid/segment/IndexMergerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.collections.bitmap.RoaringBitmapFactory;
import com.metamx.common.IAE;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity;
Expand Down Expand Up @@ -1466,7 +1467,108 @@ public void testMismatchedDimensions() throws IOException, IndexSizeExceededExce
tmpDirMerged,
indexSpec
);
}

@Test
public void testMismatchedMetrics() throws IOException
{
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A")
});
closer.closeLater(index1);

IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
closer.closeLater(index2);

IncrementalIndex index3 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index3);

IncrementalIndex index4 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index4);

IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index5);


Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index1, factory),
new IncrementalIndexAdapter(interval, index2, factory),
new IncrementalIndexAdapter(interval, index3, factory),
new IncrementalIndexAdapter(interval, index4, factory),
new IncrementalIndexAdapter(interval, index5, factory)
);

final File tmpDirMerged = temporaryFolder.newFolder();

File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("D", "D")
},
tmpDirMerged,
indexSpec
);

// Since D was not present in any of the indices, it is not present in the output
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));

}

@Test(expected = IAE.class)
public void testMismatchedMetricsVarying() throws IOException
{

IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("C", "C")
});
closer.closeLater(index2);

IncrementalIndex index5 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
new LongSumAggregatorFactory("C", "C"),
new LongSumAggregatorFactory("B", "B")
});
closer.closeLater(index5);


Interval interval = new Interval(0, new DateTime().getMillis());
RoaringBitmapFactory factory = new RoaringBitmapFactory();
ArrayList<IndexableAdapter> toMerge = Lists.<IndexableAdapter>newArrayList(
new IncrementalIndexAdapter(interval, index2, factory)
);

final File tmpDirMerged = temporaryFolder.newFolder();

final File merged = INDEX_MERGER.merge(
toMerge,
new AggregatorFactory[] {
new LongSumAggregatorFactory("B", "B"),
new LongSumAggregatorFactory("A", "A"),
new LongSumAggregatorFactory("D", "D")
},
tmpDirMerged,
indexSpec
);
final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(INDEX_IO.loadIndex(merged)));
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}

private IncrementalIndex getIndexD3() throws Exception
Expand Down

0 comments on commit 1e44445

Please sign in to comment.