Skip to content

Commit

Permalink
[FLINK-28288][table][python] Support decode and encode in Table API (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
a49a authored Jul 1, 2022
1 parent ea0368e commit 2f4f462
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ string:
table: INT.chr()
description: Returns the ASCII character having the binary equivalent to integer. If integer is larger than 255, we will get the modulus of integer divided by 255 first, and returns CHR of the modulus. Returns NULL if integer is NULL. E.g., chr(97) returns a, chr(353) returns a, and ascii(CAST(NULL AS VARCHAR)) returns NULL.
- sql: DECODE(binary, string)
table: BINARY.decode(STRING)
description: Decodes the first argument into a String using the provided character set (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'). If either argument is null, the result will also be null.
- sql: ENCODE(string1, string2)
table: STRING1.encode(STRING2)
description: Encodes the string1 into a BINARY using the provided string2 character set (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'). If either argument is null, the result will also be null.
- sql: INSTR(string1, string2)
description: Returns the position of the first occurrence of string2 in string1. Returns NULL if any of arguments is NULL.
Expand Down
2 changes: 2 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,12 @@ string:
并返回模数的 CHR。如果整数为 `NULL`,则返回 `NULL`。例如 `chr(97)` 返回 `a`,`chr(353)` 返回 `a`,
`ascii(CAST(NULL AS VARCHAR))` 返回 `NULL`。
- sql: DECODE(binary, string)
table: BINARY.decode(STRING)
description: |
使用提供的字符集('US-ASCII','ISO-8859-1','UTF-8','UTF-16BE','UTF-16LE','UTF-16')解码。
如果任一参数为空,则结果也将为空。
- sql: ENCODE(string1, string2)
table: STRING1.encode(STRING2)
description: |
使用提供的字符集('US-ASCII','ISO-8859-1','UTF-8','UTF-16BE','UTF-16LE','UTF-16')编码。
如果任一参数为空,则结果也将为空。
Expand Down
12 changes: 12 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,18 @@ def chr(self) -> 'Expression[str]':
"""
return _unary_op("chr")(self)

def decode(self, charset: Union[str, 'Expression[str]']) -> 'Expression[str]':
"""
Decodes the first argument into a String using the provided character set.
"""
return _binary_op("decode")(self, charset)

def encode(self, charset: Union[str, 'Expression[str]']) -> 'Expression[bytes]':
"""
Encodes the string into a BINARY using the provided character set.
"""
return _binary_op("encode")(self, charset)

@property
def ltrim(self) -> 'Expression[str]':
"""
Expand Down
2 changes: 2 additions & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def test_expression(self):
self.assertEqual('toBase64(a)', str(expr1.to_base64))
self.assertEqual('ascii(a)', str(expr1.ascii))
self.assertEqual('chr(a)', str(expr1.chr))
self.assertEqual("decode(a, 'utf-8')", str(expr1.decode('utf-8')))
self.assertEqual("encode(a, 'utf-8')", str(expr1.encode('utf-8')))
self.assertEqual('ltrim(a)', str(expr1.ltrim))
self.assertEqual('rtrim(a)', str(expr1.rtrim))
self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.COSH;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.COT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.COUNT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DECODE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DEGREES;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DISTINCT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DIVIDE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ENCODE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXTRACT;
Expand Down Expand Up @@ -1034,6 +1036,18 @@ public OutType chr() {
return toApiSpecificExpression(unresolvedCall(CHR, toExpr()));
}

/** Decodes the first argument into a String using the provided character set. */
public OutType decode(InType charset) {
return toApiSpecificExpression(
unresolvedCall(DECODE, toExpr(), objectToExpression(charset)));
}

/** Encodes the string into a BINARY using the provided character set. */
public OutType encode(InType charset) {
return toApiSpecificExpression(
unresolvedCall(ENCODE, toExpr(), objectToExpression(charset)));
}

/** Returns a string that removes the left whitespaces from the given string. */
public OutType ltrim() {
return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,28 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.CHAR(1))))
.build();

public static final BuiltInFunctionDefinition DECODE =
BuiltInFunctionDefinition.newBuilder()
.name("decode")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.BINARY_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING)))
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition ENCODE =
BuiltInFunctionDefinition.newBuilder()
.name("encode")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING)))
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BYTES())))
.build();

public static final BuiltInFunctionDefinition UUID =
BuiltInFunctionDefinition.newBuilder()
.name("uuid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public class DirectConvertRule implements CallExpressionConvertRule {
BuiltInFunctionDefinitions.TO_BASE64, FlinkSqlOperatorTable.TO_BASE64);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ASCII, FlinkSqlOperatorTable.ASCII);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CHR, FlinkSqlOperatorTable.CHR);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.DECODE, FlinkSqlOperatorTable.DECODE);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.ENCODE, FlinkSqlOperatorTable.ENCODE);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.UUID, FlinkSqlOperatorTable.UUID);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LTRIM, FlinkSqlOperatorTable.LTRIM);
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2532,7 +2532,10 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {

@Test
def testEncodeAndDecode(): Unit = {
testSqlApi("decode(encode('aabbef', 'UTF-16LE'), 'UTF-16LE')", "aabbef")
testAllApis(
"aabbef".encode("UTF-16LE").decode("UTF-16LE"),
"decode(encode('aabbef', 'UTF-16LE'), 'UTF-16LE')",
"aabbef")

testSqlApi("decode(encode('aabbef', 'utf-8'), 'utf-8')", "aabbef")

Expand Down

0 comments on commit 2f4f462

Please sign in to comment.