Skip to content

Commit

Permalink
[FLINK-26467][table] Compile RowDataToStringConverter lazily
Browse files Browse the repository at this point in the history
This closes apache#19087.
  • Loading branch information
slinkydeveloper authored and twalthr committed Apr 8, 2022
1 parent 807da40 commit b03a448
Showing 1 changed file with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@
import org.apache.flink.table.data.utils.CastExecutor;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;

import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

import static org.apache.flink.table.api.DataTypes.STRING;

/** {@link RowData} to {@link String} converter using {@link CastRule}. */
@Internal
public final class RowDataToStringConverterImpl implements RowDataToStringConverter {

private final Function<RowData, String>[] columnConverters;
private final DataType dataType;
private final CastRule.Context castRuleContext;

private Function<RowData, String>[] columnConverters;

@VisibleForTesting
public RowDataToStringConverterImpl(DataType dataType) {
Expand All @@ -51,9 +52,14 @@ public RowDataToStringConverterImpl(DataType dataType) {
false);
}

@SuppressWarnings("unchecked")
public RowDataToStringConverterImpl(
DataType dataType, ZoneId zoneId, ClassLoader classLoader, boolean legacyBehaviour) {
this.dataType = dataType;
this.castRuleContext = CastRule.Context.create(legacyBehaviour, zoneId, classLoader);
}

@SuppressWarnings("unchecked")
private void init() {
List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType);
this.columnConverters = new Function[rowDataTypes.size()];

Expand All @@ -64,34 +70,29 @@ public RowDataToStringConverterImpl(
CastExecutor<Object, StringData> castExecutor =
(CastExecutor<Object, StringData>)
CastRuleProvider.create(
CastRule.Context.create(legacyBehaviour, zoneId, classLoader),
fieldType,
STRING().getLogicalType());
castRuleContext, fieldType, VarCharType.STRING_TYPE);
if (castExecutor == null) {
// Fallback in case no casting rule is defined, for example for MULTISET and
// STRUCTURED
// Links to https://issues.apache.org/jira/browse/FLINK-24403
this.columnConverters[index] =
row -> {
if (row.isNullAt(index)) {
return PrintStyle.NULL_VALUE;
}
return Objects.toString(getter.getFieldOrNull(row));
};
} else {
this.columnConverters[index] =
row -> {
if (row.isNullAt(index)) {
return PrintStyle.NULL_VALUE;
}
return castExecutor.cast(getter.getFieldOrNull(row)).toString();
};
throw new IllegalStateException(
"Cannot create a cast executor for converting "
+ fieldType
+ " to string. This is a bug, please open an issue.");
}
this.columnConverters[index] =
row -> {
if (row.isNullAt(index)) {
return PrintStyle.NULL_VALUE;
}
return castExecutor.cast(getter.getFieldOrNull(row)).toString();
};
}
}

@Override
public String[] convert(RowData rowData) {
if (this.columnConverters == null) {
init();
}

String[] result = new String[rowData.getArity()];
for (int i = 0; i < result.length; i++) {
result[i] = this.columnConverters[i].apply(rowData);
Expand Down

0 comments on commit b03a448

Please sign in to comment.