Skip to content

Commit

Permalink
Merge pull request apache#2341 from gianm/smq-test
Browse files Browse the repository at this point in the history
SegmentMetadataQuery: Fix merging of ColumnAnalysis errors.
  • Loading branch information
xvrl committed Jan 27, 2016
2 parents 25630c7 + 795343f commit 2e50040
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public ColumnAnalysis fold(ColumnAnalysis rhs)
return this;
}

if (isError() && rhs.isError()) {
return errorMessage.equals(rhs.getErrorMessage()) ? this : ColumnAnalysis.error("multiple_errors");
} else if (isError()) {
return this;
} else if (rhs.isError()) {
return rhs;
}

if (!type.equals(rhs.getType())) {
return ColumnAnalysis.error("cannot_merge_diff_types");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,34 @@ public static QueryRunner makeIncrementalIndexQueryRunner(
);
}

private final QueryRunner runner;
private final boolean usingMmappedSegment;
private final QueryRunner runner1;
private final QueryRunner runner2;
private final boolean mmap1;
private final boolean mmap2;
private final SegmentMetadataQuery testQuery;
private final SegmentAnalysis expectedSegmentAnalysis;
private final SegmentAnalysis expectedSegmentAnalysis1;
private final SegmentAnalysis expectedSegmentAnalysis2;

@Parameterized.Parameters(name = "runner = {1}")
@Parameterized.Parameters(name = "mmap1 = {0}, mmap2 = {1}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{makeMMappedQueryRunner(FACTORY), "mmap", true},
new Object[]{makeIncrementalIndexQueryRunner(FACTORY), "incremental", false}
new Object[]{true, true},
new Object[]{true, false},
new Object[]{false, true},
new Object[]{false, false}
);
}

public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean usingMmappedSegment)
public SegmentMetadataQueryTest(
boolean mmap1,
boolean mmap2
)
{
this.runner = runner;
this.usingMmappedSegment = usingMmappedSegment;
this.runner1 = mmap1 ? makeMMappedQueryRunner(FACTORY) : makeIncrementalIndexQueryRunner(FACTORY);
this.runner2 = mmap2 ? makeMMappedQueryRunner(FACTORY) : makeIncrementalIndexQueryRunner(FACTORY);
this.mmap1 = mmap1;
this.mmap2 = mmap2;
testQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
Expand All @@ -121,7 +131,7 @@ public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean u
.merge(true)
.build();

expectedSegmentAnalysis = new SegmentAnalysis(
expectedSegmentAnalysis1 = new SegmentAnalysis(
"testSegment",
ImmutableList.of(
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
Expand All @@ -139,7 +149,7 @@ public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean u
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
usingMmappedSegment ? 10881 : 0,
mmap1 ? 10881 : 0,
1,
null
),
Expand All @@ -151,7 +161,41 @@ public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean u
null,
null
)
), usingMmappedSegment ? 71982 : 32643,
), mmap1 ? 71982 : 32643,
1209,
null
);
expectedSegmentAnalysis2 = new SegmentAnalysis(
"testSegment",
ImmutableList.of(
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
),
ImmutableMap.of(
"__time",
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
12090,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
mmap2 ? 10881 : 0,
1,
null
),
"index",
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
9672,
null,
null
)
), mmap2 ? 71982 : 32643,
1209,
null
);
Expand All @@ -162,11 +206,11 @@ public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean u
public void testSegmentMetadataQuery()
{
List<SegmentAnalysis> results = Sequences.toList(
runner.run(testQuery, Maps.newHashMap()),
runner1.run(testQuery, Maps.newHashMap()),
Lists.<SegmentAnalysis>newArrayList()
);

Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis), results);
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis1), results);
}

@Test
Expand Down Expand Up @@ -194,19 +238,21 @@ public void testSegmentMetadataQueryWithHasMultipleValuesMerge()
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
Expand Down Expand Up @@ -254,19 +300,21 @@ public void testSegmentMetadataQueryWithComplexColumnMerge()
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
Expand Down Expand Up @@ -294,7 +342,7 @@ public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
ImmutableList.of(expectedSegmentAnalysis1.getIntervals().get(0)),
ImmutableMap.of(
"__time",
new ColumnAnalysis(
Expand All @@ -308,7 +356,7 @@ public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
usingMmappedSegment ? 21762 : 0,
10881 * ((mmap1 ? 1 : 0) + (mmap2 ? 1 : 0)),
1,
null
),
Expand All @@ -321,20 +369,22 @@ public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
null
)
),
expectedSegmentAnalysis.getSize() * 2,
expectedSegmentAnalysis.getNumRows() * 2,
expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(),
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
Expand Down Expand Up @@ -368,19 +418,21 @@ public void testSegmentMetadataQueryWithNoAnalysisTypesMerge()
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
null
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
Expand Down Expand Up @@ -424,19 +476,21 @@ public void testSegmentMetadataQueryWithAggregatorsMerge()
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2,
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
expectedAggregators
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
)
)
),
toolChest
Expand All @@ -463,17 +517,17 @@ public void testSegmentMetadataQueryWithAggregatorsMerge()
public void testBySegmentResults()
{
Result<BySegmentResultValue> bySegmentResult = new Result<BySegmentResultValue>(
expectedSegmentAnalysis.getIntervals().get(0).getStart(),
expectedSegmentAnalysis1.getIntervals().get(0).getStart(),
new BySegmentResultValueClass(
Arrays.asList(
expectedSegmentAnalysis
), expectedSegmentAnalysis.getId(), testQuery.getIntervals().get(0)
expectedSegmentAnalysis1
), expectedSegmentAnalysis1.getId(), testQuery.getIntervals().get(0)
)
);

QueryToolChest toolChest = FACTORY.getToolchest();

QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner1);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.metadata.metadata;

import org.junit.Assert;
import org.junit.Test;

public class ColumnAnalysisTest
{
@Test
public void testFoldStringColumns()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", true, 3L, 4, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", true, 4L, 4, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}

@Test
public void testFoldWithNull()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
Assert.assertEquals(analysis1, analysis1.fold(null));
}

@Test
public void testFoldComplexColumns()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis expected = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}

@Test
public void testFoldDifferentTypes()
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 1L, 1, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("COMPLEX", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:cannot_merge_diff_types");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}

@Test
public void testFoldSameErrors()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("foo");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}

@Test
public void testFoldErrorAndNoError()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}

@Test
public void testFoldDifferentErrors()
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("bar");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:multiple_errors");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
}
}

0 comments on commit 2e50040

Please sign in to comment.