Skip to content

Commit

Permalink
Merge pull request apache#2571 from himanshug/gp_by_avoid_sort
Browse files Browse the repository at this point in the history
avoid sort while doing groupBy merging when possible
  • Loading branch information
fjy committed Mar 14, 2016
2 parents a41a70d + dc0214b commit 06813b5
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private IncrementalIndex makeIncIndex()
aggs,
false,
false,
true,
maxRows
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class GroupByQueryHelper
{
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
public final static String CTX_KEY_SORT_RESULTS = "sortResults";

public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
Expand Down Expand Up @@ -81,7 +82,9 @@ public String apply(DimensionSpec input)
);
final IncrementalIndex index;

if (query.getContextBoolean("useOffheap", false)) {
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);

if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
Expand All @@ -90,6 +93,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
bufferPool
);
Expand All @@ -102,6 +106,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,18 @@ private Sequence<Row> mergeGroupByResults(
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}

final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
final Sequence<Row> subqueryResult = mergeGroupByResults(
subquery.withOverriddenContext(
ImmutableMap.<String, Object>of(
// Setting sort to false avoids unnecessary sorting while merging results. We only need to sort
// at the end, when returning results to the user.
GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
false
)
),
runner,
context
);
final Set<AggregatorFactory> aggs = Sets.newHashSet();

// Nested group-bys work by first running the inner query and then materializing the results in an incremental
Expand Down Expand Up @@ -200,7 +211,14 @@ public boolean apply(AggregatorFactory agg)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();

final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult);
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(
innerQuery.withOverriddenContext(
ImmutableMap.<String, Object>of(
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
)
),
subqueryResult
);

// The outer query might have multiple intervals, but they are expected to be non-overlapping and sorted,
// which is ensured by QuerySegmentSpec.
Expand Down Expand Up @@ -253,7 +271,10 @@ public Sequence<Row> apply(Interval interval)
query.getContext()
).withOverriddenContext(
ImmutableMap.<String, Object>of(
"finalize", false
"finalize", false,
// Setting sort to false avoids unnecessary sorting while merging results. We only need to sort
// at the end, when returning results to the user.
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false
)
)
, context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -340,6 +341,7 @@ public int lookupId(String name)
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final boolean sortFacts;
private final Metadata metadata;

private final Map<String, MetricDesc> metricDescs;
Expand Down Expand Up @@ -374,7 +376,8 @@ public InputRow get()
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
final boolean reportParseExceptions,
final boolean sortFacts
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
Expand All @@ -383,6 +386,7 @@ public IncrementalIndex(
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;
this.sortFacts = sortFacts;

this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));

Expand Down Expand Up @@ -441,7 +445,7 @@ private DimDim newDimDim(String dimension, ValueType type) {
// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);

public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public abstract ConcurrentMap<TimeAndDims, Integer> getFacts();

public abstract boolean canAppendRow();

Expand Down Expand Up @@ -673,12 +677,20 @@ public int size()

private long getMinTimeMillis()
{
return getFacts().firstKey().getTimestamp();
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}

private long getMaxTimeMillis()
{
return getFacts().lastKey().getTimestamp();
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}

private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValues)
Expand Down Expand Up @@ -831,7 +843,11 @@ public ColumnCapabilities getCapabilities(String column)

public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return getFacts().subMap(start, end);
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).subMap(start, end);
} else {
throw new UnsupportedOperationException("can't get subMap from unsorted facts data.");
}
}

public Metadata getMetadata()
Expand Down Expand Up @@ -862,7 +878,14 @@ public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> pos
public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();

Map<TimeAndDims, Integer> facts = null;
if (descending && sortFacts) {
facts = ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap();
} else {
facts = getFacts();
}

return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -54,7 +55,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
private final List<int[]> indexAndOffsets = new ArrayList<>();

private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;

private final AtomicInteger indexIncrement = new AtomicInteger(0);

Expand All @@ -74,14 +75,20 @@ public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}

// Check that the StupidPool gives buffers that can hold at least one row's aggregators.
ResourceHolder<ByteBuffer> bb = bufferPool.take();
Expand All @@ -103,6 +110,7 @@ public OffheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
Expand All @@ -114,6 +122,7 @@ public OffheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount,
bufferPool
);
Expand All @@ -134,13 +143,14 @@ public OffheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount,
bufferPool
);
}

@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -50,7 +49,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private static final Logger log = new Logger(OnheapIncrementalIndex.class);

private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
Expand All @@ -61,12 +60,18 @@ public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}
}

public OnheapIncrementalIndex(
Expand All @@ -75,6 +80,7 @@ public OnheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
Expand All @@ -85,6 +91,7 @@ public OnheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount
);
}
Expand All @@ -103,6 +110,7 @@ public OnheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount
);
}
Expand All @@ -113,11 +121,11 @@ public OnheapIncrementalIndex(
int maxRowCount
)
{
this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount);
this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount);
}

@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static void setupClass() throws Exception
},
true,
true,
true,
5000
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
Expand Down Expand Up @@ -311,15 +310,15 @@ public void createIndex(
List<File> toMerge = new ArrayList<>();

try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
while (rows.hasNext()) {
Object row = rows.next();
if (!index.canAppendRow()) {
File tmp = tempFolder.newFolder();
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void createTestIndex(File segmentDir) throws Exception

IncrementalIndex index = null;
try {
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000);
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, true, 5000);
for (String line : rows) {
index.add(parser.parse(line));
}
Expand Down

0 comments on commit 06813b5

Please sign in to comment.