Skip to content

Commit

Permalink
Exclude pagingIdentifiers that don't apply to a datasource (apache#4078)
Browse files Browse the repository at this point in the history
* exclude pagingIdentifiers that don't apply to a datasource to support union datasources

* code review changes

* code review changes
  • Loading branch information
dclim authored and fjy committed Mar 22, 2017
1 parent 1f48198 commit f68ba41
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 33 deletions.
66 changes: 41 additions & 25 deletions api/src/main/java/io/druid/timeline/DataSegmentUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Function;

import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;

import org.joda.time.DateTime;
Expand All @@ -44,51 +45,66 @@ public static Function<String, Interval> INTERVAL_EXTRACTOR(final String datasou
@Override
public Interval apply(String identifier)
{
  // valueOf() returns null for identifiers that cannot be parsed for this dataSource
  // (e.g. paging identifiers belonging to another dataSource in a union query);
  // surface that as an explicit error since callers of this extractor expect a valid interval.
  SegmentIdentifierParts segmentIdentifierParts = valueOf(datasource, identifier);
  if (segmentIdentifierParts == null) {
    throw new IAE("Invalid identifier [%s]", identifier);
  }

  return segmentIdentifierParts.getInterval();
}
};
}

// ignores shard spec
/**
 * Parses a segment identifier into its components: dataSource, interval, version, and any trailing tags. Ignores
 * shard spec.
 *
 * It is possible that this method may incorrectly parse an identifier, for example if the dataSource name in the
 * identifier contains a DateTime parseable string such as 'datasource_2000-01-01T00:00:00.000Z' and dataSource was
 * provided as 'datasource'. The desired behavior in this case would be to return null since the identifier does not
 * actually belong to the provided dataSource but a non-null result would be returned. This is an edge case that would
 * currently only affect paged select queries with a union dataSource of two similarly-named dataSources as in the
 * given example.
 *
 * @param dataSource the dataSource corresponding to this identifier
 * @param identifier segment identifier
 * @return a {@link io.druid.timeline.DataSegmentUtils.SegmentIdentifierParts} object if the identifier could be
 * parsed, null otherwise
 */
public static SegmentIdentifierParts valueOf(String dataSource, String identifier)
{
  // Identifiers for this dataSource must begin with "<dataSource>_"; anything else
  // (e.g. a paging identifier from another dataSource in a union query) is not ours.
  if (!identifier.startsWith(String.format("%s_", dataSource))) {
    return null;
  }

  // Expected remaining layout: <startTime>_<endTime>_<version>[_<trail>]
  String remaining = identifier.substring(dataSource.length() + 1);
  String[] splits = remaining.split(DataSegment.delimiter);
  if (splits.length < 3) {
    LOGGER.info("Invalid identifier %s", identifier);
    return null;
  }

  DateTimeFormatter formatter = ISODateTimeFormat.dateTime();

  try {
    DateTime start = formatter.parseDateTime(splits[0]);
    DateTime end = formatter.parseDateTime(splits[1]);
    String version = splits[2];
    // Any components beyond the version are preserved verbatim as a single trailing string.
    String trail = splits.length > 3 ? join(splits, DataSegment.delimiter, 3, splits.length) : null;

    return new SegmentIdentifierParts(
        dataSource,
        new Interval(start.getMillis(), end.getMillis()),
        version,
        trail
    );
  }
  catch (IllegalArgumentException e) {
    // Joda-Time throws IllegalArgumentException for unparseable timestamps; per the
    // contract above, an unparseable identifier yields null rather than propagating.
    return null;
  }
}

public static String withInterval(final String dataSource, final String identifier, Interval newInterval)
{
SegmentIdentifierParts segmentDesc = DataSegmentUtils.parse(dataSource, identifier);
SegmentIdentifierParts segmentDesc = DataSegmentUtils.valueOf(dataSource, identifier);
if (segmentDesc == null) {
// happens for test segments which has invalid segment id.. ignore for now
LOGGER.warn("Invalid segment identifier " + identifier);
Expand Down
12 changes: 6 additions & 6 deletions api/src/test/java/io/druid/timeline/DataSegmentUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,21 @@ public void testDataSourceWithUnderscore2()
Assert.assertEquals(desc, DataSegmentUtils.valueOf(dataSource, desc.toString()));
}

@Test
public void testInvalidFormat0()
{
  // Interval end precedes start; valueOf signals the invalid identifier by returning null.
  Assert.assertNull(DataSegmentUtils.valueOf("ds", "datasource_2015-01-02T00:00:00.000Z_2014-10-20T00:00:00.000Z_version"));
}

@Test
public void testInvalidFormat1()
{
  // Interval components are not ISO timestamps; valueOf returns null rather than throwing.
  Assert.assertNull(DataSegmentUtils.valueOf("datasource", "datasource_invalid_interval_version"));
}

@Test
public void testInvalidFormat2()
{
  // Too few components (missing interval end); valueOf returns null rather than throwing.
  Assert.assertNull(DataSegmentUtils.valueOf("datasource", "datasource_2015-01-02T00:00:00.000Z_version"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -350,7 +351,7 @@ public Sequence<Result<SelectResultValue>> run(
public <T extends LogicalSegment> List<T> filterSegments(SelectQuery query, List<T> segments)
{
// at the point where this code is called, only one datasource should exist.
String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
final String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());

PagingSpec pagingSpec = query.getPagingSpec();
Map<String, Integer> paging = pagingSpec.getPagingIdentifiers();
Expand All @@ -360,8 +361,22 @@ public <T extends LogicalSegment> List<T> filterSegments(SelectQuery query, List

final Granularity granularity = query.getGranularity();

// A paged select query using a UnionDataSource will return pagingIdentifiers from segments in more than one
// dataSource which confuses subsequent queries and causes a failure. To avoid this, filter only the paging keys
// that are applicable to this dataSource so that each dataSource in a union query gets the appropriate keys.
final Iterable<String> filteredPagingKeys = Iterables.filter(
paging.keySet(), new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return DataSegmentUtils.valueOf(dataSource, input) != null;
}
}
);

List<Interval> intervals = Lists.newArrayList(
Iterables.transform(paging.keySet(), DataSegmentUtils.INTERVAL_EXTRACTOR(dataSource))
Iterables.transform(filteredPagingKeys, DataSegmentUtils.INTERVAL_EXTRACTOR(dataSource))
);
Collections.sort(
intervals, query.isDescending() ? Comparators.intervalsByEndThenStart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
Expand All @@ -33,6 +34,8 @@
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;
import io.druid.query.UnionQueryRunner;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.ordering.StringComparators;
import io.druid.segment.IncrementalIndexSegment;
Expand Down Expand Up @@ -313,6 +316,38 @@ private void runDayGranularityTest(SelectQuery query, int[][] expectedOffsets)
}
}

@Test
public void testPagingIdentifiersForUnionDatasource()
{
  // Select over a union of two dataSources: the paging identifiers returned by the first
  // page must still be usable to build the cursor for the next page of the same query.
  SelectQuery query = Druids
      .newSelectQueryBuilder()
      .dataSource(
          new UnionDataSource(
              ImmutableList.of(
                  new TableDataSource(QueryRunnerTestHelper.dataSource),
                  new TableDataSource("testing-2")
              )
          )
      )
      .intervals(SelectQueryRunnerTest.I_0112_0114)
      .granularity(QueryRunnerTestHelper.allGran)
      .dimensionSpecs(DefaultDimensionSpec.toSpec(QueryRunnerTestHelper.dimensions))
      .pagingSpec(PagingSpec.newSpec(3))
      .build();

  QueryRunner unionRunner = new UnionQueryRunner(runner);

  // First page: run the query and capture the paging identifiers it hands back.
  List<Result<SelectResultValue>> firstPage = Sequences.toList(
      unionRunner.run(query, ImmutableMap.of()),
      Lists.<Result<SelectResultValue>>newArrayList()
  );
  Map<String, Integer> identifiers = firstPage.get(0).getValue().getPagingIdentifiers();

  // Second page: feeding those identifiers back must not fail even though they may
  // reference segments from only one of the union's dataSources.
  query = query.withPagingSpec(toNextCursor(PagingSpec.merge(Arrays.asList(identifiers)), query, 3));

  Sequences.toList(unionRunner.run(query, ImmutableMap.of()), Lists.<Result<SelectResultValue>>newArrayList());
}

private PagingSpec toNextCursor(Map<String, Integer> merged, SelectQuery query, int threshold)
{
if (!fromNext) {
Expand Down

0 comments on commit f68ba41

Please sign in to comment.