Commit

initial working version
nishantmonu51 committed Feb 10, 2014
1 parent 38e9a79 commit 533a263
Showing 5 changed files with 255 additions and 108 deletions.

processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
@@ -25,6 +25,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
+import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
@@ -51,10 +52,9 @@
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
* must be fully cached in memory before the results for Aa and Ab are computed.
*/
-public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
+public class ChainedExecutionQueryRunner<T> implements ParallelQueryRunner<T>
{
  private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);

  private final Iterable<QueryRunner<T>> queryables;
  private final ExecutorService exec;
  private final Ordering<T> ordering;
@@ -157,4 +157,63 @@ public void cleanup(Iterator<T> tIterator)
}
);
}

  @Override
  public <OutType> OutType runAndAccumulate(
      final Query<T> query,
      final OutType outType,
      final Accumulator<OutType, T> outTypeTAccumulator
  )
  {
    final int priority = Integer.parseInt(query.getContextValue("priority", "0"));

    if (Iterables.isEmpty(queryables)) {
      log.warn("No queryables found.");
      return outType;
    }
    List<Future<Boolean>> futures = Lists.newArrayList(
        Iterables.transform(
            queryables,
            new Function<QueryRunner<T>, Future<Boolean>>()
            {
              @Override
              public Future<Boolean> apply(final QueryRunner<T> input)
              {
                return exec.submit(
                    new PrioritizedCallable<Boolean>(priority)
                    {
                      @Override
                      public Boolean call() throws Exception
                      {
                        try {
                          // The accumulate() return value is discarded: every worker
                          // accumulates into the same shared outType, which is why the
                          // accumulator must be thread-safe.
                          input.run(query).accumulate(outType, outTypeTAccumulator);
                          return true;
                        }
                        catch (Exception e) {
                          log.error(e, "Exception with one of the sequences!");
                          throw Throwables.propagate(e);
                        }
                      }
                    }
                );
              }
            }
        )
    );

    // Block until every runner has completed before handing back the shared result
    for (Future<Boolean> future : futures) {
      try {
        future.get();
      }
      catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }

    return outType;
  }
}
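
Because runAndAccumulate hands the same outType instance to every worker thread and discards each call's return value, correctness rests entirely on the accumulator mutating shared state safely. A minimal sketch of an accumulator satisfying that contract (the CountHolder type is hypothetical, not part of this commit):

import com.metamx.common.guava.Accumulator;
import io.druid.data.input.Row;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shared result type; AtomicLong makes in-place mutation thread-safe.
class CountHolder
{
  final AtomicLong rows = new AtomicLong();
}

class CountingAccumulator implements Accumulator<CountHolder, Row>
{
  @Override
  public CountHolder accumulate(CountHolder accumulated, Row in)
  {
    // Safe under concurrent calls from the executor's worker threads
    accumulated.rows.incrementAndGet();
    return accumulated;
  }
}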
14 changes: 14 additions & 0 deletions processing/src/main/java/io/druid/query/ParallelQueryRunner.java
@@ -0,0 +1,14 @@
package io.druid.query;

public interface ParallelQueryRunner<T> extends QueryRunner<T>
{
  /**
   * Runs the query across the underlying runners in parallel, accumulating into
   * the shared outType. The accumulator passed in must be thread-safe, since it
   * may be invoked concurrently from multiple worker threads.
   */
  <OutType> OutType runAndAccumulate(
      Query<T> query,
      OutType outType,
      com.metamx.common.guava.Accumulator<OutType, T> outTypeTAccumulator
  );
}
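
That thread-safety requirement can also be met generically by serializing calls to a non-thread-safe accumulator. A sketch of such a wrapper, analogous in spirit to the SynchronizedAggregator introduced below but not part of this commit:

import com.metamx.common.guava.Accumulator;

public class Accumulators
{
  // Hypothetical helper: serializes all accumulate() calls on the wrapper's
  // monitor, trading concurrency inside the accumulator for safety.
  public static <OutType, T> Accumulator<OutType, T> synchronizedAccumulator(
      final Accumulator<OutType, T> delegate
  )
  {
    return new Accumulator<OutType, T>()
    {
      @Override
      public synchronized OutType accumulate(OutType accumulated, T in)
      {
        return delegate.accumulate(accumulated, in);
      }
    };
  }
}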
56 changes: 56 additions & 0 deletions processing/src/main/java/io/druid/query/aggregation/Aggregators.java
@@ -0,0 +1,56 @@
package io.druid.query.aggregation;

public class Aggregators
{
  public static Aggregator synchronizedAggregator(Aggregator aggregator)
  {
    return new SynchronizedAggregator(aggregator);
  }

  /**
   * Wraps a delegate Aggregator so that every method is serialized on the
   * wrapper's monitor, letting a single instance be shared across threads.
   */
  private static class SynchronizedAggregator implements Aggregator
  {
    private final Aggregator delegate;

    SynchronizedAggregator(Aggregator delegate)
    {
      this.delegate = delegate;
    }

    @Override
    public synchronized void aggregate()
    {
      delegate.aggregate();
    }

    @Override
    public synchronized void reset()
    {
      delegate.reset();
    }

    @Override
    public synchronized Object get()
    {
      return delegate.get();
    }

    @Override
    public synchronized float getFloat()
    {
      return delegate.getFloat();
    }

    @Override
    public synchronized String getName()
    {
      return delegate.getName();
    }

    @Override
    public synchronized void close()
    {
      delegate.close();
    }
  }
}
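
Usage is a single wrap at construction time, after which one Aggregator instance can be updated from many threads. A toy example (the inline counting Aggregator is illustrative only):

import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.Aggregators;

class SynchronizedAggregatorExample
{
  public static void main(String[] args)
  {
    // Plain, non-thread-safe counting aggregator for demonstration.
    Aggregator counter = new Aggregator()
    {
      private long count;

      @Override public void aggregate() { count++; }
      @Override public void reset() { count = 0; }
      @Override public Object get() { return count; }
      @Override public float getFloat() { return count; }
      @Override public String getName() { return "toyCount"; }
      @Override public void close() { }
    };

    // Every call on the wrapper is serialized, so worker threads may share it.
    Aggregator shared = Aggregators.synchronizedAggregator(counter);
    shared.aggregate();
    System.out.println(shared.get()); // prints 1
  }
}

The coarse-grained lock is the simplest route to safety but serializes every update; under heavy contention, giving each thread its own aggregator and combining results afterwards would likely scale better.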
processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -38,6 +38,7 @@
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.IntervalChunkingQueryRunner;
+import io.druid.query.ParallelQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@@ -61,7 +62,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");

private final Supplier<GroupByQueryConfig> configSupplier;

@Inject
@@ -121,28 +121,32 @@ public String apply(@Nullable DimensionSpec input)
}
}
);
-      final IncrementalIndex index = runner.run(query).accumulate(
-          new IncrementalIndex(
-              // use granularity truncated min timestamp
-              // since incoming truncated timestamps may precede timeStart
-              granTimeStart,
-              gran,
-              aggs.toArray(new AggregatorFactory[aggs.size()])
-          ),
-          new Accumulator<IncrementalIndex, Row>()
-          {
-            @Override
-            public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
-            {
-              if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
-                throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
-              }
-
-              return accumulated;
-            }
-          }
-      );
+      IncrementalIndex index = new IncrementalIndex(
+          // use granularity truncated min timestamp
+          // since incoming truncated timestamps may precede timeStart
+          granTimeStart,
+          gran,
+          aggs.toArray(new AggregatorFactory[aggs.size()])
+      );
+      Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
+      {
+        @Override
+        public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
+        {
+          if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
+            throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
+          }
+          return accumulated;
+        }
+      };
+
+      if (runner instanceof ParallelQueryRunner && Boolean.getBoolean("optimize")) {
+        index = (IncrementalIndex) ((ParallelQueryRunner) runner).runAndAccumulate(query, index, accumulator);
+      } else {
+        index = runner.run(query).accumulate(index, accumulator);
+      }

// convert millis back to timestamp according to granularity to preserve time zone information
Sequence<Row> retVal = Sequences.map(
…
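
One subtlety in the dispatch above: unlike the "priority" value, which ChainedExecutionQueryRunner reads from the query context, Boolean.getBoolean("optimize") reads a JVM system property, so the parallel path is a process-wide switch rather than a per-query one. A small illustration of that Java behavior:

// True only when the JVM was started with -Doptimize=true, or when the
// property has been set programmatically before the query runs:
System.setProperty("optimize", "true");
assert Boolean.getBoolean("optimize");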
