Skip to content

Commit

Permalink
[FLINK-12254][table] Update value literals to new type system
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Jun 5, 2019
1 parent 0bd3cf6 commit 758780f
Show file tree
Hide file tree
Showing 37 changed files with 721 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
package org.apache.flink.table.expressions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;

import java.util.Arrays;
import java.util.Optional;

/**
* Utilities for API-specific {@link Expression}s.
Expand Down Expand Up @@ -55,8 +52,8 @@ public static ValueLiteralExpression valueLiteral(Object value) {
return new ValueLiteralExpression(value);
}

public static ValueLiteralExpression valueLiteral(Object value, TypeInformation<?> type) {
return new ValueLiteralExpression(value, type);
public static ValueLiteralExpression valueLiteral(Object value, DataType dataType) {
return new ValueLiteralExpression(value, dataType);
}

public static TypeLiteralExpression typeLiteral(DataType dataType) {
Expand All @@ -80,39 +77,32 @@ public static LookupCallExpression lookupCall(String name, Expression... args) {
}

public static Expression toMonthInterval(Expression e, int multiplier) {
// check for constant
return ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
.map((v) -> (Expression) valueLiteral(v * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS))
.orElseThrow(() -> new ValidationException("Only constant intervals are supported: " + e));
return ExpressionUtils.extractValue(e, Integer.class)
.map((v) -> intervalOfMonths(v * multiplier))
.orElseThrow(() -> new ValidationException("Invalid constant for year-month interval: " + e));
}

public static ValueLiteralExpression intervalOfMillis(long millis) {
return valueLiteral(
millis,
DataTypes.INTERVAL(DataTypes.SECOND(3)).notNull().bridgedTo(Long.class));
}

public static Expression toMilliInterval(Expression e, long multiplier) {
final Optional<Expression> intInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
.map((v) -> valueLiteral(v * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS));

final Optional<Expression> longInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO)
.map((v) -> valueLiteral(v * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS));

if (intInterval.isPresent()) {
return intInterval.get();
} else if (longInterval.isPresent()) {
return longInterval.get();
}
throw new ValidationException("Only constant intervals are supported:" + e);
return ExpressionUtils.extractValue(e, Long.class)
.map((v) -> intervalOfMillis(v * multiplier))
.orElseThrow(() -> new ValidationException("Invalid constant for day-time interval: " + e));
}

public static ValueLiteralExpression intervalOfMonths(int months) {
return valueLiteral(
months,
DataTypes.INTERVAL(DataTypes.MONTH()).notNull().bridgedTo(Integer.class));
}

public static Expression toRowInterval(Expression e) {
final Optional<Expression> intInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
.map((v) -> valueLiteral((long) v, BasicTypeInfo.LONG_TYPE_INFO));

final Optional<Expression> longInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO)
.map((v) -> valueLiteral(v, BasicTypeInfo.LONG_TYPE_INFO));

if (intInterval.isPresent()) {
return intInterval.get();
} else if (longInterval.isPresent()) {
return longInterval.get();
}
throw new ValidationException("Invalid value for row interval literal: " + e);
return ExpressionUtils.extractValue(e, Long.class)
.map(ApiExpressionUtils::valueLiteral)
.orElseThrow(() -> new ValidationException("Invalid constant for row interval: " + e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
Expand Down Expand Up @@ -224,7 +223,7 @@ private static class ExtractNameVisitor extends ApiExpressionDefaultVisitor<Opti
@Override
public Optional<String> visitCall(CallExpression call) {
if (call.getFunctionDefinition().equals(AS)) {
return extractValue(call.getChildren().get(1), Types.STRING);
return extractValue(call.getChildren().get(1), String.class);
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public Optional<FieldInfo> visitCall(CallExpression call) {
if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
List<Expression> children = call.getChildren();
Expression origExpr = children.get(0);
String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
String newName = ExpressionUtils.extractValue(children.get(1), String.class)
.orElseThrow(() ->
new TableException("Alias expects string literal as new name. Got: " + children.get(1)));

Expand Down Expand Up @@ -444,7 +444,7 @@ public Optional<FieldInfo> visitCall(CallExpression call) {
if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
List<Expression> children = call.getChildren();
Expression origExpr = children.get(0);
String newName = ExpressionUtils.extractValue(children.get(1), Types.STRING)
String newName = ExpressionUtils.extractValue(children.get(1), String.class)
.orElseThrow(() ->
new TableException("Alias expects string literal as new name. Got: " + children.get(1)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;

import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
import static org.junit.Assert.assertEquals;

/**
Expand Down Expand Up @@ -72,9 +71,7 @@ public void testWindowAggregationSummaryString() {
Collections.singletonList(new CallExpression(BuiltInFunctionDefinitions.SUM,
Collections.singletonList(field))),
Collections.emptyList(),
WindowAggregateTableOperation.ResolvedGroupWindow.sessionWindow("w", field, ApiExpressionUtils.valueLiteral(
10,
TimeIntervalTypeInfo.INTERVAL_MILLIS)),
WindowAggregateTableOperation.ResolvedGroupWindow.sessionWindow("w", field, intervalOfMillis(10)),
new CatalogTableOperation(
Arrays.asList("cat1", "db1", "tab1"),
schema),
Expand All @@ -86,7 +83,7 @@ public void testWindowAggregationSummaryString() {
assertEquals(
"Distinct:\n" +
" WindowAggregate: (group: [a], agg: [sum(a)], windowProperties: []," +
" window: [SessionWindow(field: [a], gap: [10.millis])])\n" +
" window: [SessionWindow(field: [a], gap: [10])])\n" +
" CatalogTable: (path: [cat1, db1, tab1], fields: [a])",
distinctTableOperation.asSummaryString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.table.expressions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Optional;

Expand All @@ -30,20 +29,18 @@
public final class ExpressionUtils {

/**
* Extracts value of given type from expression assuming it is a {@link ValueLiteralExpression}.
* Extracts the value (excluding null) of a given class from an expression assuming it is a
* {@link ValueLiteralExpression}.
*
* @param expr literal to extract the value from
* @param type expected type to extract from the literal
* @param targetClass expected class to extract from the literal
* @param <V> type of extracted value
* @return extracted value or empty if could not extract value of given type
*/
@SuppressWarnings("unchecked")
public static <V> Optional<V> extractValue(Expression expr, TypeInformation<V> type) {
public static <V> Optional<V> extractValue(Expression expr, Class<V> targetClass) {
if (expr instanceof ValueLiteralExpression) {
final ValueLiteralExpression valueLiteral = (ValueLiteralExpression) expr;
if (valueLiteral.getType().equals(type)) {
return Optional.of((V) valueLiteral.getValue());
}
return valueLiteral.getValueAs(targetClass);
}
return Optional.empty();
}
Expand Down
Loading

0 comments on commit 758780f

Please sign in to comment.