[FLINK-15092][java, table] Remove the restriction that CsvInputFormat expects at least one parser

CsvInputFormat (and RowCsvInputFormat transitively) had an artificial restriction that at least one field parser must be provided. This was a limitation for proper support of projection pushdown in flink-table, where no fields from the input may be required, and it led to a failing safety condition check in PushProjectIntoTableSourceScanRule.

As part of this commit, a TableSourceTestBase class was introduced where we can check that table sources meet such requirements.
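With the restriction gone, a RowCsvInputFormat can be constructed with zero field types and zero selected fields, and each call to nextRecord() then yields an empty Row per input line. A minimal sketch of the newly allowed usage, mirroring the testEmptyProjection test added below (the input path is illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.RowCsvInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;

    // Before this commit, the empty arrays below failed with
    // "At least one field must be specified" in the constructor,
    // or with an IOException in open().
    RowCsvInputFormat format = new RowCsvInputFormat(
        new Path("/tmp/input.csv"),   // illustrative path
        new TypeInformation[0],       // no field types
        new int[0]);                  // no selected fields
    format.setFieldDelimiter("|");
    format.configure(new Configuration());
    // After open(split), format.nextRecord(new Row(0)) returns one
    // empty Row per input line instead of throwing.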
dawidwys committed Dec 17, 2019
1 parent 0186e10 commit c325191
Showing 8 changed files with 173 additions and 11 deletions.
@@ -56,11 +56,6 @@ public void open(FileInputSplit split) throws IOException {
@SuppressWarnings("unchecked")
FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();

//throw exception if no field parsers are available
if (fieldParsers.length == 0) {
throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
}

// create the value holders
this.parsedValues = new Object[fieldParsers.length];
for (int i = 0; i < fieldParsers.length; i++) {
@@ -42,13 +42,16 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
private int[] fieldPosMap;
private boolean emptyColumnAsNull;

public RowCsvInputFormat(Path filePath, TypeInformation[] fieldTypeInfos, String lineDelimiter, String fieldDelimiter, int[] selectedFields, boolean emptyColumnAsNull) {
public RowCsvInputFormat(
Path filePath,
TypeInformation[] fieldTypeInfos,
String lineDelimiter,
String fieldDelimiter,
int[] selectedFields,
boolean emptyColumnAsNull) {

super(filePath);
this.arity = fieldTypeInfos.length;
if (arity == 0) {
throw new IllegalArgumentException("At least one field must be specified");
}
if (arity != selectedFields.length) {
throw new IllegalArgumentException("Number of field types and selected fields must be the same");
}
@@ -946,7 +946,33 @@ public void testScanOrder() throws Exception {
assertEquals(333, result.getField(0));
assertEquals(777, result.getField(1));
assertEquals(0, result.getField(2));
}

@Test
public void testEmptyProjection() throws Exception {
String fileContent =
"111|222|333\n" +
"000|999|888";
FileInputSplit split = createTempFile(fileContent);

RowCsvInputFormat format = new RowCsvInputFormat(
PATH,
new TypeInformation[0],
new int[0]);

format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);

Row result = new Row(0);

// check first row
result = format.nextRecord(result);
assertNotNull(result);

// check second row
result = format.nextRecord(result);
assertNotNull(result);
}

private static FileInputSplit createTempFile(String content) throws IOException {
7 changes: 7 additions & 0 deletions flink-table/flink-table-api-java-bridge/pom.xml
@@ -60,5 +60,12 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -185,7 +185,7 @@ public TableSchema getTableSchema() {
@Override
public CsvTableSource projectFields(int[] fields) {
if (fields.length == 0) {
fields = new int[]{0};
fields = new int[0];
}
return new CsvTableSource(config.select(fields));
}
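Note the behavioral fix paired with this one-character-looking change: previously an empty projection silently fell back to reading field 0, so for a single-column source the projected source's explainSource() could match the original's, which is exactly what tripped the safety check in PushProjectIntoTableSourceScanRule. A sketch of the observable difference (field name and type are illustrative):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.sources.CsvTableSource;

    CsvTableSource source = CsvTableSource.builder()
        .path("ignored")          // illustrative, as in the new test below
        .field("f0", Types.INT)
        .build();

    CsvTableSource projected = source.projectFields(new int[0]);
    // before: fields fell back to {0}, so explainSource() still reported
    //         "read fields: f0", identical to the original source's output
    // after:  no fields are read and explainSource() reports
    //         "read fields: ", which differs from the original's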
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.sources;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.utils.TypeConversions;

/**
* Tests for {@link CsvTableSource}.
*/
public class CsvTableSourceTest extends TableSourceTestBase {

@Override
protected TableSource<?> createTableSource(TableSchema requestedSchema) {
CsvTableSource.Builder builder = CsvTableSource.builder()
.path("ignored")
.fieldDelimiter("|");

requestedSchema.getTableColumns().forEach(
column -> builder.field(column.getName(), TypeConversions.fromDataTypeToLegacyInfo(column.getType()))
);

return builder.build();
}
}
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.sources;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeThat;

/**
* Collection of tests that verify assumptions that table sources should meet.
*/
public abstract class TableSourceTestBase {

/**
* Constructs a table source to be tested.
*
* @param requestedSchema A requested schema for the table source. Some tests require particular
* behavior depending on the schema of a source.
* @return table source to be tested
*/
protected abstract TableSource<?> createTableSource(TableSchema requestedSchema);

/**
* Checks that {@link ProjectableTableSource#projectFields(int[])} returns a table source with
* a different {@link TableSource#explainSource()} even when filtering out all fields.
*
* <p>Required by {@code PushProjectIntoTableSourceScanRule}.
*/
@Test
public void testEmptyProjection() {
TableSource<?> source = createTableSource(
TableSchema.builder()
.field("f0", DataTypes.INT())
.build()
);
assumeThat(source, instanceOf(ProjectableTableSource.class));

ProjectableTableSource<?> projectableTableSource = (ProjectableTableSource<?>) source;

TableSource<?> newTableSource = projectableTableSource.projectFields(new int[0]);
assertThat(newTableSource.explainSource(), not(equalTo(source.explainSource())));
}

/**
* Checks that {@link ProjectableTableSource#projectFields(int[])} returns a table source with
* a different {@link TableSource#explainSource()}, but same schema.
*
* <p>Required by {@code PushProjectIntoTableSourceScanRule}.
*/
@Test
public void testProjectionReturnsDifferentSource() {
TableSource<?> source = createTableSource(
TableSchema.builder()
.field("f0", DataTypes.INT())
.field("f1", DataTypes.STRING())
.field("f2", DataTypes.BIGINT())
.build()
);
assumeThat(source, instanceOf(ProjectableTableSource.class));

ProjectableTableSource<?> projectableTableSource = (ProjectableTableSource<?>) source;

TableSource<?> newTableSource = projectableTableSource.projectFields(new int[] {0, 2});
assertThat(newTableSource.explainSource(), not(equalTo(source.explainSource())));
assertThat(newTableSource.getTableSchema(), equalTo(source.getTableSchema()));
}
}
@@ -145,7 +145,7 @@ class TableSourceTest extends TableTestBase {
"DataSetCalc",
s"BatchTableSourceScan(table=[[default_catalog, default_database, $tableName]], " +
s"fields=[], " +
s"source=[CsvTableSource(read fields: first)])",
s"source=[CsvTableSource(read fields: )])",
term("select", "1 AS _c0")
)
