diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 2ad4347b20c8d..7a8795566b01b 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -341,8 +341,10 @@ string: - sql: INSTR(string1, string2) description: Returns the position of the first occurrence of string2 in string1. Returns NULL if any of arguments is NULL. - sql: LEFT(string, integer) + table: STRING.LEFT(INT) description: Returns the leftmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL. - sql: RIGHT(string, integer) + table: STRING.RIGHT(INT) description: Returns the rightmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL. - sql: LOCATE(string1, string2[, integer]) description: Returns the position of the first occurrence of string1 in string2 after position integer. Returns 0 if not found. Returns NULL if any of arguments is NULL. diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index ad9940b3486af..f9056bf62c35c 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -434,10 +434,12 @@ string: - sql: INSTR(string1, string2) description: 返回 string2 在 string1 中第一次出现的位置。如果有任一参数为 `NULL`,则返回 `NULL`。 - sql: LEFT(string, integer) + table: STRING.LEFT(INT) description: | 返回字符串中最左边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数 为 `NULL` 则返回 `NULL`。 - sql: RIGHT(string, integer) + table: STRING.RIGHT(INT) description: | 返回字符串中最右边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数 为 `NULL` 则返回 `NULL`。 diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 1f7e64f9576ad..b22dd0f02b0f1 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1212,6 +1212,18 @@ def encode(self, charset: Union[str, 'Expression[str]']) -> 'Expression[bytes]': """ return _binary_op("encode")(self, charset) + def left(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]': + """ + Returns the leftmost integer characters from the input string. + """ + return _binary_op("left")(self, length) + + def right(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]': + """ + Returns the rightmost integer characters from the input string. + """ + return _binary_op("right")(self, length) + @property def ltrim(self) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index dd494cb1cdf8c..e17f07ba9634e 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -147,6 +147,8 @@ def test_expression(self): self.assertEqual('chr(a)', str(expr1.chr)) self.assertEqual("decode(a, 'utf-8')", str(expr1.decode('utf-8'))) self.assertEqual("encode(a, 'utf-8')", str(expr1.encode('utf-8'))) + self.assertEqual('left(a, 2)', str(expr1.left(2))) + self.assertEqual('right(a, 2)', str(expr1.right(2))) self.assertEqual('ltrim(a)', str(expr1.ltrim)) self.assertEqual('rtrim(a)', str(expr1.rtrim)) self.assertEqual('repeat(a, 3)', str(expr1.repeat(3))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 908a2ab3957a3..e68fd84c3f36f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -103,6 +103,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_VALUE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LAST_VALUE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LEFT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LIKE; @@ -135,6 +136,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RIGHT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROUND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROWTIME; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RPAD; @@ -1048,6 +1050,16 @@ public OutType encode(InType charset) { unresolvedCall(ENCODE, toExpr(), objectToExpression(charset))); } + /** Returns the leftmost integer characters from the input string. */ + public OutType left(InType len) { + return toApiSpecificExpression(unresolvedCall(LEFT, toExpr(), objectToExpression(len))); + } + + /** Returns the rightmost integer characters from the input string. */ + public OutType right(InType len) { + return toApiSpecificExpression(unresolvedCall(RIGHT, toExpr(), objectToExpression(len))); + } + /** Returns a string that removes the left whitespaces from the given string. */ public OutType ltrim() { return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr())); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index bb00d2e45cbd1..645d8e5372ba5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -760,6 +760,28 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BYTES()))) .build(); + public static final BuiltInFunctionDefinition LEFT = + BuiltInFunctionDefinition.newBuilder() + .name("left") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.INTEGER_NUMERIC))) + .outputTypeStrategy(nullableIfArgs(varyingString(argument(0)))) + .build(); + + public static final BuiltInFunctionDefinition RIGHT = + BuiltInFunctionDefinition.newBuilder() + .name("right") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.INTEGER_NUMERIC))) + .outputTypeStrategy(nullableIfArgs(varyingString(argument(0)))) + .build(); + public static final BuiltInFunctionDefinition UUID = BuiltInFunctionDefinition.newBuilder() .name("uuid") diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index 010a0d062c22a..5410191f5cdfa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -116,6 +116,8 @@ public class DirectConvertRule implements CallExpressionConvertRule { BuiltInFunctionDefinitions.DECODE, FlinkSqlOperatorTable.DECODE); DEFINITION_OPERATOR_MAP.put( BuiltInFunctionDefinitions.ENCODE, FlinkSqlOperatorTable.ENCODE); + DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LEFT, FlinkSqlOperatorTable.LEFT); + DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RIGHT, FlinkSqlOperatorTable.RIGHT); DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.UUID, FlinkSqlOperatorTable.UUID); DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LTRIM, FlinkSqlOperatorTable.LTRIM); DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index 9136e9162f7ff..5e1995265faaf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -195,6 +195,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { ) } + @Test + def testLeftAndRight(): Unit = { + val str = "Hello" + testAllApis(str.left(3), s"LEFT('$str', 3)", "Hel") + testAllApis(str.right(3), s"RIGHT('$str', 3)", "llo") + } + @Test def testInstr(): Unit = { testSqlApi("instr('Corporate Floor', 'or', 3, 2)", "14")