diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index 413d7a95cb796..792f19d260a70 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -548,7 +548,7 @@ An `OrcTableSource` is created as shown below: Configuration config = new Configuration(); OrcTableSource orcTableSource = OrcTableSource.builder() - // path to ORC file(s) + // path to ORC file(s). NOTE: By default, directories are recursively scanned. .path("file:///path/to/data") // schema of ORC files .forOrcSchema("struct>>") @@ -566,7 +566,7 @@ OrcTableSource orcTableSource = OrcTableSource.builder() val config = new Configuration() val orcTableSource = OrcTableSource.builder() - // path to ORC file(s) + // path to ORC file(s). NOTE: By default, directories are recursively scanned. .path("file:///path/to/data") // schema of ORC files .forOrcSchema("struct>>") diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java index 61575ad727cb1..a037962d0fe22 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java @@ -393,6 +393,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } } + // -------------------------------------------------------------------------------------------- + // Getter methods for tests + // -------------------------------------------------------------------------------------------- + + @VisibleForTesting + Configuration getConfiguration() { + return conf; + } + + @VisibleForTesting + int getBatchSize() { + return batchSize; + } + + @VisibleForTesting + String getSchema() { + return schema.toString(); + } + // -------------------------------------------------------------------------------------------- // Classes to define predicates // 
-------------------------------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java index 0eab4a043da3c..d895d6239573d 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java @@ -92,6 +92,8 @@ public class OrcTableSource private final Configuration orcConfig; // the number of rows to read in a batch private final int batchSize; + // flag whether a path is recursively enumerated + private final boolean recursiveEnumeration; // type information of the data returned by the InputFormat private final RowTypeInfo typeInfo; @@ -107,13 +109,15 @@ public class OrcTableSource * @param orcSchema The schema of the ORC files as TypeDescription. * @param orcConfig The configuration to read the ORC files. * @param batchSize The number of Rows to read in a batch, default is 1000. + * @param recursiveEnumeration Flag whether the path should be recursively enumerated or not. 
*/ - private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { - this(path, orcSchema, orcConfig, batchSize, null, null); + private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize, boolean recursiveEnumeration) { + this(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, null, null); } private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, - int batchSize, int[] selectedFields, Predicate[] predicates) { + int batchSize, boolean recursiveEnumeration, + int[] selectedFields, Predicate[] predicates) { Preconditions.checkNotNull(path, "Path must not be null."); Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null."); @@ -123,6 +127,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc this.orcSchema = orcSchema; this.orcConfig = orcConfig; this.batchSize = batchSize; + this.recursiveEnumeration = recursiveEnumeration; this.selectedFields = selectedFields; this.predicates = predicates; @@ -146,6 +151,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc @Override public DataSet getDataSet(ExecutionEnvironment execEnv) { OrcRowInputFormat orcIF = buildOrcInputFormat(); + orcIF.setNestedFileEnumeration(recursiveEnumeration); if (selectedFields != null) { orcIF.selectFields(selectedFields); } @@ -175,7 +181,7 @@ public TableSchema getTableSchema() { @Override public TableSource projectFields(int[] selectedFields) { // create a copy of the OrcTableSouce with new selected fields - return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates); + return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, predicates); } @Override @@ -190,7 +196,7 @@ public TableSource applyPredicate(List predicates) { } } - return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, 
orcPredicates.toArray(new Predicate[]{})); + return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, orcPredicates.toArray(new Predicate[]{})); } @Override @@ -405,8 +411,11 @@ public static class Builder { private int batchSize = 0; + private boolean recursive = true; + /** * Sets the path of the ORC file(s). + * If the path specifies a directory, it will be recursively enumerated. * * @param path The path of the ORC file(s). * @return The builder. @@ -418,6 +427,21 @@ public Builder path(String path) { return this; } + /** + * Sets the path of the ORC file(s). + * + * @param path The path of the ORC file(s). + * @param recursive Flag whether the path should be recursively enumerated or not. + * @return The builder. + */ + public Builder path(String path, boolean recursive) { + Preconditions.checkNotNull(path, "Path must not be null."); + Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty."); + this.path = path; + this.recursive = recursive; + return this; + } + /** * Sets the ORC schema of the files to read as a String. 
* @@ -483,7 +507,7 @@ public OrcTableSource build() { // set default batch size this.batchSize = DEFAULT_BATCH_SIZE; } - return new OrcTableSource(this.path, this.schema, this.config, this.batchSize); + return new OrcTableSource(this.path, this.schema, this.config, this.batchSize, this.recursive); } } diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java index 0d946ae45a0a4..fab994e8472cf 100644 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.typeutils.MapTypeInfo; @@ -37,6 +38,7 @@ import org.apache.flink.table.expressions.ResolvedFieldReference; import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -233,6 +235,56 @@ public void testApplyPredicate() throws Exception { assertFalse(orc.isFilterPushedDown()); } + @Test + public void testBuilder() throws Exception { + + // validate path, schema, and recursive enumeration default (enabled) + OrcTableSource orc1 = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); + + DataSet rows1 = orc1.getDataSet(ExecutionEnvironment.createLocalEnvironment()); + OrcRowInputFormat orcIF1 = (OrcRowInputFormat) ((DataSource) rows1).getInputFormat(); + assertEquals(true, 
orcIF1.getNestedFileEnumeration()); + assertEquals(getPath(TEST_FILE_NESTED), orcIF1.getFilePath().toString()); + assertEquals(TEST_SCHEMA_NESTED, orcIF1.getSchema()); + + // validate recursive enumeration disabled + OrcTableSource orc2 = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED), false) + .forOrcSchema(TEST_SCHEMA_NESTED) + .build(); + + DataSet rows2 = orc2.getDataSet(ExecutionEnvironment.createLocalEnvironment()); + OrcRowInputFormat orcIF2 = (OrcRowInputFormat) ((DataSource) rows2).getInputFormat(); + assertEquals(false, orcIF2.getNestedFileEnumeration()); + + // validate Hadoop configuration + Configuration conf = new Configuration(); + conf.set("testKey", "testValue"); + OrcTableSource orc3 = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .withConfiguration(conf) + .build(); + + DataSet rows3 = orc3.getDataSet(ExecutionEnvironment.createLocalEnvironment()); + OrcRowInputFormat orcIF3 = (OrcRowInputFormat) ((DataSource) rows3).getInputFormat(); + assertEquals(conf, orcIF3.getConfiguration()); + + // validate batch size + OrcTableSource orc4 = OrcTableSource.builder() + .path(getPath(TEST_FILE_NESTED)) + .forOrcSchema(TEST_SCHEMA_NESTED) + .withBatchSize(987) + .build(); + + DataSet rows4 = orc4.getDataSet(ExecutionEnvironment.createLocalEnvironment()); + OrcRowInputFormat orcIF4 = (OrcRowInputFormat) ((DataSource) rows4).getInputFormat(); + assertEquals(987, orcIF4.getBatchSize()); + } + private String getPath(String fileName) { return getClass().getClassLoader().getResource(fileName).getPath(); }