From 392647d09cc43ea94a53aa56d8c46bfe7d16d53e Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 18 Dec 2018 15:37:42 +0100 Subject: [PATCH] [FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE Added a validation that checks that no ambiguous columns are defined in the MATCH_RECOGNIZE clause. Without the check, a cryptic message is thrown from the code generation stack. This closes #7328. --- .../datastream/DataStreamMatchRule.scala | 53 +++++++++++++++++-- .../match/MatchRecognizeValidationTest.scala | 24 +++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala index bc0f56e38bac1..046a49aefdfe6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala @@ -18,12 +18,15 @@ package org.apache.flink.table.plan.rules.datastream +import java.util.{List => JList} + import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} import org.apache.calcite.sql.SqlAggFunction -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.plan.logical.MatchRecognize import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch @@ -33,6 +36,7 @@ import org.apache.flink.table.plan.util.RexDefaultVisitor import 
org.apache.flink.table.util.MatchUtil import scala.collection.JavaConverters._ +import scala.collection.mutable class DataStreamMatchRule extends ConverterRule( @@ -46,6 +50,8 @@ class DataStreamMatchRule validateAggregations(logicalMatch.getMeasures.values().asScala) validateAggregations(logicalMatch.getPatternDefinitions.values().asScala) + // This check might be obsolete once CALCITE-2747 is resolved + validateAmbiguousColumns(logicalMatch) true } @@ -91,7 +97,48 @@ class DataStreamMatchRule expr.foreach(_.accept(validator)) } - class AggregationsValidator extends RexDefaultVisitor[Object] { + private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = { + if (logicalMatch.isAllRows) { + throw new TableException("All rows per match mode is not supported yet.") + } else { + val refNameFinder = new RefNameFinder(logicalMatch.getInput.getRowType) + validateAmbiguousColumnsOnRowPerMatch( + logicalMatch.getPartitionKeys, + logicalMatch.getMeasures.keySet().asScala, + logicalMatch.getRowType, + refNameFinder) + } + } + + private def validateAmbiguousColumnsOnRowPerMatch( + partitionKeys: JList[RexNode], + measuresNames: mutable.Set[String], + expectedSchema: RelDataType, + refNameFinder: RefNameFinder) + : Unit = { + val actualSize = partitionKeys.size() + measuresNames.size + val expectedSize = expectedSchema.getFieldCount + if (actualSize != expectedSize) { + //try to find ambiguous column + + val ambiguousColumns = partitionKeys.asScala.map(_.accept(refNameFinder)) + .filter(measuresNames.contains).mkString("{", ", ", "}") + + throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns") + } + } + + private class RefNameFinder(inputSchema: RelDataType) extends RexDefaultVisitor[String] { + + override def visitInputRef(inputRef: RexInputRef): String = { + inputSchema.getFieldList.get(inputRef.getIndex).getName + } + + override def visitNode(rexNode: RexNode): String = + throw new TableException(s"PARTITION BY clause accepts only 
input reference. Found $rexNode") + } + + private class AggregationsValidator extends RexDefaultVisitor[Object] { override def visitCall(call: RexCall): AnyRef = { call.getOperator match { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchRecognizeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchRecognizeValidationTest.scala index b77a60e1af519..4e6562da281ff 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchRecognizeValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchRecognizeValidationTest.scala @@ -174,6 +174,30 @@ class MatchRecognizeValidationTest extends TableTestBase { streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] } + @Test + def testValidatingAmbiguousColumns(): Unit = { + thrown.expectMessage("Columns ambiguously defined: {symbol, price}") + thrown.expect(classOf[ValidationException]) + + val sqlQuery = + s""" + |SELECT * + |FROM Ticker + |MATCH_RECOGNIZE ( + | PARTITION BY symbol, price + | ORDER BY proctime + | MEASURES + | A.symbol AS symbol, + | A.price AS price + | PATTERN (A) + | DEFINE + | A AS symbol = 'a' + |) AS T + |""".stripMargin + + streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] + } + // *************************************************************************************** // * Those validations are temporary. We should remove those tests once we support those * // * features. *