Skip to content

Commit

Permalink
[FLINK-8243] [orc] OrcTableSource reads input path recursively by def…
Browse files Browse the repository at this point in the history
…ault.

This closes apache#5344.
  • Loading branch information
fhueske committed Feb 5, 2018
1 parent ede4c07 commit de3d85b
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/dev/table/sourceSinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ An `OrcTableSource` is created as shown below:
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
Expand All @@ -566,7 +566,7 @@ OrcTableSource orcTableSource = OrcTableSource.builder()
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}
}

// --------------------------------------------------------------------------------------------
// Getter methods for tests
// --------------------------------------------------------------------------------------------

/**
 * Exposes the {@code conf} field so that tests can inspect it.
 *
 * @return The configuration object currently held in {@code conf}.
 */
@VisibleForTesting
Configuration getConfiguration() {
    return this.conf;
}

/**
 * Exposes the {@code batchSize} field so that tests can inspect it.
 *
 * @return The value currently held in {@code batchSize}.
 */
@VisibleForTesting
int getBatchSize() {
    return this.batchSize;
}

/**
 * Exposes the schema in its String form so that tests can compare it
 * against an expected schema string.
 *
 * @return The result of calling {@code toString()} on the {@code schema} field.
 */
@VisibleForTesting
String getSchema() {
    final String schemaAsString = this.schema.toString();
    return schemaAsString;
}

// --------------------------------------------------------------------------------------------
// Classes to define predicates
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class OrcTableSource
private final Configuration orcConfig;
// the number of rows to read in a batch
private final int batchSize;
// flag whether a path is recursively enumerated
private final boolean recursiveEnumeration;

// type information of the data returned by the InputFormat
private final RowTypeInfo typeInfo;
Expand All @@ -107,13 +109,15 @@ public class OrcTableSource
* @param orcSchema The schema of the ORC files as TypeDescription.
* @param orcConfig The configuration to read the ORC files.
* @param batchSize The number of Rows to read in a batch, default is 1000.
* @param recursiveEnumeration Flag whether the path should be recursively enumerated or not.
*/
private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
this(path, orcSchema, orcConfig, batchSize, null, null);
private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize, boolean recursiveEnumeration) {
this(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, null, null);
}

private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig,
int batchSize, int[] selectedFields, Predicate[] predicates) {
int batchSize, boolean recursiveEnumeration,
int[] selectedFields, Predicate[] predicates) {

Preconditions.checkNotNull(path, "Path must not be null.");
Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null.");
Expand All @@ -123,6 +127,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc
this.orcSchema = orcSchema;
this.orcConfig = orcConfig;
this.batchSize = batchSize;
this.recursiveEnumeration = recursiveEnumeration;
this.selectedFields = selectedFields;
this.predicates = predicates;

Expand All @@ -146,6 +151,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
OrcRowInputFormat orcIF = buildOrcInputFormat();
orcIF.setNestedFileEnumeration(recursiveEnumeration);
if (selectedFields != null) {
orcIF.selectFields(selectedFields);
}
Expand Down Expand Up @@ -175,7 +181,7 @@ public TableSchema getTableSchema() {
@Override
public TableSource<Row> projectFields(int[] selectedFields) {
// create a copy of the OrcTableSource with new selected fields
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates);
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, predicates);
}

@Override
Expand All @@ -190,7 +196,7 @@ public TableSource<Row> applyPredicate(List<Expression> predicates) {
}
}

return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, orcPredicates.toArray(new Predicate[]{}));
}

@Override
Expand Down Expand Up @@ -405,8 +411,11 @@ public static class Builder {

private int batchSize = 0;

private boolean recursive = true;

/**
* Sets the path of the ORC file(s).
* If the path specifies a directory, it will be recursively enumerated.
*
* @param path The path of the ORC file(s).
* @return The builder.
Expand All @@ -418,6 +427,21 @@ public Builder path(String path) {
return this;
}

/**
 * Sets the path of the ORC file(s) and chooses whether the path
 * is enumerated recursively.
 *
 * @param path The path of the ORC file(s). Must be neither null nor empty.
 * @param recursive Flag whether to enumerate the path recursively.
 * @return The builder.
 */
public Builder path(String path, boolean recursive) {
    // validate first so that an invalid path never modifies the builder state
    Preconditions.checkNotNull(path, "Path must not be null.");
    Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty.");

    this.recursive = recursive;
    this.path = path;
    return this;
}

/**
* Sets the ORC schema of the files to read as a String.
*
Expand Down Expand Up @@ -483,7 +507,7 @@ public OrcTableSource build() {
// set default batch size
this.batchSize = DEFAULT_BATCH_SIZE;
}
return new OrcTableSource(this.path, this.schema, this.config, this.batchSize);
return new OrcTableSource(this.path, this.schema, this.config, this.batchSize, this.recursive);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.types.Row;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -233,6 +235,56 @@ public void testApplyPredicate() throws Exception {
assertFalse(orc.isFilterPushedDown());
}

/**
 * Verifies that the settings made on the {@code OrcTableSource} builder
 * (path, schema, recursive enumeration, Hadoop configuration, and batch size)
 * are visible on the {@code OrcRowInputFormat} of the resulting data set.
 */
@Test
public void testBuilder() throws Exception {
    final String nestedFilePath = getPath(TEST_FILE_NESTED);

    // path, schema, and the default for recursive enumeration (enabled)
    OrcRowInputFormat defaultFormat = inputFormatOf(
        OrcTableSource.builder()
            .path(nestedFilePath)
            .forOrcSchema(TEST_SCHEMA_NESTED)
            .build());
    assertEquals(true, defaultFormat.getNestedFileEnumeration());
    assertEquals(nestedFilePath, defaultFormat.getFilePath().toString());
    assertEquals(TEST_SCHEMA_NESTED, defaultFormat.getSchema());

    // recursive enumeration explicitly disabled
    OrcRowInputFormat nonRecursiveFormat = inputFormatOf(
        OrcTableSource.builder()
            .path(nestedFilePath, false)
            .forOrcSchema(TEST_SCHEMA_NESTED)
            .build());
    assertEquals(false, nonRecursiveFormat.getNestedFileEnumeration());

    // custom Hadoop configuration
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("testKey", "testValue");
    OrcRowInputFormat configuredFormat = inputFormatOf(
        OrcTableSource.builder()
            .path(nestedFilePath)
            .forOrcSchema(TEST_SCHEMA_NESTED)
            .withConfiguration(hadoopConf)
            .build());
    assertEquals(hadoopConf, configuredFormat.getConfiguration());

    // custom batch size
    OrcRowInputFormat batchedFormat = inputFormatOf(
        OrcTableSource.builder()
            .path(nestedFilePath)
            .forOrcSchema(TEST_SCHEMA_NESTED)
            .withBatchSize(987)
            .build());
    assertEquals(987, batchedFormat.getBatchSize());
}

/**
 * Requests the data set of the given table source on a fresh local
 * environment and returns the input format of the resulting data source.
 */
private OrcRowInputFormat inputFormatOf(OrcTableSource source) {
    DataSet<Row> rows = source.getDataSet(ExecutionEnvironment.createLocalEnvironment());
    return (OrcRowInputFormat) ((DataSource<Row>) rows).getInputFormat();
}

/**
 * Looks up a resource via the class loader of this test class and
 * returns the path component of the resource's URL.
 *
 * @param fileName Name of the resource to look up.
 * @return The path of the resolved resource URL.
 */
private String getPath(String fileName) {
    final ClassLoader loader = getClass().getClassLoader();
    return loader.getResource(fileName).getPath();
}
Expand Down

0 comments on commit de3d85b

Please sign in to comment.