Skip to content

Commit

Permalink
Merge pull request apache#528 from metamx/union-query-source
Browse files Browse the repository at this point in the history
Union query source
  • Loading branch information
fjy committed May 6, 2014
2 parents 6f3f5c6 + 66450ca commit 79e6d4e
Show file tree
Hide file tree
Showing 34 changed files with 419 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"dataSource": "wikipedia",
"timestampSpec" : {
"column": "timestamp",
"format": "iso",
"format": "iso"
},
"dataSpec": {
"format": "json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -153,13 +154,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
{
QueryRunner<T> queryRunner = null;
String queryDataSource;
try {
queryDataSource = ((TableDataSource)query.getDataSource()).getName();
}
catch (ClassCastException e) {
throw new IllegalArgumentException("Subqueries are not welcome here");
}
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());

for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
Expand Down
7 changes: 5 additions & 2 deletions processing/src/main/java/io/druid/query/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.util.List;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type",
defaultImpl = LegacyDataSource.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = TableDataSource.class, name = "table"),
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query")
@JsonSubTypes.Type(value = QueryDataSource.class, name = "query"),
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union")
})
public interface DataSource
{
public String getName();
public List<String> getNames();
}
31 changes: 31 additions & 0 deletions processing/src/main/java/io/druid/query/DataSourceUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.query;

import java.util.List;

public class DataSourceUtil
{
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();
return names.size() == 1 ? names.get(0) : names.toString();
}
}
2 changes: 1 addition & 1 deletion processing/src/main/java/io/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public SearchQuery build()
public SearchQueryBuilder copy(SearchQuery query)
{
return new SearchQueryBuilder()
.dataSource(((TableDataSource)query.getDataSource()).getName())
.dataSource(query.getDataSource())
.intervals(query.getQuerySegmentSpec())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
Expand Down
2 changes: 2 additions & 0 deletions processing/src/main/java/io/druid/query/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,6 @@ public interface Query<T>
public Query<T> withId(String id);

public String getId();

Query<T> withDataSource(DataSource dataSource);
}
6 changes: 4 additions & 2 deletions processing/src/main/java/io/druid/query/QueryDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.List;

@JsonTypeName("query")
public class QueryDataSource implements DataSource
{
Expand All @@ -38,9 +40,9 @@ public QueryDataSource(@JsonProperty("query") Query query)
}

@Override
public String getName()
public List<String> getNames()
{
return query.getDataSource().getName();
return query.getDataSource().getNames();
}

@JsonProperty
Expand Down
12 changes: 10 additions & 2 deletions processing/src/main/java/io/druid/query/TableDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

@JsonTypeName("table")
public class TableDataSource implements DataSource
Expand All @@ -37,10 +41,14 @@ public TableDataSource(@JsonProperty("name") String name)
}

@JsonProperty
public String getName(){
return name;
}

@Override
public String getName()
public List<String> getNames()
{
return name;
return Arrays.asList(name);
}

public String toString() { return name; }
Expand Down
100 changes: 100 additions & 0 deletions processing/src/main/java/io/druid/query/UnionDataSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/

package io.druid.query;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.util.List;

public class UnionDataSource implements DataSource
{
@JsonProperty
private final List<TableDataSource> dataSources;

@JsonCreator
public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSources)
{
Preconditions.checkNotNull(dataSources, "dataSources cannot be null for unionDataSource");
this.dataSources = dataSources;
}

@Override
public List<String> getNames()
{
return Lists.transform(
dataSources,
new Function<TableDataSource, String>()
{
@Override
public String apply(TableDataSource input)
{
return Iterables.getOnlyElement(input.getNames());
}
}
);
}

@JsonProperty
public List<TableDataSource> getDataSources()
{
return dataSources;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

UnionDataSource that = (UnionDataSource) o;

if (!dataSources.equals(that.dataSources)) {
return false;
}

return true;
}

@Override
public int hashCode()
{
return dataSources.hashCode();
}

@Override
public String toString()
{
return "UnionDataSource{" +
"dataSources=" + dataSources +
'}';
}
}
63 changes: 63 additions & 0 deletions processing/src/main/java/io/druid/query/UnionQueryRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Druid - a distributed column store.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* This file Copyright (C) 2014 N3TWORK, Inc. and contributed to the Druid project
* under the Druid Corporate Contributor License Agreement.
*/

package io.druid.query;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;

public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;

public UnionQueryRunner(QueryRunner<T> baseRunner)
{
this.baseRunner = baseRunner;
}

@Override
public Sequence<T> run(final Query<T> query)
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return Sequences.concat(
Iterables.transform(
((UnionDataSource) dataSource).getDataSources(),
new Function<DataSource, Sequence<T>>()
{
@Override
public Sequence<T> apply(DataSource singleSource)
{
return baseRunner.run(
query.withDataSource(singleSource)
);
}
}
)
);
} else {
return baseRunner.run(query);
}
}

}
18 changes: 18 additions & 0 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
);
}

@Override
public Query<Row> withDataSource(DataSource dataSource)
{
return new GroupByQuery(
dataSource,
getQuerySegmentSpec(),
dimFilter,
granularity,
dimensions,
aggregatorSpecs,
postAggregatorSpecs,
havingSpec,
limitSpec,
orderByLimitFn,
getContext()
);
}

public static class Builder
{
private DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.DataSource;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
Expand Down Expand Up @@ -163,7 +164,7 @@ public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
}

return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource().toString())
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.format("%,d dims", query.getDimensions().size()))
.setUser4("groupBy")
.setUser5(Joiner.on(",").join(query.getIntervals()))
Expand Down Expand Up @@ -203,6 +204,7 @@ public TypeReference<Row> getResultTypeReference()
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new SubqueryQueryRunner<Row>(
new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod()));
new IntervalChunkingQueryRunner<Row>(runner, configSupplier.get().getChunkPeriod())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.druid.collections.OrderedMergeSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
Expand Down Expand Up @@ -147,7 +148,7 @@ public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
}

return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource().toString())
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
Expand Down
Loading

0 comments on commit 79e6d4e

Please sign in to comment.