groupBy: Omit timestamp from merge key when granularity = all. (apach…
gianm authored and b-slim committed Sep 1, 2016
1 parent 6d25c5e commit 8ed1894
Showing 2 changed files with 95 additions and 42 deletions.
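The change in a nutshell: when granularity = all (or when a "fudge" timestamp has already been fixed in the query context), every result row carries the same timestamp, so the per-row merge key no longer needs to store or compare it. A rough sketch of the effect on key size, in plain Java (illustrative names only; the real logic lives in RowBasedKeySerde below):

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

class KeySizeSketch
{
  // Bytes per merge key: the timestamp long is only needed when rows can differ
  // in timestamp; with a universal ("fudge") timestamp it is omitted entirely.
  static int keySize(boolean hasUniversalTimestamp, int dimCount)
  {
    return (hasUniversalTimestamp ? 0 : Longs.BYTES) + dimCount * Ints.BYTES;
  }

  public static void main(String[] args)
  {
    System.out.println(keySize(false, 3)); // 20 bytes: 8 (timestamp) + 3 * 4 (dictionary ids)
    System.out.println(keySize(true, 3));  // 12 bytes: dictionary ids only
  }
}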
Changed file 1 of 2 (the row-based grouper key serde):
@@ -41,6 +41,7 @@
 import io.druid.query.extraction.ExtractionFn;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.strategy.GroupByStrategyV2;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.FloatColumnSelector;
@@ -49,6 +50,7 @@
 import io.druid.segment.column.Column;
 import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;

 import java.io.Closeable;
 import java.io.IOException;
@@ -78,7 +80,9 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>>
     Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint");

     final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
+    final DateTime fudgeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query);
     final Grouper.KeySerdeFactory<RowBasedKey> keySerdeFactory = new RowBasedKeySerdeFactory(
+        fudgeTimestamp,
         query.getDimensions().size(),
         querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint)
     );
@@ -295,29 +299,32 @@ public String toString()
     }
   }

-  static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
+  private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
   {
+    private final DateTime fudgeTimestamp;
     private final int dimCount;
     private final long maxDictionarySize;

-    public RowBasedKeySerdeFactory(int dimCount, long maxDictionarySize)
+    public RowBasedKeySerdeFactory(DateTime fudgeTimestamp, int dimCount, long maxDictionarySize)
     {
+      this.fudgeTimestamp = fudgeTimestamp;
       this.dimCount = dimCount;
       this.maxDictionarySize = maxDictionarySize;
     }

     @Override
     public Grouper.KeySerde<RowBasedKey> factorize()
     {
-      return new RowBasedKeySerde(dimCount, maxDictionarySize);
+      return new RowBasedKeySerde(fudgeTimestamp, dimCount, maxDictionarySize);
     }
   }

-  static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
+  private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
   {
     // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
     private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;

+    private final DateTime fudgeTimestamp;
     private final int dimCount;
     private final int keySize;
     private final ByteBuffer keyBuffer;
@@ -331,11 +338,16 @@ static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedKey>
     // dictionary id -> its position if it were sorted by dictionary value
     private int[] sortableIds = null;

-    public RowBasedKeySerde(final int dimCount, final long maxDictionarySize)
+    public RowBasedKeySerde(
+        final DateTime fudgeTimestamp,
+        final int dimCount,
+        final long maxDictionarySize
+    )
     {
+      this.fudgeTimestamp = fudgeTimestamp;
       this.dimCount = dimCount;
       this.maxDictionarySize = maxDictionarySize;
-      this.keySize = Longs.BYTES + dimCount * Ints.BYTES;
+      this.keySize = (fudgeTimestamp == null ? Longs.BYTES : 0) + dimCount * Ints.BYTES;
       this.keyBuffer = ByteBuffer.allocate(keySize);
     }

@@ -355,25 +367,31 @@ public Class<RowBasedKey> keyClazz()
     public ByteBuffer toByteBuffer(RowBasedKey key)
     {
       keyBuffer.rewind();
-      keyBuffer.putLong(key.getTimestamp());
+
+      if (fudgeTimestamp == null) {
+        keyBuffer.putLong(key.getTimestamp());
+      }
+
       for (int i = 0; i < key.getDimensions().length; i++) {
         final int id = addToDictionary(key.getDimensions()[i]);
         if (id < 0) {
           return null;
         }
         keyBuffer.putInt(id);
       }
+
       keyBuffer.flip();
       return keyBuffer;
     }

     @Override
     public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position)
     {
-      final long timestamp = buffer.getLong(position);
+      final long timestamp = fudgeTimestamp == null ? buffer.getLong(position) : fudgeTimestamp.getMillis();
       final String[] dimensions = new String[dimCount];
+      final int dimsPosition = fudgeTimestamp == null ? position + Longs.BYTES : position;
       for (int i = 0; i < dimensions.length; i++) {
-        dimensions[i] = dictionary.get(buffer.getInt(position + Longs.BYTES + (Ints.BYTES * i)));
+        dimensions[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * i)));
       }
       return new RowBasedKey(timestamp, dimensions);
     }
@@ -393,30 +411,52 @@ public Grouper.KeyComparator comparator()
         }
       }

-      return new Grouper.KeyComparator()
-      {
-        @Override
-        public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
-        {
-          final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
-          if (timeCompare != 0) {
-            return timeCompare;
-          }
-
-          for (int i = 0; i < dimCount; i++) {
-            final int cmp = Ints.compare(
-                sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
-                sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
-            );
-
-            if (cmp != 0) {
-              return cmp;
-            }
-          }
-
-          return 0;
-        }
-      };
+      if (fudgeTimestamp == null) {
+        return new Grouper.KeyComparator()
+        {
+          @Override
+          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
+          {
+            final int timeCompare = Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
+            if (timeCompare != 0) {
+              return timeCompare;
+            }
+
+            for (int i = 0; i < dimCount; i++) {
+              final int cmp = Ints.compare(
+                  sortableIds[lhsBuffer.getInt(lhsPosition + Longs.BYTES + (Ints.BYTES * i))],
+                  sortableIds[rhsBuffer.getInt(rhsPosition + Longs.BYTES + (Ints.BYTES * i))]
+              );
+
+              if (cmp != 0) {
+                return cmp;
+              }
+            }
+
+            return 0;
+          }
+        };
+      } else {
+        return new Grouper.KeyComparator()
+        {
+          @Override
+          public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
+          {
+            for (int i = 0; i < dimCount; i++) {
+              final int cmp = Ints.compare(
+                  sortableIds[lhsBuffer.getInt(lhsPosition + (Ints.BYTES * i))],
+                  sortableIds[rhsBuffer.getInt(rhsPosition + (Ints.BYTES * i))]
+              );
+
+              if (cmp != 0) {
+                return cmp;
+              }
+            }
+
+            return 0;
+          }
+        };
+      }
     }

     @Override
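Taken together, the serde changes mean that under a non-null fudge timestamp the key buffer holds dictionary ids only, and the timestamp is re-attached from the query when the key is read back. A minimal standalone sketch of that round trip (illustrative class and variable names, not Druid's):

import java.nio.ByteBuffer;

class FudgeTimestampSerdeSketch
{
  public static void main(String[] args)
  {
    final long fudgeTimestampMillis = 1472688000000L; // assumed universal timestamp
    final int[] dimIds = {7, 2, 5};                   // dictionary ids for one row's dimensions

    // Serialize: no putLong(timestamp) -- granularity = all makes it redundant.
    final ByteBuffer key = ByteBuffer.allocate(dimIds.length * Integer.BYTES);
    for (int id : dimIds) {
      key.putInt(id);
    }
    key.flip();

    // Deserialize: position 0 is already the first dimension id; the timestamp
    // comes from the query, not from the buffer.
    for (int i = 0; i < dimIds.length; i++) {
      System.out.println(fudgeTimestampMillis + " -> dim id " + key.getInt(i * Integer.BYTES));
    }
  }
}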
Changed file 2 of 2 (GroupByStrategyV2):
@@ -35,13 +35,12 @@
 import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
-import io.druid.granularity.AllGranularity;
+import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.DruidProcessingConfig;
-import io.druid.query.NoopQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryWatcher;
@@ -88,6 +87,29 @@ public GroupByStrategyV2(
     this.queryWatcher = queryWatcher;
   }

+  /**
+   * If "query" has a single universal timestamp, return it. Otherwise return null. This is useful
+   * for keeping timestamps in sync across partial queries that may have different intervals.
+   *
+   * @param query the query
+   *
+   * @return universal timestamp, or null
+   */
+  public static DateTime getUniversalTimestamp(final GroupByQuery query)
+  {
+    final QueryGranularity gran = query.getGranularity();
+    final String timestampStringFromContext = query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "");
+
+    if (!timestampStringFromContext.isEmpty()) {
+      return new DateTime(Long.parseLong(timestampStringFromContext));
+    } else if (QueryGranularities.ALL.equals(gran)) {
+      final long timeStart = query.getIntervals().get(0).getStartMillis();
+      return new DateTime(gran.iterable(timeStart, timeStart + 1).iterator().next());
+    } else {
+      return null;
+    }
+  }
+
   @Override
   public Sequence<Row> mergeResults(
       final QueryRunner<Row> baseRunner,
@@ -113,17 +135,8 @@ protected BinaryFn<Row, Row, Row> createMergeFn(Query<Row> queryParam)
       }
     };

-    // Fudge timestamp, maybe. Necessary to keep timestamps in sync across partial queries.
-    final QueryGranularity gran = query.getGranularity();
-    final String fudgeTimestamp;
-    if (query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "").isEmpty() && gran instanceof AllGranularity) {
-      final long timeStart = query.getIntervals().get(0).getStartMillis();
-      fudgeTimestamp = String.valueOf(
-          new DateTime(gran.iterable(timeStart, timeStart + 1).iterator().next()).getMillis()
-      );
-    } else {
-      fudgeTimestamp = query.getContextValue(CTX_KEY_FUDGE_TIMESTAMP, "");
-    }
+    // Fudge timestamp, maybe.
+    final DateTime fudgeTimestamp = getUniversalTimestamp(query);

     return query.applyLimit(
         Sequences.map(
@@ -132,7 +145,7 @@ protected BinaryFn<Row, Row, Row> createMergeFn(Query<Row> queryParam)
                 query.getDataSource(),
                 query.getQuerySegmentSpec(),
                 query.getDimFilter(),
-                gran,
+                query.getGranularity(),
                 query.getDimensions(),
                 query.getAggregatorSpecs(),
                 // Don't do post aggs until the end of this method.
@@ -145,7 +158,7 @@ protected BinaryFn<Row, Row, Row> createMergeFn(Query<Row> queryParam)
                 ImmutableMap.<String, Object>of(
                     "finalize", false,
                     GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
-                    CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp
+                    CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis())
                 )
             ),
             responseContext
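The two halves above cooperate through the query context: mergeResults() computes one universal timestamp and ships its millis under CTX_KEY_FUDGE_TIMESTAMP, while getUniversalTimestamp() (also called from createGrouperAccumulatorPair in the first file) parses it back, so every partial result lands on the same timestamp. A simplified, standalone sketch of that round trip (the context key literal and the plain Map stand in for Druid's query context and are assumptions, not copied from the diff):

import java.util.HashMap;
import java.util.Map;

class FudgeTimestampContextSketch
{
  // Assumed literal; the diff only shows the constant name CTX_KEY_FUDGE_TIMESTAMP.
  static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";

  public static void main(String[] args)
  {
    final Long universalMillis = 1472688000000L; // e.g. the interval start when granularity = all

    // Sending side: an empty string means "no universal timestamp".
    final Map<String, Object> context = new HashMap<>();
    context.put(CTX_KEY_FUDGE_TIMESTAMP, universalMillis == null ? "" : String.valueOf(universalMillis));

    // Receiving side: mirrors getUniversalTimestamp()'s context branch.
    final String fromContext = (String) context.getOrDefault(CTX_KEY_FUDGE_TIMESTAMP, "");
    final Long parsed = fromContext.isEmpty() ? null : Long.parseLong(fromContext);

    System.out.println(parsed); // 1472688000000
  }
}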