[FLINK-19079][table-runtime] Introduce row time deduplicate operator (a…
leonardBang authored Nov 5, 2020
1 parent f504bf5 commit ad18d13
Showing 32 changed files with 1,352 additions and 301 deletions.
FlinkRelMdUniqueKeys.scala
@@ -266,8 +266,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.UniqueKeys]
       ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
     val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
     val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
-    //TODO current deduplicate on row time is still a Rank,
-    // remove this after support deduplicate on row time
+    // for Rank node that can convert to Deduplicate, unique key is partition key
     val canConvertToDeduplicate: Boolean = {
       val rankRange = rel.rankRange
       val isRowNumberType = rel.rankType == RankType.ROW_NUMBER
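Note: the replaced comment states the key invariant here — once a Rank can collapse into a Deduplicate, each partition-key value yields at most one row, so the partition key itself is a unique key. A minimal sketch of the derived key set (the column index 0 standing for a partition key `a` is an assumed example, not taken from the diff):

import org.apache.calcite.util.ImmutableBitSet

// Sketch: for a Rank convertible to Deduplicate that partitions on column 0,
// the metadata handler can report the partition key as a unique key, because
// deduplication leaves at most one row per key value.
val partitionKey: ImmutableBitSet = ImmutableBitSet.of(0)
val uniqueKeys: java.util.Set[ImmutableBitSet] =
  java.util.Collections.singleton(partitionKey)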
StreamExecChangelogNormalize.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils, KeySelectorUtil}
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator
-import org.apache.flink.table.runtime.operators.deduplicate.{DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepLastRowFunction}
+import org.apache.flink.table.runtime.operators.deduplicate.{ProcTimeDeduplicateKeepLastRowFunction, ProcTimeMiniBatchDeduplicateKeepLastRowFunction}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo

 import java.util
@@ -96,23 +96,23 @@ class StreamExecChangelogNormalize(
     val operator = if (isMiniBatchEnabled) {
       val exeConfig = planner.getExecEnv.getConfig
       val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
-      val processFunction = new MiniBatchDeduplicateKeepLastRowFunction(
+      val processFunction = new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
         rowTypeInfo,
-        generateUpdateBefore,
-        true, // generateInsert
-        false, // inputInsertOnly
         rowSerializer,
         // disable state ttl, the changelog normalize should keep all state to have data integrity
         // we can enable state ttl if this is really needed in some cases
-        -1)
+        -1,
+        generateUpdateBefore,
+        true, // generateInsert
+        false) // inputInsertOnly
       val trigger = AggregateUtil.createMiniBatchTrigger(tableConfig)
       new KeyedMapBundleOperator(
         processFunction,
         trigger)
     } else {
-      val processFunction = new DeduplicateKeepLastRowFunction(
-        -1, // disable state ttl
+      val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
         rowTypeInfo,
+        -1, // disable state ttl
         generateUpdateBefore,
         true, // generateInsert
         false) // inputInsertOnly
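Note: the two branches above are chosen purely by the mini-batch switch read from the table configuration. A hedged sketch of enabling it (the option keys are standard Flink settings; `tEnv` is an assumed StreamTableEnvironment, not part of this commit):

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Sketch: with these options set, the planner takes the
// ProcTimeMiniBatchDeduplicateKeepLastRowFunction path wrapped in a
// KeyedMapBundleOperator instead of the per-record KeyedProcessOperator path.
def enableMiniBatch(tEnv: StreamTableEnvironment): Unit = {
  val conf = tEnv.getConfig.getConfiguration
  conf.setString("table.exec.mini-batch.enabled", "true")      // switch on bundling
  conf.setString("table.exec.mini-batch.allow-latency", "2 s") // max buffering delay
  conf.setString("table.exec.mini-batch.size", "5000")         // max rows per bundle
}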
StreamExecDeduplicate.scala
@@ -19,20 +19,26 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream

 import org.apache.flink.annotation.Experimental
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.{KeyedProcessOperator, OneInputStreamOperator}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDeduplicate.TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE
-import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ChangelogPlanUtils, KeySelectorUtil}
+import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, KeySelectorUtil}
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator
-import org.apache.flink.table.runtime.operators.deduplicate.{DeduplicateKeepFirstRowFunction, DeduplicateKeepLastRowFunction, MiniBatchDeduplicateKeepFirstRowFunction, MiniBatchDeduplicateKeepLastRowFunction}
+import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger
+import org.apache.flink.table.runtime.operators.deduplicate._
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.util.Preconditions

 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
@@ -54,6 +60,7 @@ class StreamExecDeduplicate(
     traitSet: RelTraitSet,
     inputRel: RelNode,
     uniqueKeys: Array[Int],
+    val isRowtime: Boolean,
     val keepLastRow: Boolean)
   extends SingleRel(cluster, traitSet, inputRel)
   with StreamPhysicalRel
@@ -71,16 +78,18 @@ class StreamExecDeduplicate(
       traitSet,
       inputs.get(0),
       uniqueKeys,
+      isRowtime,
       keepLastRow)
   }

   override def explainTerms(pw: RelWriter): RelWriter = {
     val fieldNames = getRowType.getFieldNames
+    val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
     val keep = if (keepLastRow) "LastRow" else "FirstRow"
     super.explainTerms(pw)
       .item("keep", keep)
       .item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
-      .item("order", "PROCTIME")
+      .item("order", orderString)
   }

   //~ ExecNode methods -----------------------------------------------------------
@@ -100,48 +109,18 @@ class StreamExecDeduplicate(

     val inputTransform = getInputNodes.get(0).translateToPlan(planner)
       .asInstanceOf[Transformation[RowData]]
+
     val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
-    val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
+    val rowSerializer = rowTypeInfo.createSerializer(planner.getExecEnv.getConfig)
     val tableConfig = planner.getTableConfig
-    val generateInsert = tableConfig.getConfiguration
-      .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
-    val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
-      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
-    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
-    val operator = if (isMiniBatchEnabled) {
-      val exeConfig = planner.getExecEnv.getConfig
-      val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
-      val processFunction = if (keepLastRow) {
-        new MiniBatchDeduplicateKeepLastRowFunction(
-          rowTypeInfo,
-          generateUpdateBefore,
-          generateInsert,
-          true,
-          rowSerializer,
-          minRetentionTime)
-      } else {
-        new MiniBatchDeduplicateKeepFirstRowFunction(
-          rowSerializer,
-          minRetentionTime)
-      }
-      val trigger = AggregateUtil.createMiniBatchTrigger(tableConfig)
-      new KeyedMapBundleOperator(
-        processFunction,
-        trigger)
+
+    val operator = if (isRowtime) {
+      new RowtimeDeduplicateOperatorTranslator(rowTypeInfo, rowSerializer, tableConfig, this)
+        .createDeduplicateOperator()
     } else {
-      val processFunction = if (keepLastRow) {
-        new DeduplicateKeepLastRowFunction(
-          minRetentionTime,
-          rowTypeInfo,
-          generateUpdateBefore,
-          generateInsert,
-          true)
-      } else {
-        new DeduplicateKeepFirstRowFunction(minRetentionTime)
-      }
-      new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
+      new ProcTimeDeduplicateOperatorTranslator(rowTypeInfo, rowSerializer, tableConfig, this)
+        .createDeduplicateOperator()
     }
+
     val ret = new OneInputTransformation(
       inputTransform,
       getRelDetailedDescription,
@@ -161,6 +140,129 @@ class StreamExecDeduplicate(
   }
 }

+/**
+ * Base translator to create deduplicate operator.
+ */
+abstract class DeduplicateOperatorTranslator(
+    rowTypeInfo: InternalTypeInfo[RowData],
+    serializer: TypeSerializer[RowData],
+    tableConfig: TableConfig,
+    deduplicate: StreamExecDeduplicate) {
+
+  protected val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(deduplicate)
+  protected val generateInsert = tableConfig.getConfiguration
+    .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
+  protected val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
+    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
+  protected val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+  protected val miniBatchSize = if (isMiniBatchEnabled) {
+    val size = tableConfig.getConfiguration.getLong(
+      ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
+    Preconditions.checkArgument(size > 0)
+    size
+  } else {
+    -1L
+  }
+
+  protected val keepLastRow = deduplicate.keepLastRow
+
+  def createDeduplicateOperator(): OneInputStreamOperator[RowData, RowData]
+
+}
+
+/**
+ * Translator to create row time deduplicate operator.
+ */
+class RowtimeDeduplicateOperatorTranslator(
+    rowTypeInfo: InternalTypeInfo[RowData],
+    serializer: TypeSerializer[RowData],
+    tableConfig: TableConfig,
+    deduplicate: StreamExecDeduplicate)
+  extends DeduplicateOperatorTranslator(
+    rowTypeInfo,
+    serializer,
+    tableConfig,
+    deduplicate) {
+
+  override def createDeduplicateOperator(): OneInputStreamOperator[RowData, RowData] = {
+    val rowtimeField = deduplicate.getInput.getRowType.getFieldList
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+    Preconditions.checkArgument(rowtimeField.nonEmpty)
+    val rowtimeIndex = rowtimeField.get(0).getIndex
+    if (isMiniBatchEnabled) {
+      val trigger = new CountBundleTrigger[RowData](miniBatchSize)
+      val processFunction = new RowTimeMiniBatchDeduplicateFunction(
+        rowTypeInfo,
+        serializer,
+        minRetentionTime,
+        rowtimeIndex,
+        generateUpdateBefore,
+        generateInsert,
+        keepLastRow)
+      new KeyedMapBundleOperator(processFunction, trigger)
+    } else {
+      val processFunction = new RowTimeDeduplicateFunction(
+        rowTypeInfo,
+        minRetentionTime,
+        rowtimeIndex,
+        generateUpdateBefore,
+        generateInsert,
+        keepLastRow)
+      new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
+    }
+  }
+}
+
+/**
+ * Translator to create process time deduplicate operator.
+ */
+class ProcTimeDeduplicateOperatorTranslator(
+    rowTypeInfo: InternalTypeInfo[RowData],
+    serializer: TypeSerializer[RowData],
+    tableConfig: TableConfig,
+    deduplicate: StreamExecDeduplicate)
+  extends DeduplicateOperatorTranslator(
+    rowTypeInfo,
+    serializer,
+    tableConfig,
+    deduplicate) {
+
+  override def createDeduplicateOperator(): OneInputStreamOperator[RowData, RowData] = {
+    if (isMiniBatchEnabled) {
+      val trigger = new CountBundleTrigger[RowData](miniBatchSize)
+      if (keepLastRow) {
+        val processFunction = new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
+          rowTypeInfo,
+          serializer,
+          minRetentionTime,
+          generateUpdateBefore,
+          generateInsert,
+          true)
+        new KeyedMapBundleOperator(processFunction, trigger)
+      } else {
+        val processFunction = new ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(
+          serializer,
+          minRetentionTime)
+        new KeyedMapBundleOperator(processFunction, trigger)
+      }
+    } else {
+      if (keepLastRow) {
+        val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
+          rowTypeInfo,
+          minRetentionTime,
+          generateUpdateBefore,
+          generateInsert,
+          true)
+        new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
+      } else {
+        val processFunction = new ProcTimeDeduplicateKeepFirstRowFunction(minRetentionTime)
+        new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
+      }
+    }
+  }
+}
+
 object StreamExecDeduplicate {

   @Experimental
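Note: the rowtime branch above is what the commit title is about; end to end it is reached by the ROW_NUMBER pattern shown in the rule's scaladoc further down. A hedged sketch of such a query (`tEnv` and a table `MyTable` with a declared event-time attribute `rowtime` are assumed):

// Sketch: keep the latest row per key `a`, ordered by the event-time column.
// After this commit the planner turns the inner rank into a
// StreamExecDeduplicate whose plan shows order=[ROWTIME] instead of a Rank.
val deduplicated = tEnv.sqlQuery(
  """
    |SELECT a, b, c FROM (
    |  SELECT a, b, c, rowtime,
    |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) AS row_num
    |  FROM MyTable
    |) WHERE row_num <= 1
    |""".stripMargin)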
FlinkChangelogModeInferenceProgram.scala
@@ -151,11 +151,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOptimizeContext]
       case deduplicate: StreamExecDeduplicate =>
         // deduplicate only support insert only as input
         val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
-        val providedTrait = if (deduplicate.keepLastRow) {
-          // produce updates if it keeps last row
-          ModifyKindSetTrait.ALL_CHANGES
-        } else {
+        val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
+          // only proctime first row deduplicate does not produce UPDATE changes
           ModifyKindSetTrait.INSERT_ONLY
+        } else {
+          // other deduplicate produce update changes
+          ModifyKindSetTrait.ALL_CHANGES
         }
         createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)
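Note: the reworked condition is easiest to see on a concrete changelog. Only the proctime keep-first-row variant stays insert-only; a rowtime "first" row can still be displaced by a late record with a smaller timestamp, so every rowtime variant may retract. A hedged illustration (the records are invented for the example):

// Sketch of the trait decision and the changelog it implies:
//   keep-last-row, key a=1:   +I(x) then +I(y)  =>  +I(x), -U(x), +U(y)
//   proctime keep-first-row:  +I(x) then +I(y)  =>  +I(x)   (y is dropped)
def providedChangeKinds(keepLastRow: Boolean, isRowtime: Boolean): String =
  if (!keepLastRow && !isRowtime) "INSERT_ONLY" // later rows can never win
  else "ALL_CHANGES" // earlier emissions may be retracted and updated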
StreamExecDeduplicateRule.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.{RelCollation, RelNode}

 /**
- * Rule that matches [[FlinkLogicalRank]] which is sorted by proc-time attribute and
+ * Rule that matches [[FlinkLogicalRank]] which is sorted by time attribute and
  * limits 1 and its rank type is ROW_NUMBER, and converts it to [[StreamExecDeduplicate]].
  *
  * NOTES: Queries that can be converted to [[StreamExecDeduplicate]] could be converted to
@@ -45,14 +45,15 @@ import org.apache.calcite.rel.{RelCollation, RelNode}
  *    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
  *    FROM MyTable
  * ) WHERE row_num <= 1
- * }}} will be converted to StreamExecDeduplicate which keeps first row.
+ * }}} will be converted to StreamExecDeduplicate which keeps first row in proctime.
  *
  * 2. {{{
  * SELECT a, b, c FROM (
- *   SELECT a, b, c, proctime,
- *   ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
+ *   SELECT a, b, c, rowtime,
+ *   ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as row_num
  *   FROM MyTable
  * ) WHERE row_num <= 1
- * }}} will be converted to StreamExecDeduplicate which keeps last row.
+ * }}} will be converted to StreamExecDeduplicate which keeps last row in rowtime.
  */
class StreamExecDeduplicateRule
  extends ConverterRule(
@@ -81,12 +82,18 @@ class StreamExecDeduplicateRule
     // order by timeIndicator desc ==> lastRow, otherwise is firstRow
     val fieldCollation = rank.orderKey.getFieldCollations.get(0)
     val isLastRow = fieldCollation.direction.isDescending
+
+    val fieldType = rank.getInput().getRowType.getFieldList
+      .get(fieldCollation.getFieldIndex).getType
+    val isRowtime = FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+
     val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
     new StreamExecDeduplicate(
       rel.getCluster,
       providedTraitSet,
       convInput,
       rank.partitionKey.toArray,
+      isRowtime,
       isLastRow)
   }
 }
@@ -98,7 +105,7 @@ object StreamExecDeduplicateRule {
   /**
    * Whether the given rank could be converted to [[StreamExecDeduplicate]].
    *
-   * Returns true if the given rank is sorted by proc-time attribute and limits 1
+   * Returns true if the given rank is sorted by time attribute and limits 1
    * and its RankFunction is ROW_NUMBER, else false.
    *
    * @param rank The [[FlinkLogicalRank]] node
@@ -117,20 +124,21 @@ object StreamExecDeduplicateRule {
     }

     val inputRowType = rank.getInput.getRowType
-    val isSortOnProctime = sortOnProcTimeAttribute(sortCollation, inputRowType)
+    val isSortOnProctime = sortOnTimeAttribute(sortCollation, inputRowType)

     !rank.outputRankNumber && isLimit1 && isSortOnProctime && isRowNumberType
   }

-  private def sortOnProcTimeAttribute(
+  private def sortOnTimeAttribute(
       sortCollation: RelCollation,
       inputRowType: RelDataType): Boolean = {
     if (sortCollation.getFieldCollations.size() != 1) {
       false
     } else {
       val firstSortField = sortCollation.getFieldCollations.get(0)
       val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
-      FlinkTypeFactory.isProctimeIndicatorType(fieldType)
+      FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
+        FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
     }
   }
 }
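Note: whether the ORDER BY column counts as a time attribute is fixed by the table schema, not by the query. A hedged DDL sketch giving `MyTable` both attribute kinds (the connector and columns are illustrative; `tEnv` is assumed):

// Sketch: the WATERMARK clause makes `rowtime` an event-time attribute
// (matched by isRowtimeIndicatorType), while the computed column yields a
// processing-time attribute (matched by isProctimeIndicatorType); after this
// commit either one satisfies sortOnTimeAttribute.
tEnv.executeSql(
  """
    |CREATE TABLE MyTable (
    |  a INT,
    |  b STRING,
    |  c STRING,
    |  rowtime TIMESTAMP(3),
    |  proctime AS PROCTIME(),
    |  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
    |) WITH ('connector' = 'datagen')
    |""".stripMargin)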