Skip to content

Commit

Permalink
Add "stringEncoding" parameter to DataSketches HLL. (apache#11201)
Browse files Browse the repository at this point in the history
* Add "stringEncoding" parameter to DataSketches HLL.

Builds on the concept from apache#11172 and adds a way to feed HLL sketches
with UTF-8 bytes.

This must be an option rather than always-on, because prior to this
patch, HLL sketches used UTF-16LE encoding when hashing strings. To
remain compatible with sketch images created prior to this patch -- which
matters during rolling updates and when reading sketches that have been
written to segments -- we must keep UTF-16LE as the default.

Not currently documented, because I'm not yet sure how best to expose
this functionality to users. I think the first place would be in the SQL
layer: we could have it automatically select UTF-8 or UTF-16LE when
building sketches at query time. We need to be careful about this, though,
because UTF-8 isn't always faster. Sometimes — for example, for the results of
expressions — UTF-16LE is faster. I expect we will sort this out in
future patches.

* Fix benchmark.

* Fix style issues, improve test coverage.

* Put round back, to make IT updates easier.

* Fix test.

* Fix issue with filtered aggregators and add test.

* Use DS native update(ByteBuffer) method. Improve test coverage.

* Add another suppression.

* Fix ITAutoCompactionTest.

* Update benchmarks.

* Updates.

* Fix conflict.

* Adjustments.
  • Loading branch information
gianm authored Jun 30, 2023
1 parent 4b2d873 commit 67fbd8e
Show file tree
Hide file tree
Showing 38 changed files with 2,171 additions and 538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class DataSketchesHllBenchmark
null,
null,
null,
null,
false
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctUtf8SqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
Expand Down Expand Up @@ -405,26 +407,35 @@ public class SqlBenchmark
"SELECT * FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100')",
"SELECT * FROM foo WHERE dimSequential > '10' AND dimSequential < '8500'",
"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100') GROUP BY 1, 2",
"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2"

"SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2",

// 28, 29, 30, 31: Approximate count distinct of strings
"SELECT APPROX_COUNT_DISTINCT_BUILTIN(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_HLL(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_HLL_UTF8(dimZipf) FROM foo",
"SELECT APPROX_COUNT_DISTINCT_DS_THETA(dimZipf) FROM foo"
);

@Param({"5000000"})
private int rowsPerSegment;

@Param({"false", "force"})
// Can be "false", "true", or "force"
@Param({"force"})
private String vectorize;
@Param({"none", "front-coded-4", "front-coded-16"})

// Can be "none" or "front-coded-N"
@Param({"none", "front-coded-4"})
private String stringEncoding;

@Param({"4", "5", "6", "7", "8", "10", "11", "12", "19", "21", "22", "23", "26", "27"})
@Param({"28", "29", "30", "31"})
private String query;

@Param({STORAGE_MMAP, STORAGE_FRAME_ROW, STORAGE_FRAME_COLUMNAR})
// Can be STORAGE_MMAP, STORAGE_FRAME_ROW, or STORAGE_FRAME_COLUMNAR
@Param({STORAGE_MMAP})
private String storageType;

private SqlEngine engine;

@Nullable
private PlannerFactory plannerFactory;
private final Closer closer = Closer.create();
Expand Down Expand Up @@ -520,13 +531,19 @@ private static DruidOperatorTable createOperatorTable()
try {
final Set<SqlOperatorConversion> extractionOperators = new HashSet<>();
extractionOperators.add(CalciteTests.INJECTOR.getInstance(QueryLookupOperatorConversion.class));
final Set<SqlAggregator> aggregators = new HashSet<>();
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchApproxQuantileSqlAggregator.class));
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchObjectSqlAggregator.class));
final ApproxCountDistinctSqlAggregator countDistinctSqlAggregator =
new ApproxCountDistinctSqlAggregator(new HllSketchApproxCountDistinctSqlAggregator());
aggregators.add(new CountSqlAggregator(countDistinctSqlAggregator));
aggregators.add(countDistinctSqlAggregator);
final Set<SqlAggregator> aggregators = new HashSet<>(
ImmutableList.of(
new DoublesSketchApproxQuantileSqlAggregator(),
new DoublesSketchObjectSqlAggregator(),
new HllSketchApproxCountDistinctSqlAggregator(),
new HllSketchApproxCountDistinctUtf8SqlAggregator(),
new ThetaSketchApproxCountDistinctSqlAggregator(),
new CountSqlAggregator(countDistinctSqlAggregator),
countDistinctSqlAggregator
)
);
return new DruidOperatorTable(aggregators, extractionOperators);
}
catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions codestyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
<Or>
<Class name="org.apache.druid.jackson.DefaultTrueJsonIncludeFilter"/>
<Class name="org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$BatchSizeJsonIncludeFilter"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
Expand All @@ -47,6 +49,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
public static final boolean DEFAULT_SHOULD_FINALIZE = true;
public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;
public static final StringEncoding DEFAULT_STRING_ENCODING = StringEncoding.UTF16LE;

static final Comparator<HllSketchHolder> COMPARATOR =
Comparator.nullsFirst(Comparator.comparingDouble(HllSketchHolder::getEstimate));
Expand All @@ -55,6 +58,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final int lgK;
private final TgtHllType tgtHllType;
private final StringEncoding stringEncoding;
private final boolean shouldFinalize;
private final boolean round;

Expand All @@ -63,6 +67,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
final String fieldName,
@Nullable final Integer lgK,
@Nullable final String tgtHllType,
@Nullable final StringEncoding stringEncoding,
final Boolean shouldFinalize,
final boolean round
)
Expand All @@ -71,6 +76,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
this.fieldName = Objects.requireNonNull(fieldName);
this.lgK = lgK == null ? DEFAULT_LG_K : lgK;
this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType);
this.stringEncoding = stringEncoding == null ? DEFAULT_STRING_ENCODING : stringEncoding;
this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize;
this.round = round;
}
Expand Down Expand Up @@ -100,6 +106,13 @@ public String getTgtHllType()
return tgtHllType.toString();
}

@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = StringEncodingDefaultUTF16LEJsonIncludeFilter.class)
public StringEncoding getStringEncoding()
{
return stringEncoding;
}

@JsonProperty
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class)
public boolean isShouldFinalize()
Expand All @@ -121,14 +134,23 @@ public List<String> requiredFields()
}

/**
* This is a convoluted way to return a list of input field names this aggregator needs.
* Currently the returned factories are only used to obtain a field name by calling getName() method.
* Used by groupBy v1 to create a "transfer aggregator".
*
* {@inheritDoc}
*/
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(
new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), shouldFinalize, round)
new HllSketchBuildAggregatorFactory(
fieldName,
fieldName,
lgK,
tgtHllType.toString(),
stringEncoding,
shouldFinalize,
round
)
);
}

Expand Down Expand Up @@ -228,6 +250,7 @@ public AggregatorFactory getCombiningFactory()
getName(),
getLgK(),
getTgtHllType(),
getStringEncoding(),
isShouldFinalize(),
isRound()
);
Expand All @@ -236,8 +259,13 @@ public AggregatorFactory getCombiningFactory()
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName)
.appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
return new CacheKeyBuilder(getCacheTypeId())
.appendString(name)
.appendString(fieldName)
.appendInt(lgK)
.appendInt(tgtHllType.ordinal())
.appendCacheable(stringEncoding)
.build();
}

@Override
Expand All @@ -255,13 +283,14 @@ public boolean equals(Object o)
&& round == that.round
&& Objects.equals(name, that.name)
&& Objects.equals(fieldName, that.fieldName)
&& tgtHllType == that.tgtHllType;
&& tgtHllType == that.tgtHllType
&& stringEncoding == that.stringEncoding;
}

@Override
public int hashCode()
{
return Objects.hash(name, fieldName, lgK, tgtHllType, shouldFinalize, round);
return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round);
}

@Override
Expand All @@ -272,6 +301,7 @@ public String toString()
", fieldName='" + fieldName + '\'' +
", lgK=" + lgK +
", tgtHllType=" + tgtHllType +
(stringEncoding != DEFAULT_STRING_ENCODING ? ", stringEncoding=" + stringEncoding : "") +
(shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") +
(round != DEFAULT_ROUND ? ", round=" + round : "") +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,27 @@

import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or any numeric type.
*/
public class HllSketchBuildAggregator implements Aggregator
{

private final ColumnValueSelector<Object> selector;
private final Consumer<Supplier<HllSketch>> processor;
private HllSketch sketch;

public HllSketchBuildAggregator(
final ColumnValueSelector<Object> selector,
final Consumer<Supplier<HllSketch>> processor,
final int lgK,
final TgtHllType tgtHllType
)
{
this.selector = selector;
this.processor = processor;
this.sketch = new HllSketch(lgK, tgtHllType);
}

Expand All @@ -54,15 +51,9 @@ public HllSketchBuildAggregator(
* See https://github.com/druid-io/druid/pull/3956
*/
@Override
public void aggregate()
public synchronized void aggregate()
{
final Object value = selector.getObject();
if (value == null) {
return;
}
synchronized (this) {
updateSketch(sketch, value);
}
processor.accept(() -> sketch);
}

/*
Expand Down Expand Up @@ -93,36 +84,4 @@ public long getLong()
{
throw new UnsupportedOperationException("Not implemented");
}

static void updateSketch(final HllSketch sketch, final Object value)
{
if (value instanceof Integer || value instanceof Long) {
sketch.update(((Number) value).longValue());
} else if (value instanceof Float || value instanceof Double) {
sketch.update(((Number) value).doubleValue());
} else if (value instanceof String) {
sketch.update(((String) value).toCharArray());
} else if (value instanceof List) {
// noinspection rawtypes
for (Object entry : (List) value) {
if (entry != null) {
final String asString = entry.toString();
if (!NullHandling.isNullOrEquivalent(asString)) {
sketch.update(asString);
}
}
}
} else if (value instanceof char[]) {
sketch.update((char[]) value);
} else if (value instanceof byte[]) {
sketch.update((byte[]) value);
} else if (value instanceof int[]) {
sketch.update((int[]) value);
} else if (value instanceof long[]) {
sketch.update((long[]) value);
} else {
throw new IAE("Unsupported type " + value.getClass());
}
}

}
Loading

0 comments on commit 67fbd8e

Please sign in to comment.