Skip to content

Commit

Permalink
vector group by support for string expressions (apache#11010)
Browse files Browse the repository at this point in the history
* vector group by support for string expressions

* fix test

* comments, javadoc
  • Loading branch information
clintropolis authored Apr 9, 2021
1 parent de69180 commit 338886f
Show file tree
Hide file tree
Showing 18 changed files with 503 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ public String getFormatString()
// 24: group by long expr with non-expr agg
"SELECT (long1 * long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
// 25: group by non-expr with expr agg
"SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2"
"SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
// 26: group by string expr with non-expr agg
"SELECT CONCAT(string2, '-', long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
// 27: group by string expr with expr agg
"SELECT CONCAT(string2, '-', long2), SUM(long1 * double4) FROM foo GROUP BY 1 ORDER BY 2"
);

@Param({"5000000"})
Expand Down Expand Up @@ -211,7 +215,9 @@ public String getFormatString()
"22",
"23",
"24",
"25"
"25",
"26",
"27"
})
private String query;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.math.expr.vector.VectorMathProcessors;
import org.apache.druid.math.expr.vector.VectorStringProcessors;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -63,12 +64,19 @@ protected double evalDouble(double left, double right)
/**
 * This expression can be vectorized when neither operand has a known array output type and both operands can
 * themselves be vectorized. String operands are allowed: {@code buildVectorized} selects a string concatenation
 * processor when the operator output type is {@link ExprType#STRING}.
 */
@Override
public boolean canVectorize(InputBindingInspector inspector)
{
  // areScalar (not areNumeric) so that string typed operands are not rejected before buildVectorized can handle them
  return inspector.areScalar(left, right) && inspector.canVectorize(left, right);
}

/**
 * Build the vector processor for this expression. When the operator output type computed from both operands is
 * {@link ExprType#STRING} the string concatenation processor is used, otherwise the math 'plus' processor.
 */
@Override
public <T> ExprVectorProcessor<T> buildVectorized(VectorInputBindingInspector inspector)
{
  final ExprType leftType = left.getOutputType(inspector);
  final ExprType rightType = right.getOutputType(inspector);
  final ExprType resultType = ExprTypeConversion.operator(leftType, rightType);

  // anything which does not resolve to a string result goes through the math processor
  if (!ExprType.STRING.equals(resultType)) {
    return VectorMathProcessors.plus(inspector, left, right);
  }
  return VectorStringProcessors.concat(inspector, left, right);
}
}
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,36 @@ default boolean areNumeric(Expr... args)
return areNumeric(Arrays.asList(args));
}

/**
 * Check that none of the provided {@link Expr} has an inferred output type which is not
 * {@link ExprType#isScalar()} (i.e. none of them is known to be an array).
 *
 * Expressions whose output type cannot be inferred ({@link Expr#getOutputType} returns null) are skipped rather
 * than counted as non-scalar. As a consequence this method also returns true when the list is empty, or when no
 * expression has a computable output type.
 *
 * @param args expressions to inspect, using this inspector to infer their output types
 * @return false if at least one expression has a known output type that is not scalar, true otherwise
 */
default boolean areScalar(List<Expr> args)
{
  boolean allScalar = true;
  for (Expr arg : args) {
    final ExprType argType = arg.getOutputType(this);
    // a null type means "unknown" and is ignored; only a known non-scalar type flips the result
    if (argType != null && !argType.isScalar()) {
      allScalar = false;
    }
  }
  return allScalar;
}

/**
 * Varargs form of {@link #areScalar(List)}: check that none of the provided {@link Expr} has an inferred output
 * type which is not {@link ExprType#isScalar()} (i.e. none of them is known to be an array).
 *
 * Expressions whose output type cannot be inferred are skipped, so this method also returns true when called with
 * no arguments, or when no expression has a computable output type.
 *
 * @param args expressions to inspect
 * @return false if at least one expression has a known output type that is not scalar, true otherwise
 */
default boolean areScalar(Expr... args)
{
  final List<Expr> argList = Arrays.asList(args);
  return areScalar(argList);
}

/**
* Check if every provided {@link Expr} computes {@link Expr#canVectorize(InputBindingInspector)} to a value of true
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/ExprType.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public boolean isNumeric()
return isNumeric(this);
}

/**
 * Returns true if this type is scalar, as decided by {@link #isScalar(ExprType)}.
 */
public boolean isScalar()
{
  return ExprType.isScalar(this);
}

/**
* The expression system does not distinguish between {@link ValueType#FLOAT} and {@link ValueType#DOUBLE}, and
* cannot currently handle {@link ValueType#COMPLEX} inputs. This method will convert {@link ValueType#FLOAT} to
Expand Down Expand Up @@ -131,6 +136,11 @@ public static boolean isNumeric(@Nullable ExprType type)
return LONG.equals(type) || DOUBLE.equals(type);
}

/**
 * Returns true for every type which {@link #isArray(ExprType)} does not report as an array type. A null type is
 * not an array type, so it is reported as scalar as well.
 */
public static boolean isScalar(@Nullable ExprType exprType)
{
  final boolean array = isArray(exprType);
  return !array;
}

public static boolean isArray(@Nullable ExprType type)
{
return LONG_ARRAY.equals(type) || DOUBLE_ARRAY.equals(type) || STRING_ARRAY.equals(type);
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/druid/math/expr/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.math.expr.vector.VectorMathProcessors;
import org.apache.druid.math.expr.vector.VectorProcessors;
import org.apache.druid.math.expr.vector.VectorStringProcessors;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -2191,6 +2192,21 @@ public ExprType getOutputType(Expr.InputBindingInspector inspector, List<Expr> a
{
return ExprType.STRING;
}

/**
 * This function can be vectorized when no argument has a known non-scalar output type and every argument can
 * itself be vectorized.
 */
@Override
public boolean canVectorize(Expr.InputBindingInspector inspector, List<Expr> args)
{
  // check the argument types first; vectorizability of the arguments is only consulted when that passes
  if (!inspector.areScalar(args)) {
    return false;
  }
  return inspector.canVectorize(args);
}

/**
 * Build the vector processor for this function by delegating to the list based
 * {@link VectorStringProcessors#concat(Expr.VectorInputBindingInspector, List)}.
 */
@Override
public <T> ExprVectorProcessor<T> asVectorProcessor(Expr.VectorInputBindingInspector inspector, List<Expr> args)
{
  final ExprVectorProcessor<T> concatProcessor = VectorStringProcessors.concat(inspector, args);
  return concatProcessor;
}
}

class StrlenFunc implements Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import java.lang.reflect.Array;

/**
* Base {@link ExprVectorProcessor} for expressions and functions with 2 'object' typed inputs (strings, arrays).
*
* In SQL compatible null handling mode, for a row with either left or right input as a null value, it will be handled
* by {@link #processNull(int)} instead of {@link #processIndex(Object, Object, int)}.
*/
public abstract class BivariateFunctionVectorObjectProcessor<TLeftInput, TRightInput, TOutput>
implements ExprVectorProcessor<TOutput>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr.vector;

import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprType;

/**
* many strings enter, one string leaves...
*/
public abstract class StringOutMultiStringInVectorProcessor implements ExprVectorProcessor<String[]>
{
final ExprVectorProcessor<String[]>[] inputs;
final int maxVectorSize;
final String[] outValues;
final boolean sqlCompatible = NullHandling.sqlCompatible();

protected StringOutMultiStringInVectorProcessor(
ExprVectorProcessor<String[]>[] inputs,
int maxVectorSize
)
{
this.inputs = inputs;
this.maxVectorSize = maxVectorSize;
this.outValues = new String[maxVectorSize];
}

@Override
public ExprType getOutputType()
{
return ExprType.STRING;
}

@Override
public ExprEvalVector<String[]> evalVector(Expr.VectorInputBinding bindings)
{
final int currentSize = bindings.getCurrentVectorSize();
final String[][] in = new String[inputs.length][];
for (int i = 0; i < inputs.length; i++) {
in[i] = inputs[i].evalVector(bindings).values();
}

for (int i = 0; i < currentSize; i++) {
processIndex(in, i);
}
return new ExprEvalStringVector(outValues);
}

abstract void processIndex(String[][] in, int i);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@

package org.apache.druid.math.expr.vector;

import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.ExprType;

import javax.annotation.Nullable;

public abstract class StringOutStringsInFunctionVectorProcessor
extends BivariateFunctionVectorObjectProcessor<String[], String[], String[]>
{
final boolean sqlCompatible = NullHandling.sqlCompatible();

protected StringOutStringsInFunctionVectorProcessor(
ExprVectorProcessor<String[]> left,
ExprVectorProcessor<String[]> right,
Expand All @@ -44,7 +41,7 @@ protected StringOutStringsInFunctionVectorProcessor(
}

/**
 * Compute the output string for a single pair of input values.
 *
 * @param leftVal  value of the left input, may be null
 * @param rightVal value of the right input, may be null
 * @return the output value, may be null
 */
@Nullable
protected abstract String processValue(@Nullable String leftVal, @Nullable String rightVal);

@Override
void processIndex(String[] strings, String[] strings2, int i)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.math.expr.vector;

import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprType;

import javax.annotation.Nullable;
import java.util.List;

/**
 * Factory methods which build {@link ExprVectorProcessor} implementations for string concatenation, either of
 * exactly two expressions or of a list of expressions.
 */
public class VectorStringProcessors
{
  /**
   * Build a processor which concatenates the values of two expressions.
   *
   * The implementation is chosen once, at build time, from {@link NullHandling#sqlCompatible()}: in SQL compatible
   * mode the values are appended as-is, otherwise each value first goes through
   * {@link NullHandling#nullToEmptyIfNeeded(String)}.
   *
   * @param inspector used to build the vector processors of both inputs and to get the maximum vector size
   * @param left      left hand side expression
   * @param right     right hand side expression
   * @return processor producing the concatenated values
   */
  public static <T> ExprVectorProcessor<T> concat(Expr.VectorInputBindingInspector inspector, Expr left, Expr right)
  {
    // raw type on purpose: the processor is built for String[] vectors but is returned as ExprVectorProcessor<T>
    final ExprVectorProcessor processor;
    if (NullHandling.sqlCompatible()) {
      processor = new StringOutStringsInFunctionVectorProcessor(
          left.buildVectorized(inspector),
          right.buildVectorized(inspector),
          inspector.getMaxVectorSize()
      )
      {
        @Nullable
        @Override
        protected String processValue(@Nullable String leftVal, @Nullable String rightVal)
        {
          // in sql compatible mode, nulls are handled by super class and never make it here...
          return leftVal + rightVal;
        }
      };
    } else {
      processor = new StringOutStringsInFunctionVectorProcessor(
          left.buildVectorized(inspector),
          right.buildVectorized(inspector),
          inspector.getMaxVectorSize()
      )
      {
        @Nullable
        @Override
        protected String processValue(@Nullable String leftVal, @Nullable String rightVal)
        {
          return NullHandling.nullToEmptyIfNeeded(leftVal) + NullHandling.nullToEmptyIfNeeded(rightVal);
        }
      };
    }
    return processor;
  }

  /**
   * Build a processor which concatenates the values of any number of expressions. Every input is cast to
   * {@link ExprType#STRING} before it is concatenated.
   *
   * For each row, every value goes through {@link NullHandling#nullToEmptyIfNeeded(String)}; if any value is still
   * null afterwards the output for that row is null. With an empty input list every row produces null.
   *
   * @param inspector used to build the vector processors of the inputs and to get the maximum vector size
   * @param inputs    expressions to concatenate, in order
   * @return processor producing the concatenated values
   */
  public static <T> ExprVectorProcessor<T> concat(Expr.VectorInputBindingInspector inspector, List<Expr> inputs)
  {
    final ExprVectorProcessor<String[]>[] inputProcessors = new ExprVectorProcessor[inputs.size()];
    for (int i = 0; i < inputs.size(); i++) {
      inputProcessors[i] = CastToTypeVectorProcessor.cast(inputs.get(i).buildVectorized(inspector), ExprType.STRING);
    }
    // raw type on purpose: the processor is built for String[] vectors but is returned as ExprVectorProcessor<T>
    final ExprVectorProcessor processor = new StringOutMultiStringInVectorProcessor(
        inputProcessors,
        inspector.getMaxVectorSize()
    )
    {
      @Override
      void processIndex(String[][] in, int i)
      {
        // with no inputs there is no first value to read; produce null instead of failing on in[0]
        if (in.length == 0) {
          outValues[i] = null;
          return;
        }
        // Result of concatenation is null if any of the Values is null.
        // e.g. 'select CONCAT(null, "abc") as c;' will return null as per Standard SQL spec.
        String first = NullHandling.nullToEmptyIfNeeded(in[0][i]);
        if (first == null) {
          outValues[i] = null;
          return;
        }
        // seed the builder with the first value, then append the remaining inputs in order
        final StringBuilder builder = new StringBuilder(first);
        for (int inputNumber = 1; inputNumber < in.length; inputNumber++) {
          final String s = NullHandling.nullToEmptyIfNeeded(in[inputNumber][i]);
          if (s == null) {
            outValues[i] = null;
            return;
          } else {
            builder.append(s);
          }
        }
        outValues[i] = builder.toString();
      }
    };
    return processor;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ public void testCast()
testFunctions(types, templates, args);
}

@Test
public void testStringFns()
{
  // exercises both the '+' operator and the concat function, with two inputs and with longer chains
  final String[] expressions = {
      "s1 + s2",
      "s1 + '-' + s2",
      "concat(s1, s2)",
      "concat(s1,'-',s2,'-',l1,'-',d1)"
  };
  for (String expression : expressions) {
    testExpression(expression, types);
  }
}

static void testFunctions(Map<String, ExprType> types, String[] templates, String[] args)
{
for (String template : templates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ public void testBloomFilterExprFilter() throws Exception
@Test
public void testBloomFilterVirtualColumn() throws Exception
{
// Cannot vectorize due to expression virtual columns.
cannotVectorize();

BloomKFilter filter = new BloomKFilter(1500);
filter.addString("def-foo");
byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
Expand Down
Loading

0 comments on commit 338886f

Please sign in to comment.