Skip to content

Commit

Permalink
[FLINK-21989][table] Add a SupportsSourceWatermark ability interface
Browse files Browse the repository at this point in the history
This closes apache#15388.
  • Loading branch information
twalthr committed Mar 29, 2021
1 parent 912fc8d commit 71122fd
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 32 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/dev/table/sourcesSinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ will be called with values for the given lookup keys during runtime.
strategy is a builder/factory for timestamp extraction and watermark generation. During the runtime, the
watermark generator is located inside the source and is able to generate per-partition watermarks.</td>
</tr>
<tr>
<td><a href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsSourceWatermark.java'>SupportsSourceWatermark</a></td>
<td>Enables the planner to fully rely on the watermark strategy provided by the <code>ScanTableSource</code>
itself. Thus, a <code>CREATE TABLE</code> DDL is able to use <code>SOURCE_WATERMARK()</code>, which
is a built-in marker function that will be detected by the planner and translated into a call
to this interface if available.</td>
</tr>
</tbody>
</table>

Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/dev/table/sourcesSinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ will be called with values for the given lookup keys during runtime.
strategy is a builder/factory for timestamp extraction and watermark generation. During the runtime, the
watermark generator is located inside the source and is able to generate per-partition watermarks.</td>
</tr>
<tr>
<td><a href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsSourceWatermark.java'>SupportsSourceWatermark</a></td>
<td>Enables the planner to fully rely on the watermark strategy provided by the <code>ScanTableSource</code>
itself. Thus, a <code>CREATE TABLE</code> DDL is able to use <code>SOURCE_WATERMARK()</code>, which
is a built-in marker function that will be detected by the planner and translated into a call
to this interface if available.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.types.RowKind;

Expand All @@ -49,6 +50,7 @@
*
* <ul>
* <li>{@link SupportsWatermarkPushDown}
* <li>{@link SupportsSourceWatermark}
* <li>{@link SupportsFilterPushDown}
* <li>{@link SupportsAggregatePushDown}
* <li>{@link SupportsProjectionPushDown}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.abilities;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.ScanTableSource;

/**
* Enables the planner to fully rely on the watermark strategy provided by the {@link
* ScanTableSource} itself.
*
* <p>The concept of watermarks defines when time operations based on an event time attribute will
* be triggered. A watermark tells operators that no elements with a timestamp older than or equal
* to the watermark timestamp should arrive at the operator. Thus, watermarks are a trade-off
* between latency and completeness.
*
* <p>Given the following SQL:
*
* <pre>{@code
* CREATE TABLE t (i INT, ts TIMESTAMP(3), WATERMARK FOR ts AS SOURCE_WATERMARK()) // `ts` becomes a time attribute
* }</pre>
*
* <p>In the above example, the {@code SOURCE_WATERMARK()} is a built-in marker function that will
* be detected by the planner and translated into a call to this interface if available. If a source
* does not implement this interface, an exception will be thrown.
*
* <p>Compared to {@link SupportsWatermarkPushDown}, it is not possible to influence a source's
* watermark strategy using custom expressions if {@code SOURCE_WATERMARK()} is declared.
* Nevertheless, a source can implement both interfaces if necessary.
*
* @see SupportsWatermarkPushDown
*/
@PublicEvolving
public interface SupportsSourceWatermark {

/**
* Instructs the source to emit source-specific watermarks during runtime.
*
* <p>The method takes no arguments: the watermark strategy is entirely up to the source, and no
* watermark expression from the DDL is handed over.
*/
void applySourceWatermark();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
*
* <p>This interface provides a {@link WatermarkStrategy} that needs to be applied to the runtime
* implementation. Most built-in Flink sources provide a way of setting the watermark generator.
*
* @see SupportsSourceWatermark
*/
@PublicEvolving
public interface SupportsWatermarkPushDown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
@JsonSubTypes.Type(value = WatermarkPushDownSpec.class)
@JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
@JsonSubTypes.Type(value = SourceWatermarkSpec.class)
})
@Internal
public interface SourceAbilitySpec {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.abilities.source;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

/**
 * {@link SourceAbilitySpec} for the {@link SupportsSourceWatermark} ability.
 *
 * <p>The spec carries a single boolean flag that states whether the source's own watermark should
 * be used. The flag can be serialized to and deserialized from JSON, and {@link #apply} forwards
 * it to a table source implementing {@link SupportsSourceWatermark}.
 */
@JsonTypeName("SourceWatermark")
public class SourceWatermarkSpec extends SourceAbilitySpecBase {
    /** JSON property name under which the flag is stored. */
    public static final String FIELD_NAME_SOURCE_WATERMARK_ENABLED = "sourceWatermarkEnabled";

    @JsonProperty(FIELD_NAME_SOURCE_WATERMARK_ENABLED)
    private final boolean sourceWatermarkEnabled;

    /**
     * Creates the spec; also used as the JSON creator.
     *
     * @param sourceWatermarkEnabled whether the source watermark should be applied
     * @param producedType row type handed to the base class
     */
    @JsonCreator
    public SourceWatermarkSpec(
            @JsonProperty(FIELD_NAME_SOURCE_WATERMARK_ENABLED) boolean sourceWatermarkEnabled,
            @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType) {
        super(producedType);
        this.sourceWatermarkEnabled = sourceWatermarkEnabled;
    }

    /**
     * Calls {@link SupportsSourceWatermark#applySourceWatermark()} on the given source if the
     * flag is enabled.
     *
     * @throws TableException if the source does not implement {@link SupportsSourceWatermark},
     *     regardless of the flag's value
     */
    @Override
    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
        // Reject unsupported sources first, even when the flag is disabled.
        if (!(tableSource instanceof SupportsSourceWatermark)) {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsSourceWatermark.",
                            tableSource.getClass().getName()));
        }
        if (sourceWatermarkEnabled) {
            ((SupportsSourceWatermark) tableSource).applySourceWatermark();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceWatermarkSpec;
import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
Expand All @@ -39,10 +46,14 @@
import org.apache.calcite.rex.RexNode;

import java.time.Duration;
import java.util.List;

import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapFunctionDefinition;

/**
* Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util to push the {@link
* FlinkLogicalWatermarkAssigner} into the {@link FlinkLogicalTableSourceScan}.
* Base rule for interface {@link SupportsWatermarkPushDown} and {@link SupportsSourceWatermark}. It
* offers a util to push the {@link FlinkLogicalWatermarkAssigner} into the {@link
* FlinkLogicalTableSourceScan}.
*/
public abstract class PushWatermarkIntoTableSourceScanRuleBase extends RelOptRule {

Expand Down Expand Up @@ -72,20 +83,13 @@ protected FlinkLogicalTableSourceScan getNewScan(
TableConfig tableConfig,
boolean useWatermarkAssignerRowType) {
String digest = String.format("watermark=[%s]", watermarkExpr);
Duration idleTimeout =
tableConfig
.getConfiguration()
.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
final long idleTimeoutMillis;
if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
idleTimeoutMillis = idleTimeout.toMillis();
digest = String.format("%s, idletimeout=[%s]", digest, idleTimeoutMillis);
} else {
idleTimeoutMillis = -1L;
}

TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();
final TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
final DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();

final boolean isSourceWatermark =
newDynamicTableSource instanceof SupportsSourceWatermark
&& hasSourceWatermarkDeclaration(watermarkExpr);

final RelDataType newType;
if (useWatermarkAssignerRowType) {
Expand All @@ -96,24 +100,68 @@ protected FlinkLogicalTableSourceScan getNewScan(
newType = scan.getRowType();
}

WatermarkPushDownSpec watermarkPushDownSpec =
new WatermarkPushDownSpec(
watermarkExpr,
idleTimeoutMillis,
(RowType) FlinkTypeFactory.toLogicalType(newType));
watermarkPushDownSpec.apply(newDynamicTableSource, SourceAbilityContext.from(scan));
final RowType producedType = (RowType) FlinkTypeFactory.toLogicalType(newType);
final SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);

final SourceAbilitySpec abilitySpec;
if (isSourceWatermark) {
final SourceWatermarkSpec sourceWatermarkSpec =
new SourceWatermarkSpec(true, producedType);
sourceWatermarkSpec.apply(newDynamicTableSource, abilityContext);
abilitySpec = sourceWatermarkSpec;
} else {
final Duration idleTimeout =
tableConfig
.getConfiguration()
.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
final long idleTimeoutMillis;
if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
idleTimeoutMillis = idleTimeout.toMillis();
digest = String.format("%s, idletimeout=[%s]", digest, idleTimeoutMillis);
} else {
idleTimeoutMillis = -1L;
}

final WatermarkPushDownSpec watermarkPushDownSpec =
new WatermarkPushDownSpec(watermarkExpr, idleTimeoutMillis, producedType);
watermarkPushDownSpec.apply(newDynamicTableSource, abilityContext);
abilitySpec = watermarkPushDownSpec;
}

TableSourceTable newTableSourceTable =
tableSourceTable.copy(
newDynamicTableSource,
newType,
new String[] {digest},
new SourceAbilitySpec[] {watermarkPushDownSpec});
new SourceAbilitySpec[] {abilitySpec});
return FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
}

protected boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan scan) {
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
return tableSourceTable != null
&& tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
if (tableSourceTable == null) {
return false;
}
final DynamicTableSource tableSource = tableSourceTable.tableSource();
return (tableSource instanceof SupportsWatermarkPushDown)
|| (tableSource instanceof SupportsSourceWatermark
&& hasSourceWatermarkDeclaration(tableSourceTable));
}

/**
 * Checks whether the table's resolved schema declares exactly one watermark and that watermark's
 * expression unwraps to the built-in {@code SOURCE_WATERMARK} function definition.
 */
private boolean hasSourceWatermarkDeclaration(TableSourceTable table) {
    final List<WatermarkSpec> watermarkSpecs =
            table.catalogTable().getResolvedSchema().getWatermarkSpecs();
    // only a single watermark spec is supported for now
    if (watermarkSpecs.size() == 1) {
        final ResolvedExpression expression = watermarkSpecs.get(0).getWatermarkExpression();
        return unwrapFunctionDefinition(expression)
                == BuiltInFunctionDefinitions.SOURCE_WATERMARK;
    }
    return false;
}

/**
 * Checks whether the given expression unwraps to the built-in {@code SOURCE_WATERMARK} function
 * definition.
 */
private boolean hasSourceWatermarkDeclaration(RexNode rexNode) {
    return unwrapFunctionDefinition(rexNode) == BuiltInFunctionDefinitions.SOURCE_WATERMARK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package org.apache.flink.table.planner.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;

import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.tools.RelBuilder;

import javax.annotation.Nullable;

/**
* Utilities for quick access of commonly used instances (like {@link FlinkTypeFactory}) without
* long chains of getters or casting like {@code (FlinkTypeFactory)
Expand Down Expand Up @@ -78,6 +86,29 @@ public static FlinkContext unwrapContext(Context context) {
return context.unwrap(FlinkContext.class);
}

/**
 * Extracts the {@link FunctionDefinition} behind a resolved expression.
 *
 * @param expression expression to inspect
 * @return the function definition, or {@code null} if the expression is not a {@link
 *     RexNodeExpression} or its {@link RexNode} does not unwrap to a function definition
 */
public static @Nullable FunctionDefinition unwrapFunctionDefinition(
        ResolvedExpression expression) {
    if (expression instanceof RexNodeExpression) {
        final RexNode rexNode = ((RexNodeExpression) expression).getRexNode();
        // the RexNode overload already returns null for anything that is not a RexCall
        return unwrapFunctionDefinition(rexNode);
    }
    return null;
}

/**
 * Extracts the {@link FunctionDefinition} from a call to a {@link BridgingSqlFunction}.
 *
 * @param rexNode expression to inspect
 * @return the definition of the called function, or {@code null} if the node is not a {@link
 *     RexCall} or its operator is not a {@link BridgingSqlFunction}
 */
public static @Nullable FunctionDefinition unwrapFunctionDefinition(RexNode rexNode) {
    if (rexNode instanceof RexCall
            && ((RexCall) rexNode).getOperator() instanceof BridgingSqlFunction) {
        final BridgingSqlFunction function =
                (BridgingSqlFunction) ((RexCall) rexNode).getOperator();
        return function.getDefinition();
    }
    return null;
}

private ShortcutUtils() {
// no instantiation
}
Expand Down
Loading

0 comments on commit 71122fd

Please sign in to comment.