Skip to content

Commit

Permalink
[FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE
Browse files Browse the repository at this point in the history
Added a validation that checks if no ambiguous columns are defined
in MATCH_RECOGNIZE clause. Without the check there is a cryptic
message thrown from code generation stack.

This closes apache#7328.
  • Loading branch information
dawidwys authored and twalthr committed Jan 8, 2019
1 parent 90f3b42 commit 392647d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.flink.table.plan.rules.datastream

import java.util.{List => JList}

import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SqlAggFunction
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.plan.logical.MatchRecognize
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
Expand All @@ -33,6 +36,7 @@ import org.apache.flink.table.plan.util.RexDefaultVisitor
import org.apache.flink.table.util.MatchUtil

import scala.collection.JavaConverters._
import scala.collection.mutable

class DataStreamMatchRule
extends ConverterRule(
Expand All @@ -46,6 +50,8 @@ class DataStreamMatchRule

validateAggregations(logicalMatch.getMeasures.values().asScala)
validateAggregations(logicalMatch.getPatternDefinitions.values().asScala)
// This check might be obsolete once CALCITE-2747 is resolved
validateAmbiguousColumns(logicalMatch)
true
}

Expand Down Expand Up @@ -91,7 +97,48 @@ class DataStreamMatchRule
expr.foreach(_.accept(validator))
}

class AggregationsValidator extends RexDefaultVisitor[Object] {
private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = {
if (logicalMatch.isAllRows) {
throw new TableException("All rows per match mode is not supported yet.")
} else {
val refNameFinder = new RefNameFinder(logicalMatch.getInput.getRowType)
validateAmbiguousColumnsOnRowPerMatch(
logicalMatch.getPartitionKeys,
logicalMatch.getMeasures.keySet().asScala,
logicalMatch.getRowType,
refNameFinder)
}
}

private def validateAmbiguousColumnsOnRowPerMatch(
partitionKeys: JList[RexNode],
measuresNames: mutable.Set[String],
expectedSchema: RelDataType,
refNameFinder: RefNameFinder)
: Unit = {
val actualSize = partitionKeys.size() + measuresNames.size
val expectedSize = expectedSchema.getFieldCount
if (actualSize != expectedSize) {
//try to find ambiguous column

val ambiguousColumns = partitionKeys.asScala.map(_.accept(refNameFinder))
.filter(measuresNames.contains).mkString("{", ", ", "}")

throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns")
}
}

private class RefNameFinder(inputSchema: RelDataType) extends RexDefaultVisitor[String] {

override def visitInputRef(inputRef: RexInputRef): String = {
inputSchema.getFieldList.get(inputRef.getIndex).getName
}

override def visitNode(rexNode: RexNode): String =
throw new TableException(s"PARTITION BY clause accepts only input reference. Found $rexNode")
}

private class AggregationsValidator extends RexDefaultVisitor[Object] {

override def visitCall(call: RexCall): AnyRef = {
call.getOperator match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,30 @@ class MatchRecognizeValidationTest extends TableTestBase {
streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
}

@Test
def testValidatingAmbiguousColumns(): Unit = {
thrown.expectMessage("Columns ambiguously defined: {symbol, price}")
thrown.expect(classOf[ValidationException])

val sqlQuery =
s"""
|SELECT *
|FROM Ticker
|MATCH_RECOGNIZE (
| PARTITION BY symbol, price
| ORDER BY proctime
| MEASURES
| A.symbol AS symbol,
| A.price AS price
| PATTERN (A)
| DEFINE
| A AS symbol = 'a'
|) AS T
|""".stripMargin

streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
}

// ***************************************************************************************
// * Those validations are temporary. We should remove those tests once we support those *
// * features. *
Expand Down

0 comments on commit 392647d

Please sign in to comment.