Skip to content

Commit

Permalink
[FLINK-25845][table] Add EXECUTE PLAN
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored and twalthr committed Feb 14, 2022
1 parent faed56f commit 38f0592
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dml.SqlBeginStatementSet"
"org.apache.flink.sql.parser.dml.SqlEndStatementSet"
"org.apache.flink.sql.parser.dml.SqlExecute"
"org.apache.flink.sql.parser.dml.SqlExecutePlan"
"org.apache.flink.sql.parser.dml.SqlStatementSet"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dml.SqlExecute"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
Expand Down Expand Up @@ -522,6 +523,7 @@
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
"SqlExecutePlan()"
"SqlExecute()"
"SqlAddJar()"
"SqlRemoveJar()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1787,6 +1787,24 @@ SqlNode SqlExecute() :
}
}

/**
 * Parses an {@code EXECUTE PLAN 'file path'} statement.
 */
SqlNode SqlExecutePlan() :
{
SqlCharStringLiteral filePath;
}
{
<EXECUTE> <PLAN> <QUOTED_STRING>
{
// Strip the surrounding quotes / unescape the SQL string literal to get the raw path.
String path = SqlParserUtil.parseString(token.image);
filePath = SqlLiteral.createCharString(path, getPos());
}
{
return new SqlExecutePlan(getPos(), filePath);
}
}

void ParseExplainDetail(Set<String> explainDetails):
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dml;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.NlsString;

import javax.annotation.Nonnull;

import java.util.Collections;
import java.util.List;

/** AST node for {@code EXECUTE PLAN 'planfile'}. */
public class SqlExecutePlan extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("ExecutePlan", SqlKind.OTHER);

    /** The plan file path as a SQL character literal, e.g. {@code './plan.json'}. */
    private final SqlCharStringLiteral planFile;

    public SqlExecutePlan(SqlParserPos pos, SqlCharStringLiteral planFile) {
        super(pos);
        this.planFile = planFile;
    }

    /** Returns the plan file path with the SQL quoting removed. */
    public String getPlanFile() {
        NlsString nlsString = planFile.getValueAs(NlsString.class);
        return nlsString.getValue();
    }

    @Nonnull
    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Nonnull
    @Override
    public List<SqlNode> getOperandList() {
        // The plan file is exposed via getPlanFile() instead of the operand list.
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("EXECUTE");
        writer.keyword("PLAN");
        planFile.unparse(writer, leftPrec, rightPrec);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,13 @@ public void testExecuteInsert() {
this.sql("execute insert into emps1 select * from emps2").ok(expected);
}

@Test
public void testExecutePlan() {
    // Both relative and absolute plan file paths must parse and unparse unchanged.
    String relativePath = "execute plan './test.json'";
    sql(relativePath).ok("EXECUTE PLAN './test.json'");

    String absolutePath = "execute plan '/some/absolute/dir/plan.json'";
    sql(absolutePath).ok("EXECUTE PLAN '/some/absolute/dir/plan.json'");
}

@Test
public void testExplainUpsert() {
String sql = "explain plan for upsert into emps1 values (1, 2)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
Expand Down Expand Up @@ -773,6 +774,14 @@ public CompiledPlan compilePlanSql(String stmt) {
return planner.compilePlan(Collections.singletonList((ModifyOperation) operations.get(0)));
}

/**
 * Loads a compiled plan from the given file and executes it.
 *
 * @param filePath path of the JSON plan file to load
 * @throws TableException if the plan file cannot be read
 */
private TableResultInternal executePlan(String filePath) {
    try {
        PlanReference planReference = PlanReference.fromFile(filePath);
        return (TableResultInternal) executePlan(planReference);
    } catch (IOException e) {
        throw new TableException(String.format("Cannot load plan '%s'", filePath), e);
    }
}

@Override
public TableResult executePlan(CompiledPlan plan) {
CompiledPlanInternal planInternal = (CompiledPlanInternal) plan;
Expand Down Expand Up @@ -1350,6 +1359,8 @@ public TableResultInternal executeInternal(Operation operation) {
CreateTableASOperation createTableASOperation = (CreateTableASOperation) operation;
executeInternal(createTableASOperation.getCreateTableOperation());
return executeInternal(createTableASOperation.toSinkModifyOperation(catalogManager));
} else if (operation instanceof ExecutePlanOperation) {
return executePlan(((ExecutePlanOperation) operation).getFilePath());
} else if (operation instanceof NopOperation) {
return TableResultImpl.TABLE_RESULT_OK;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.table.operations.Operation;

import java.util.Objects;

/** Operation to describe an EXECUTE PLAN statement. */
@Experimental
public class ExecutePlanOperation implements Operation {

    // Path (relative or absolute) of the JSON plan file to execute.
    private final String filePath;

    /**
     * Creates the operation.
     *
     * @param filePath path of the plan file, must not be null
     * @throws NullPointerException if {@code filePath} is null
     */
    public ExecutePlanOperation(String filePath) {
        // Fail fast here instead of with a late NPE when the plan is loaded.
        this.filePath = Objects.requireNonNull(filePath, "filePath must not be null");
    }

    /** Returns the path of the plan file to execute. */
    public String getFilePath() {
        return filePath;
    }

    @Override
    public String asSummaryString() {
        return String.format("EXECUTE PLAN '%s'", filePath);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.flink.sql.parser.dml.SqlBeginStatementSet;
import org.apache.flink.sql.parser.dml.SqlEndStatementSet;
import org.apache.flink.sql.parser.dml.SqlExecute;
import org.apache.flink.sql.parser.dml.SqlExecutePlan;
import org.apache.flink.sql.parser.dml.SqlStatementSet;
import org.apache.flink.sql.parser.dql.SqlLoadModule;
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
Expand Down Expand Up @@ -131,6 +132,7 @@
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
Expand Down Expand Up @@ -328,6 +330,8 @@ private static Optional<Operation> convertValidatedSqlNode(
} else if (validated instanceof SqlExecute) {
return convertValidatedSqlNode(
flinkPlanner, catalogManager, ((SqlExecute) validated).getStatement());
} else if (validated instanceof SqlExecutePlan) {
return Optional.of(converter.convertExecutePlan((SqlExecutePlan) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
Expand Down Expand Up @@ -1146,6 +1150,10 @@ private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
}

/** Converts an {@code EXECUTE PLAN 'file'} node into an {@code ExecutePlanOperation}. */
private Operation convertExecutePlan(SqlExecutePlan sqlExecutePlan) {
    String planFilePath = sqlExecutePlan.getPlanFile();
    return new ExecutePlanOperation(planFilePath);
}

private void validateTableConstraint(SqlTableConstraint constraint) {
if (constraint.isUnique()) {
throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlEndStatementSet, SqlExecute, SqlStatementSet}
import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlEndStatementSet, SqlExecute, SqlExecutePlan, SqlStatementSet}
import org.apache.flink.sql.parser.dql._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader

import com.google.common.collect.ImmutableList
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan._
Expand All @@ -40,9 +41,11 @@ import org.apache.flink.sql.parser.ddl.{SqlReset, SqlSet, SqlUseModules}
import org.apache.flink.table.planner.parse.CalciteParser

import javax.annotation.Nullable

import java.lang.{Boolean => JBoolean}
import java.util
import java.util.function.{Function => JFunction}

import scala.collection.JavaConverters._

/**
Expand Down Expand Up @@ -140,7 +143,8 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlBeginStatementSet]
|| sqlNode.isInstanceOf[SqlEndStatementSet]
|| sqlNode.isInstanceOf[SqlSet]
|| sqlNode.isInstanceOf[SqlReset]) {
|| sqlNode.isInstanceOf[SqlReset]
|| sqlNode.isInstanceOf[SqlExecutePlan]) {
return sqlNode
}
sqlNode match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;

import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -93,6 +97,23 @@ public void testExecutePlan() throws Exception {
assertResult(data, sinkPath);
}

@Test
public void testExecutePlanSql() throws Exception {
    // Compile an INSERT plan to a JSON file, then run it via the EXECUTE PLAN statement.
    Path compiledPlanFile = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
    FileUtils.createParentDirectories(compiledPlanFile.toFile());

    List<String> rows = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
    createTestCsvSourceTable("src", rows, "a bigint", "b int", "c varchar");
    File sinkDir = createTestCsvSinkTable("sink", "a bigint", "b int", "c varchar");

    tableEnv.compilePlanSql("insert into sink select * from src").writeToFile(compiledPlanFile);

    String executeStatement = String.format("EXECUTE PLAN '%s'", compiledPlanFile.toAbsolutePath());
    tableEnv.executeSql(executeStatement).await();

    assertResult(rows, sinkDir);
}

@Test
public void testExplainPlan() throws IOException {
String actual =
Expand Down

0 comments on commit 38f0592

Please sign in to comment.