Skip to content

Commit

Permalink
groupBy improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
nishantmonu51 committed Feb 11, 2014
1 parent 533a263 commit 3302cff
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
Expand All @@ -52,9 +51,10 @@
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
* must be fully cached in memory before the results for Aa and Ab are computed.
*/
public class ChainedExecutionQueryRunner<T> implements ParallelQueryRunner<T>
public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);

private final Iterable<QueryRunner<T>> queryables;
private final ExecutorService exec;
private final Ordering<T> ordering;
Expand Down Expand Up @@ -157,63 +157,4 @@ public void cleanup(Iterator<T> tIterator)
}
);
}

@Override
public <OutType> OutType runAndAccumulate(
final Query<T> query,
final OutType outType, final Accumulator<OutType, T> outTypeTAccumulator
)
{
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));

if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
return outType;
}
List<Future<Boolean>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, Future<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<T> input)
{
return exec.submit(
new PrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(query).accumulate(outType, outTypeTAccumulator);
return true;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);

// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}


return outType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.druid.query;

import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;

public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, Arrays.asList(queryables));
}

public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = exec;
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
}

@Override
public Sequence<Row> run(final Query<Row> queryParam)
{

final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get()
);
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));

if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
List<Future<Boolean>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, Future<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new PrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);

// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
}

}
14 changes: 0 additions & 14 deletions processing/src/main/java/io/druid/query/ParallelQueryRunner.java

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.query.groupby;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;

import javax.annotation.Nullable;
import java.util.List;

public class GroupByQueryHelper
{
public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config
)
{
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();

// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();

final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
);
IncrementalIndex index = new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
);

Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}

return accumulated;
}
};
return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
}

}
Loading

0 comments on commit 3302cff

Please sign in to comment.