[FLINK-21934][table] Add new StreamTableEnvironment.toDataStream
This closes apache#15457.
twalthr committed Mar 31, 2021
1 parent 511f028 commit ad91d55
Showing 20 changed files with 1,084 additions and 123 deletions.
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -40,6 +41,7 @@
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -355,6 +357,102 @@ <T, ACC> void registerFunction(
*/
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

/**
* Converts the given {@link Table} into a {@link DataStream}.
*
* <p>Since the DataStream API does not support changelog processing natively, this method
* assumes append-only/insert-only semantics during the table-to-stream conversion. The records
* of class {@link Row} will always describe {@link RowKind#INSERT} changes. Updating tables are
* not supported by this method and will produce an exception.
*
* <p>If you want to convert the {@link Table} to a specific class or data type, use {@link
* #toDataStream(Table, Class)} or {@link #toDataStream(Table, AbstractDataType)} instead.
*
* <p>Note that the type system of the table ecosystem is richer than the one of the DataStream
* API. The table runtime will make sure to properly serialize the output records to the first
* operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API
* need to be considered.
*
* <p>If the input table contains a single rowtime column, it will be propagated into a stream
* record's timestamp. Watermarks will be propagated as well.
*
* @param table The {@link Table} to convert.
* @return The converted {@link DataStream}.
* @see #toDataStream(Table, AbstractDataType)
*/
DataStream<Row> toDataStream(Table table);
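
A minimal usage sketch of this method, assuming an insert-only Orders table with user_name and amount columns:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// every record of the resulting stream is a Row with RowKind.INSERT
Table orders = tableEnv.sqlQuery("SELECT user_name, amount FROM Orders");
DataStream<Row> stream = tableEnv.toDataStream(orders);
stream.print();
env.execute();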

/**
* Converts the given {@link Table} into a {@link DataStream} of the given {@link Class}.
*
* <p>See {@link #toDataStream(Table, AbstractDataType)} for more information on how a {@link
* Table} is translated into a {@link DataStream}.
*
* <p>This method is a shortcut for:
*
* <pre>
* tableEnv.toDataStream(table, DataTypes.of(class))
* </pre>
*
* <p>Calling this method with a class of {@link Row} will redirect to {@link
* #toDataStream(Table)}.
*
* @param table The {@link Table} to convert.
* @param targetClass The {@link Class} that decides about the final external representation in
* {@link DataStream} records.
* @param <T> External record.
* @return The converted {@link DataStream}.
*/
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
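
A minimal sketch of the Class-based shortcut, assuming a hypothetical User POJO whose public fields match the table columns:

// hypothetical target type, extracted via DataTypes.of(User.class)
public static class User {
    public String name;
    public Integer age;
}

// equivalent to tableEnv.toDataStream(table, DataTypes.of(User.class));
// passing Row.class instead redirects to toDataStream(Table)
DataStream<User> users = tableEnv.toDataStream(table, User.class);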

/**
* Converts the given {@link Table} into a {@link DataStream} of the given {@link DataType}.
*
* <p>The given {@link DataType} is used to configure the table runtime to convert columns and
* internal data structures to the desired representation. The following example shows how to
* convert the table columns into the fields of a POJO type.
*
* <pre>
* // given a Table of (name STRING, age INT)
*
* public static class MyPojo {
* public String name;
* public Integer age;
*
* // default constructor for DataStream API
* public MyPojo() {}
*
* // fully assigning constructor for field order in Table API
* public MyPojo(String name, Integer age) {
* this.name = name;
* this.age = age;
* }
* }
*
* tableEnv.toDataStream(table, DataTypes.of(MyPojo.class));
* </pre>
*
* <p>Since the DataStream API does not support changelog processing natively, this method
* assumes append-only/insert-only semantics during the table-to-stream conversion. Updating
* tables are not supported by this method and will produce an exception.
*
* <p>Note that the type system of the table ecosystem is richer than the one of the DataStream
* API. The table runtime will make sure to properly serialize the output records to the first
* operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API
* need to be considered.
*
* <p>If the input table contains a single rowtime column, it will be propagated into a stream
* record's timestamp. Watermarks will be propagated as well.
*
* @param table The {@link Table} to convert.
* @param targetDataType The {@link DataType} that decides about the final external
* representation in {@link DataStream} records.
* @param <T> External record.
* @return The converted {@link DataStream}.
* @see #toDataStream(Table)
*/
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
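
Besides DataTypes.of(SomeClass.class), an explicit row type can also be passed; a sketch assuming the (name STRING, age INT) table from the example above:

// explicitly pin the external representation of each column
DataStream<Row> stream =
        tableEnv.toDataStream(
                table,
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT())));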

/**
* Converts the given {@link DataStream} into a {@link Table} with specified field names.
*
@@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
@@ -61,16 +62,20 @@
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExternalModifyOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.JavaExternalQueryOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
@@ -258,7 +263,7 @@ private <T> Table fromDataStreamInternal(
unresolvedIdentifier = getParser().parseIdentifier(viewPath);
} else {
unresolvedIdentifier =
UnresolvedIdentifier.of("Unregistered_DataStream_" + dataStream.getId());
UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
}
final ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
@@ -294,6 +299,84 @@ private <T> Table fromDataStreamInternal(
return createTable(projectOperation);
}

@Override
public DataStream<Row> toDataStream(Table table) {
Preconditions.checkNotNull(table, "Table must not be null.");
// include all columns of the query (incl. metadata and computed columns)
final DataType sourceType = table.getResolvedSchema().toSourceRowDataType();
return toDataStream(table, sourceType);
}

@Override
@SuppressWarnings("unchecked")
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass) {
Preconditions.checkNotNull(table, "Table must not be null.");
Preconditions.checkNotNull(targetClass, "Target class must not be null.");
if (targetClass == Row.class) {
// for convenience, we allow the Row class here as well
return (DataStream<T>) toDataStream(table);
}

return toDataStream(table, DataTypes.of(targetClass));
}

@Override
public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType) {
Preconditions.checkNotNull(table, "Table must not be null.");
Preconditions.checkNotNull(targetDataType, "Target data type must not be null.");
final CatalogManager catalogManager = getCatalogManager();
final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();

// derive the external schema and any projections needed for the requested target data type
final ExternalSchemaTranslator.OutputResult schemaTranslationResult =
ExternalSchemaTranslator.fromInternal(
catalogManager.getDataTypeFactory(),
table.getResolvedSchema(),
targetDataType);

// if the target data type implies selecting or reordering columns, apply a projection first
final List<String> projections = schemaTranslationResult.getProjections();
final QueryOperation projectOperation;
if (projections == null) {
projectOperation = table.getQueryOperation();
} else {
projectOperation =
operationTreeBuilder.project(
projections.stream()
.map(ApiExpressionUtils::unresolvedRef)
.collect(Collectors.toList()),
table.getQueryOperation());
}

final ResolvedSchema resolvedSchema =
schemaResolver.resolve(schemaTranslationResult.getSchema());

final UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(
"Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId());
final ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);

// wrap the (possibly projected) query into an insert-only sink operation;
// updating tables are not supported and will produce an exception
final ExternalModifyOperation modifyOperation =
new ExternalModifyOperation(
objectIdentifier,
projectOperation,
resolvedSchema,
ChangelogMode.insertOnly(),
schemaTranslationResult.getPhysicalDataType());

return toDataStreamInternal(table, modifyOperation);
}

private <T> DataStream<T> toDataStreamInternal(Table table, ModifyOperation modifyOperation) {
final List<Transformation<?>> transformations =
planner.translate(Collections.singletonList(modifyOperation));

final Transformation<T> transformation = getTransformation(table, transformations);

executionEnvironment.addOperator(transformation);
return new DataStream<>(executionEnvironment, transformation);
}

@Override
public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
@@ -358,7 +441,7 @@ public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo
table.getQueryOperation(),
TypeConversions.fromLegacyInfoToDataType(typeInfo),
OutputConversionModifyOperation.UpdateMode.APPEND);
return toDataStream(table, modifyOperation);
return toDataStreamInternal(table, modifyOperation);
}

@Override
@@ -375,7 +458,7 @@ public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(
table.getQueryOperation(),
wrapWithChangeFlag(typeInfo),
OutputConversionModifyOperation.UpdateMode.RETRACT);
return toDataStream(table, modifyOperation);
return toDataStreamInternal(table, modifyOperation);
}

@Override
@@ -397,17 +480,6 @@ public Pipeline getPipeline(String jobName) {
return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}

private <T> DataStream<T> toDataStream(
Table table, OutputConversionModifyOperation modifyOperation) {
List<Transformation<?>> transformations =
planner.translate(Collections.singletonList(modifyOperation));

Transformation<T> transformation = getTransformation(table, transformations);

executionEnvironment.addOperator(transformation);
return new DataStream<>(executionEnvironment, transformation);
}

@Override
protected void validateTableSource(TableSource<?> tableSource) {
super.validateTableSource(tableSource);
@@ -92,7 +92,7 @@ public ChangelogMode getChangelogMode() {

@Override
public String asSummaryString() {
Map<String, Object> args = new LinkedHashMap<>();
final Map<String, Object> args = new LinkedHashMap<>();
args.put("identifier", identifier);
args.put("stream", dataStream.getId());
args.put("type", physicalDataType);
@@ -101,7 +101,7 @@ public String asSummaryString() {
args.put("fields", resolvedSchema.getColumnNames());

return OperationUtils.formatWithChildren(
"DataStream", args, getChildren(), Operation::asSummaryString);
"DataStreamInput", args, getChildren(), Operation::asSummaryString);
}

@Override