[SPARK-33101][ML] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system

### What changes were proposed in this pull request?
Propagate LibSVM options to Hadoop configs in the LibSVM datasource.

### Why are the changes needed?
There is a bug: when running
```scala
spark.read.format("libsvm").options(conf).load(path)
```
the underlying file system does not receive the `conf` options.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, users can, for example, successfully read files from Azure Data Lake:
```scala
def hadoopConf1() = Map[String, String](
  s"fs.adl.oauth2.access.token.provider.type" -> "ClientCredential",
  s"fs.adl.oauth2.client.id" -> dbutils.secrets.get(scope = "...", key = "..."),
  s"fs.adl.oauth2.credential" -> dbutils.secrets.get(scope = "...", key = "..."),
  s"fs.adl.oauth2.refresh.url" -> s"https://login.microsoftonline.com/.../oauth2/token")
val df = spark.read.format("libsvm").options(hadoopConf1).load("adl://....azuredatalakestore.net/foldersp1/...")
```
instead of failing with the following exception, which was thrown because the settings above were not propagated to the file system:
```java
java.lang.IllegalArgumentException: No value for fs.adl.oauth2.access.token.provider found in conf file.
	at ....adl.AdlFileSystem.getNonEmptyVal(AdlFileSystem.java:820)
	at ....adl.AdlFileSystem.getCustomAccessTokenProvider(AdlFileSystem.java:220)
	at ....adl.AdlFileSystem.getAccessTokenProvider(AdlFileSystem.java:257)
	at ....adl.AdlFileSystem.initialize(AdlFileSystem.java:164)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
```

### How was this patch tested?
Added a unit test to `LibSVMRelationSuite`.

Closes apache#29984 from MaxGekk/ml-option-propagation.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
MaxGekk authored and dongjoon-hyun committed Oct 9, 2020
1 parent 3beab8d commit 1234c66
Showing 3 changed files with 17 additions and 5 deletions.
In `LibSVMFileFormat`:
```diff
@@ -101,7 +101,7 @@ private[libsvm] class LibSVMFileFormat
         "'numFeatures' option to avoid the extra scan.")

       val paths = files.map(_.getPath.toString)
-      val parsed = MLUtils.parseLibSVMFile(sparkSession, paths)
+      val parsed = MLUtils.parseLibSVMFile(sparkSession, paths, options)
       MLUtils.computeNumFeatures(parsed)
     }
```
In `MLUtils`:
```diff
@@ -105,13 +105,15 @@ object MLUtils extends Logging {
   }

   private[spark] def parseLibSVMFile(
-      sparkSession: SparkSession, paths: Seq[String]): RDD[(Double, Array[Int], Array[Double])] = {
+      sparkSession: SparkSession,
+      paths: Seq[String],
+      options: Map[String, String]): RDD[(Double, Array[Int], Array[Double])] = {
     val lines = sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
         className = classOf[TextFileFormat].getName,
-        options = Map(DataSource.GLOB_PATHS_KEY -> "false")
+        options = options ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
       ).resolveRelation(checkFilesExist = false))
       .select("value")
```
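One detail worth noting in the `options ++ Map(DataSource.GLOB_PATHS_KEY -> "false")` expression: `++` on Scala maps is right-biased, so the data source's fixed `GLOB_PATHS_KEY` entry wins even if a user passes the same key. A minimal sketch of that precedence in plain Scala (the `GLOB_PATHS_KEY` string below is a hypothetical stand-in for `DataSource.GLOB_PATHS_KEY`):

```scala
object GlobPathsPrecedence {
  // Hypothetical stand-in for DataSource.GLOB_PATHS_KEY.
  val GLOB_PATHS_KEY = "__globPaths__"

  def main(args: Array[String]): Unit = {
    // User options, including an attempt to re-enable globbing.
    val userOptions = Map("fs.adl.oauth2.client.id" -> "id", GLOB_PATHS_KEY -> "true")
    // Same shape as the patched code: user options on the left, fixed entry on the right.
    val merged = userOptions ++ Map(GLOB_PATHS_KEY -> "false")
    // `++` keeps the right-hand value on key collisions, so globbing stays disabled,
    // while the user's Hadoop-related option still flows through.
    assert(merged(GLOB_PATHS_KEY) == "false")
    assert(merged("fs.adl.oauth2.client.id") == "id")
    println(merged(GLOB_PATHS_KEY)) // prints "false"
  }
}
```

So user-supplied Hadoop options are propagated, but a user cannot accidentally turn glob resolution back on for LibSVM paths.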
In `LibSVMRelationSuite`:
```diff
@@ -27,12 +27,13 @@ import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SaveMode}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 import org.apache.spark.util.Utils


-class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
+class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext with SQLHelper {
   // Path for dataset
   var path: String = _

@@ -211,4 +212,13 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
      assert(v == Vectors.sparse(2, Seq((0, 2.0), (1, 3.0))))
    }
  }
+
+  test("SPARK-33101: should propagate Hadoop config from DS options to underlying file system") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      val df = spark.read.option("ds_option", "value").format("libsvm").load(path)
+      assert(df.columns(0) == "label")
+    }
+  }
 }
```
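The new test leans on `FakeFileSystemRequiringDSOption`, a test-only file system in Spark's SQL test sources that refuses to initialize unless the expected option is present in its Hadoop configuration. A rough sketch of that pattern in plain Scala (the names and shape below are illustrative, not Spark's actual test helper):

```scala
// Illustrative sketch: a component that fails fast unless a required
// key was propagated into its configuration, mirroring how the fake
// file system detects missing DS-option propagation.
class FakeFsSketch(conf: Map[String, String]) {
  require(conf.contains("ds_option"), "No value for ds_option found in conf")
}

object PropagationCheck {
  def main(args: Array[String]): Unit = {
    // Without propagation, initialization fails (require throws
    // IllegalArgumentException), which is what the old behavior produced.
    val failedWithoutOption =
      try { new FakeFsSketch(Map.empty); false }
      catch { case _: IllegalArgumentException => true }
    assert(failedWithoutOption)

    // With the DS option propagated into the conf, initialization succeeds.
    new FakeFsSketch(Map("ds_option" -> "value"))
    println("ok") // prints "ok"
  }
}
```

Setting `fs.file.impl.disable.cache` to `true` in the real test matters: without it, Hadoop would hand back a cached `file://` file system instance and the initialization path under test would never run.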
