[SPARK-15945][MLLIB] Conversion between old/new vector columns in a DataFrame (Scala/Java)

## What changes were proposed in this pull request?

This PR provides conversion utils between old/new vector columns in a DataFrame, so users can migrate their datasets and pipelines manually. The methods are implemented under `MLUtils` and are called `convertVectorColumnsToML` and `convertVectorColumnsFromML`. Both take a DataFrame and a list of vector columns to be converted. They are no-ops on vector columns that are already converted. A warning message is logged if actual conversion happens.
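For illustration, a minimal usage sketch (assuming a `SparkSession` named `spark` is in scope; the data and column names are made up for the example):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// A DataFrame whose "features" column still uses the old spark.mllib vector type.
val df = spark.createDataFrame(Seq(
  LabeledPoint(1.0, Vectors.dense(2.0, 3.0))
)).select("label", "features")

// Convert every old vector column to the new spark.ml type ...
val newDF = MLUtils.convertVectorColumnsToML(df)
// ... or convert only the named columns.
val newDF2 = MLUtils.convertVectorColumnsToML(df, "features")

// The inverse, for code paths that still expect spark.mllib vectors.
val oldDF = MLUtils.convertVectorColumnsFromML(newDF, "features")
```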

This is the first sub-task under SPARK-15944 to make it easier to migrate existing pipelines to Spark 2.0.

## How was this patch tested?

Unit tests in Scala and Java.

cc: yanboliang

Author: Xiangrui Meng <[email protected]>

Closes apache#13662 from mengxr/SPARK-15945.
mengxr authored and yanboliang committed Jun 15, 2016
1 parent 42a28ca commit 63e0aeb
Showing 3 changed files with 218 additions and 8 deletions.
117 changes: 111 additions & 6 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,22 +17,27 @@

package org.apache.spark.mllib.util

+import scala.annotation.varargs
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{VectorUDT => MLVectorUDT}
+import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.BLAS.dot
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.random.BernoulliCellSampler

/**
* Helper methods to load, save and pre-process data used in MLlib.
*/
@Since("0.8.0")
-object MLUtils {
+object MLUtils extends Logging {

private[mllib] lazy val EPSILON = {
var eps = 1.0
@@ -50,7 +55,6 @@ object MLUtils {
* where the indices are one-based and in ascending order.
* This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
* where the feature indices are converted to zero-based.
- *
* @param sc Spark context
* @param path file or directory path in any Hadoop-supported file system URI
* @param numFeatures number of features, which will be determined from the input data if a
@@ -145,7 +149,6 @@
* Save labeled data in LIBSVM format.
* @param data an RDD of LabeledPoint to be saved
* @param dir directory to save the data
- *
* @see [[org.apache.spark.mllib.util.MLUtils#loadLibSVMFile]]
*/
@Since("1.0.0")
@@ -253,6 +256,110 @@ object MLUtils {
}
}

/**
* Converts vector columns in an input Dataset from the [[org.apache.spark.mllib.linalg.Vector]]
* type to the new [[org.apache.spark.ml.linalg.Vector]] type under the `spark.ml` package.
* @param dataset input dataset
* @param cols a list of vector columns to be converted. New vector columns will be ignored. If
* unspecified, all old vector columns will be converted except nested ones.
* @return the input [[DataFrame]] with old vector columns converted to the new vector type
*/
@Since("2.0.0")
@varargs
def convertVectorColumnsToML(dataset: Dataset[_], cols: String*): DataFrame = {
val schema = dataset.schema
val colSet = if (cols.nonEmpty) {
cols.flatMap { c =>
val dataType = schema(c).dataType
if (dataType.getClass == classOf[VectorUDT]) {
Some(c)
} else {
// ignore new vector columns and raise an exception on other column types
require(dataType.getClass == classOf[MLVectorUDT],
s"Column $c must be old Vector type to be converted to new type but got $dataType.")
None
}
}.toSet
} else {
schema.fields
.filter(_.dataType.getClass == classOf[VectorUDT])
.map(_.name)
.toSet
}

if (colSet.isEmpty) {
return dataset.toDF()
}

logWarning("Vector column conversion has serialization overhead. " +
"Please migrate your datasets and workflows to use the spark.ml package.")

// TODO: This implementation has performance issues due to unnecessary serialization.
// TODO: It is better (but trickier) if we can cast the old vector type to new type directly.
val convertToML = udf { v: Vector => v.asML }
val exprs = schema.fields.map { field =>
val c = field.name
if (colSet.contains(c)) {
convertToML(col(c)).as(c, field.metadata)
} else {
col(c)
}
}
dataset.select(exprs: _*)
}

/**
* Converts vector columns in an input Dataset to the [[org.apache.spark.mllib.linalg.Vector]]
* type from the new [[org.apache.spark.ml.linalg.Vector]] type under the `spark.ml` package.
* @param dataset input dataset
* @param cols a list of vector columns to be converted. Old vector columns will be ignored. If
* unspecified, all new vector columns will be converted except nested ones.
* @return the input [[DataFrame]] with new vector columns converted to the old vector type
*/
@Since("2.0.0")
@varargs
def convertVectorColumnsFromML(dataset: Dataset[_], cols: String*): DataFrame = {
val schema = dataset.schema
val colSet = if (cols.nonEmpty) {
cols.flatMap { c =>
val dataType = schema(c).dataType
if (dataType.getClass == classOf[MLVectorUDT]) {
Some(c)
} else {
// ignore old vector columns and raise an exception on other column types
require(dataType.getClass == classOf[VectorUDT],
s"Column $c must be new Vector type to be converted to old type but got $dataType.")
None
}
}.toSet
} else {
schema.fields
.filter(_.dataType.getClass == classOf[MLVectorUDT])
.map(_.name)
.toSet
}

if (colSet.isEmpty) {
return dataset.toDF()
}

logWarning("Vector column conversion has serialization overhead. " +
"Please migrate your datasets and workflows to use the spark.ml package.")

// TODO: This implementation has performance issues due to unnecessary serialization.
// TODO: It is better (but trickier) if we can cast the new vector type to old type directly.
val convertFromML = udf { Vectors.fromML _ }
val exprs = schema.fields.map { field =>
val c = field.name
if (colSet.contains(c)) {
convertFromML(col(c)).as(c, field.metadata)
} else {
col(c)
}
}
dataset.select(exprs: _*)
}

/**
* Returns the squared Euclidean distance between two vectors. The following formula will be used
* if it does not introduce too much numerical error:
@@ -261,7 +368,6 @@ object MLUtils {
* <pre>
*   \|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2 a^T b.
* </pre>
* When both vector norms are given, this is faster than computing the squared distance directly,
* especially when one of the vectors is a sparse vector.
- *
* @param v1 the first vector
* @param norm1 the norm of the first vector, non-negative
* @param v2 the second vector
@@ -314,7 +420,6 @@
* When `x` is positive and large, computing `math.log(1 + math.exp(x))` will lead to arithmetic
* overflow. This will happen when `x > 709.78` which is not a very large number.
* It can be addressed by rewriting the formula into `x + math.log1p(math.exp(-x))` when `x > 0`.
- *
* @param x a floating-point value as input.
* @return the result of `math.log(1 + math.exp(x))`.
*/
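An aside on the `log1pExp` doc above: the rewrite it describes is easy to check in isolation. A sketch of the trick (not the Spark source verbatim):

```scala
// Numerically stable log(1 + exp(x)). For x > 709.78, math.exp(x) overflows
// to Infinity, so for positive x use the equivalent x + log1p(exp(-x)).
def log1pExp(x: Double): Double = {
  if (x > 0) {
    x + math.log1p(math.exp(-x))
  } else {
    math.log1p(math.exp(x))
  }
}

log1pExp(1000.0)  // ~1000.0, where math.log(1 + math.exp(1000.0)) would be Infinity
```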
49 changes: 49 additions & 0 deletions mllib/src/test/java/org/apache/spark/mllib/util/JavaMLUtilsSuite.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.util;

import java.util.Collections;

import org.junit.Assert;
import org.junit.Test;

import org.apache.spark.SharedSparkSession;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class JavaMLUtilsSuite extends SharedSparkSession {

@Test
public void testConvertVectorColumnsToAndFromML() {
Vector x = Vectors.dense(2.0);
Dataset<Row> dataset = spark.createDataFrame(
Collections.singletonList(new LabeledPoint(1.0, x)), LabeledPoint.class
).select("label", "features");
Dataset<Row> newDataset1 = MLUtils.convertVectorColumnsToML(dataset);
Row new1 = newDataset1.first();
Assert.assertEquals(RowFactory.create(1.0, x.asML()), new1);
Row new2 = MLUtils.convertVectorColumnsToML(dataset, "features").first();
Assert.assertEquals(new1, new2);
Row old1 = MLUtils.convertVectorColumnsFromML(newDataset1).first();
Assert.assertEquals(RowFactory.create(1.0, x), old1);
}
}
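The conversion methods in `MLUtils.scala` above share one pattern worth noting: rewrite the targeted columns with a UDF, `select` all fields, and re-alias each converted column with its original `field.metadata` so column metadata survives. A generalized sketch of that pattern (the `targets` set and the element-wise function `f` are hypothetical, only for illustration):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}

// Apply `f` to the named columns only, keeping all other columns untouched
// and preserving each rewritten column's metadata.
def mapColumns(dataset: Dataset[_], targets: Set[String], f: Double => Double): DataFrame = {
  val transform = udf(f)
  val exprs = dataset.schema.fields.map { field =>
    if (targets.contains(field.name)) {
      transform(col(field.name)).as(field.name, field.metadata)
    } else {
      col(field.name)
    }
  }
  dataset.select(exprs: _*)
}
```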
60 changes: 58 additions & 2 deletions mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -25,12 +25,14 @@ import scala.io.Source
import breeze.linalg.{squaredDistance => breezeSquaredDistance}
import com.google.common.io.Files

-import org.apache.spark.SparkException
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils._
import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.util.Utils

class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
@@ -245,4 +247,58 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(log1pExp(-13.8) ~== math.log1p(math.exp(-13.8)) absTol 1E-10)
assert(log1pExp(-238423789.865) ~== math.log1p(math.exp(-238423789.865)) absTol 1E-10)
}

test("convertVectorColumnsToML") {
val x = Vectors.sparse(2, Array(1), Array(1.0))
val metadata = new MetadataBuilder().putLong("numFeatures", 2L).build()
val y = Vectors.dense(2.0, 3.0)
val z = Vectors.dense(4.0)
val p = (5.0, z)
val w = Vectors.dense(6.0).asML
val df = spark.createDataFrame(Seq(
(0, x, y, p, w)
)).toDF("id", "x", "y", "p", "w")
.withColumn("x", col("x"), metadata)
val newDF1 = convertVectorColumnsToML(df)
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
val new1 = newDF1.first()
assert(new1 === Row(0, x.asML, y.asML, Row(5.0, z), w))
val new2 = convertVectorColumnsToML(df, "x", "y").first()
assert(new2 === new1)
val new3 = convertVectorColumnsToML(df, "y", "w").first()
assert(new3 === Row(0, x, y.asML, Row(5.0, z), w))
intercept[IllegalArgumentException] {
convertVectorColumnsToML(df, "p")
}
intercept[IllegalArgumentException] {
convertVectorColumnsToML(df, "p._2")
}
}

test("convertVectorColumnsFromML") {
val x = Vectors.sparse(2, Array(1), Array(1.0)).asML
val metadata = new MetadataBuilder().putLong("numFeatures", 2L).build()
val y = Vectors.dense(2.0, 3.0).asML
val z = Vectors.dense(4.0).asML
val p = (5.0, z)
val w = Vectors.dense(6.0)
val df = spark.createDataFrame(Seq(
(0, x, y, p, w)
)).toDF("id", "x", "y", "p", "w")
.withColumn("x", col("x"), metadata)
val newDF1 = convertVectorColumnsFromML(df)
assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.")
val new1 = newDF1.first()
assert(new1 === Row(0, Vectors.fromML(x), Vectors.fromML(y), Row(5.0, z), w))
val new2 = convertVectorColumnsFromML(df, "x", "y").first()
assert(new2 === new1)
val new3 = convertVectorColumnsFromML(df, "y", "w").first()
assert(new3 === Row(0, x, Vectors.fromML(y), Row(5.0, z), w))
intercept[IllegalArgumentException] {
convertVectorColumnsFromML(df, "p")
}
intercept[IllegalArgumentException] {
convertVectorColumnsFromML(df, "p._2")
}
}
}
