[SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`

### What changes were proposed in this pull request?
`SpecificParquetRecordReaderBase.listDirectory` returns the list of files under `path` recursively, skipping files that MapReduce normally ignores.

This method is now used only by Spark tests, and those tests also cover non-Parquet scenarios such as `OrcColumnarBatchReaderSuite`.

So this PR moves the method from `SpecificParquetRecordReaderBase` to `TestUtils`, making it a test-only utility.
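
As illustration only (not part of this diff), a minimal sketch of the new call site, assuming a test suite that provides `spark` and `withTempPath`, as the suites touched here do:

```scala
import org.apache.spark.TestUtils

// Hypothetical usage sketch: write a tiny Parquet dataset to a temp directory,
// then list its data files with the relocated helper. Files whose names start
// with '.' or '_' (e.g. `_SUCCESS`) are skipped, as the method's doc states.
withTempPath { dir =>
  spark.range(10).toDF("id").repartition(1).write.parquet(dir.getCanonicalPath)
  val files: Array[String] = TestUtils.listDirectory(dir)
  assert(files.nonEmpty)
  files.foreach(f => assert(!f.endsWith("_SUCCESS")))
}
```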

### Why are the changes needed?
Refactoring: moves a test-only method to `TestUtils`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passes the existing GitHub Actions (GA) checks.

Closes apache#35177 from LuciferYang/list-directory.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
LuciferYang authored and srowen committed Jan 15, 2022
1 parent 8ae9707 commit 7614472
Showing 8 changed files with 36 additions and 40 deletions.
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -446,6 +446,21 @@ private[spark] object TestUtils {
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}

/**
* Returns the list of files at 'path' recursively. This skips files that are ignored normally
* by MapReduce.
*/
def listDirectory(path: File): Array[String] = {
val result = ArrayBuffer.empty[String]
if (path.isDirectory) {
path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
} else {
val c = path.getName.charAt(0)
if (c != '.' && c != '_') result.append(path.getAbsolutePath)
}
result.toArray
}

/** Creates a temp JSON file that contains the input JSON record. */
def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
val file = File.createTempFile(prefix, ".json", dir)
@@ -19,10 +19,8 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -121,25 +119,6 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
}
}

/**
* Returns the list of files at 'path' recursively. This skips files that are ignored normally
* by MapReduce.
*/
public static List<String> listDirectory(File path) {
List<String> result = new ArrayList<>();
if (path.isDirectory()) {
for (File f: path.listFiles()) {
result.addAll(listDirectory(f));
}
} else {
char c = path.getName().charAt(0);
if (c != '.' && c != '_') {
result.add(path.getAbsolutePath());
}
}
return result;
}

/**
* Initializes the reader to read the file at `path` with `columns` projected. If columns is
* null, all the columns are projected.
@@ -25,10 +25,11 @@ import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.SparkConf
import org.apache.spark.TestUtils
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -167,7 +168,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
sqlBenchmark.run()

// Driving the parquet reader in batch mode directly.
val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
val files = TestUtils.listDirectory(new File(dir, "parquet"))
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { _ =>
@@ -183,7 +184,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
case DoubleType => (col: ColumnVector, i: Int) => doubleSum += col.getDouble(i)
}

files.map(_.asInstanceOf[String]).foreach { p =>
files.foreach { p =>
val reader = new VectorizedParquetRecordReader(
enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
@@ -468,12 +469,12 @@
}
}

val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
val files = TestUtils.listDirectory(new File(dir, "parquet"))
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
benchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
files.foreach { p =>
val reader = new VectorizedParquetRecordReader(
enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
@@ -25,11 +25,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.TypeDescription

import org.apache.spark.TestUtils
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -117,7 +117,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
val partitionValues = new GenericInternalRow(Array(v))
val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
val file = new File(TestUtils.listDirectory(dir).head)
val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
val taskConf = sqlContext.sessionState.newHadoopConf()
val orcFileSchema = TypeDescription.fromString(schema.simpleString)
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.TestUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
@@ -50,12 +51,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
(1 :: 1000 :: Nil).foreach { n => {
withTempPath { dir =>
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
reader.initialize(file, null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
assert(batch.numRows() == n)
@@ -80,12 +81,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
withTempPath { dir =>
val data = List.fill(n)(NULL_ROW).toDF
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
reader.initialize(file.asInstanceOf[String], null)
reader.initialize(file, null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
assert(batch.numRows() == n)
@@ -114,7 +115,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
// first page is dictionary encoded and the remaining two are plain encoded.
val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
val file = TestUtils.listDirectory(dir).head

val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
@@ -38,7 +38,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -928,7 +928,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
withTempPath { dir =>
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
val file = TestUtils.listDirectory(dir).head;
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
@@ -1032,7 +1032,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
val vectorizedReader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
val file = TestUtils.listDirectory(dir).head

try {
vectorizedReader.initialize(file, null)
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetM
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.MessageType

import org.apache.spark.TestUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.internal.SQLConf
@@ -179,7 +180,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
}

def getMetaData(dir: java.io.File): Map[String, String] = {
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
val file = TestUtils.listDirectory(dir).head
val conf = new Configuration()
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
val parquetReadOptions = HadoopReadOptions.builder(conf).build()
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkContext
import org.apache.spark.{SparkContext, TestUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -764,7 +763,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
withTempPath { dir =>
val path = dir.getAbsolutePath
df.write.mode("overwrite").parquet(path)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
val file = TestUtils.listDirectory(dir).head

val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), new Configuration())
val f = ParquetFileReader.open(hadoopInputFile)
