Refactor GLM algorithms and add Java tests
This change adds Java examples and unit tests for all GLM algorithms
to make sure the MLlib interface works from Java. Changes include:
- Introduce LabeledPoint and avoid using Doubles in train arguments
- Rename train to run in class methods
- Make the optimizer a member variable of GLM so that the builder
  pattern works (sketched below)
shivaram committed Aug 7, 2013
1 parent 7388e27 commit 7db69d5
Showing 22 changed files with 626 additions and 170 deletions.
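To illustrate the builder pattern called out in the commit message, here is a minimal Scala sketch. The data path, parameter values, and the MLUtils import path are placeholders/assumptions, not part of this commit:

    import spark.SparkContext
    import spark.mllib.util.MLUtils
    import spark.mllib.classification.LogisticRegressionWithSGD

    val sc = new SparkContext("local", "BuilderSketch")
    // After this change, loadLabeledData yields an RDD[LabeledPoint]
    val points = MLUtils.loadLabeledData(sc, "lr_data.txt").cache()

    // The optimizer is now a member of the algorithm object, so its
    // parameters can be chained before calling run (the renamed
    // instance-level train)
    val lr = new LogisticRegressionWithSGD()
    lr.optimizer
      .setNumIterations(200)
      .setStepSize(1.0)
      .setMiniBatchFraction(1.0)
    val model = lr.run(points)
    sc.stop()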
85 changes: 85 additions & 0 deletions examples/src/main/java/spark/mllib/examples/JavaLR.java
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package spark.mllib.examples;


import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.Function;

import spark.mllib.classification.LogisticRegressionWithSGD;
import spark.mllib.classification.LogisticRegressionModel;
import spark.mllib.regression.LabeledPoint;

import java.util.Arrays;
import java.util.StringTokenizer;

/**
* Logistic regression based classification using MLlib.
*/
public class JavaLR {

static class ParsePoint extends Function<String, LabeledPoint> {
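// Each input line is expected to be "<label>,<f1> <f2> ... <fn>":
// a numeric label, a comma, then space-separated feature values.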
public LabeledPoint call(String line) {
String[] parts = line.split(",");
double y = Double.parseDouble(parts[0]);
StringTokenizer tok = new StringTokenizer(parts[1], " ");
int numTokens = tok.countTokens();
double[] x = new double[numTokens];
for (int i = 0; i < numTokens; ++i) {
x[i] = Double.parseDouble(tok.nextToken());
}
return new LabeledPoint(y, x);
}
}

public static void printWeights(double[] a) {
System.out.println(Arrays.toString(a));
}

public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
System.exit(1);
}

JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);
int iterations = Integer.parseInt(args[3]);

// Another way to configure LogisticRegression
//
// LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
// lr.optimizer().setNumIterations(iterations)
// .setStepSize(stepSize)
// .setMiniBatchFraction(1.0);
// lr.setIntercept(true);
// LogisticRegressionModel model = lr.run(points.rdd());

LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
iterations, stepSize);

System.out.print("Final w: ");
printWeights(model.weights());

System.exit(0);
}
}
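For comparison, the equivalent flow from Scala after this change is roughly the following sketch (the path and parameter values are placeholders):

    import spark.SparkContext
    import spark.mllib.util.MLUtils
    import spark.mllib.classification.LogisticRegressionWithSGD

    val sc = new SparkContext("local", "ScalaLR")
    val points = MLUtils.loadLabeledData(sc, "lr_data.txt").cache()
    val model = LogisticRegressionWithSGD.train(points, 100, 1.0)
    println("Final w: " + model.weights.mkString(" "))
    sc.stop()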
mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
@@ -9,13 +9,13 @@ trait ClassificationModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
*/
-  def predict(testData: RDD[Array[Double]]): RDD[Int]
+  def predict(testData: RDD[Array[Double]]): RDD[Double]

/**
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return Int prediction from the trained model
*/
-  def predict(testData: Array[Double]): Int
+  def predict(testData: Array[Double]): Double
}
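With this signature change, classification predictions come back as Double (0.0/1.0 from logistic regression, the signum from SVM), matching the regression interface. A hypothetical fragment, assuming a trained model and features: RDD[Array[Double]]:

    val predictions: RDD[Double] = model.predict(features)
    val labels = predictions.map(_.toInt) // callers can still recover Int labels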
mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -33,13 +33,13 @@ import org.jblas.DoubleMatrix
class LogisticRegressionModel(
override val weights: Array[Double],
override val intercept: Double)
-  extends GeneralizedLinearModel[Int](weights, intercept)
+  extends GeneralizedLinearModel(weights, intercept)
with ClassificationModel with Serializable {

override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
intercept: Double) = {
val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
-    round(1.0/ (1.0 + math.exp(margin * -1))).toInt
+    round(1.0/ (1.0 + math.exp(margin * -1)))
}
}
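For reference, predictPoint above computes the sigmoid of the margin and rounds it to a 0/1 label; dropping .toInt keeps the result a Double:

    \hat{y} = \mathrm{round}\bigl(\sigma(w^\top x + b)\bigr),
    \qquad \sigma(z) = \frac{1}{1 + e^{-z}}

where w is the weight vector, x the data point, and b the intercept.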

@@ -49,12 +49,15 @@ class LogisticRegressionWithSGD (
var regParam: Double,
var miniBatchFraction: Double,
var addIntercept: Boolean)
-  extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel]
-  with GradientDescent with Serializable {
+  extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
+  with Serializable {

val gradient = new LogisticGradient()
val updater = new SimpleUpdater()

+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+    .setNumIterations(numIterations)
+    .setRegParam(regParam)
+    .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a LogisticRegression object with default parameters
*/
@@ -86,14 +89,14 @@ object LogisticRegressionWithSGD {
* the number of features in the data.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
input, initialWeights)
}

@@ -109,13 +112,13 @@ object LogisticRegressionWithSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double)
: LogisticRegressionModel =
{
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
input)
}

@@ -131,7 +134,7 @@ object LogisticRegressionWithSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double)
: LogisticRegressionModel =
@@ -149,23 +152,22 @@ object LogisticRegressionWithSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int)
: LogisticRegressionModel =
{
train(input, numIterations, 1.0, 1.0)
}

def main(args: Array[String]) {
-    if (args.length != 5) {
+    if (args.length != 4) {
println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
"<regularization_parameter> <niters>")
"<niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "LogisticRegression")
-    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = LogisticRegressionWithSGD.train(
-      data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val data = MLUtils.loadLabeledData(sc, args(1))
+    val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)

sc.stop()
}
25 changes: 14 additions & 11 deletions mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -31,12 +31,12 @@ import org.jblas.DoubleMatrix
class SVMModel(
override val weights: Array[Double],
override val intercept: Double)
-  extends GeneralizedLinearModel[Int](weights, intercept)
+  extends GeneralizedLinearModel(weights, intercept)
with ClassificationModel with Serializable {

override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
intercept: Double) = {
-    signum(dataMatrix.dot(weightMatrix) + intercept).toInt
+    signum(dataMatrix.dot(weightMatrix) + intercept)
}
}
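Likewise, the SVM prediction is the sign of the margin, now kept as a Double:

    \hat{y} = \operatorname{signum}(w^\top x + b)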

@@ -46,11 +46,14 @@ class SVMWithSGD private (
var regParam: Double,
var miniBatchFraction: Double,
var addIntercept: Boolean)
-  extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable {
+  extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {

val gradient = new HingeGradient()
val updater = new SquaredL2Updater()

+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+    .setNumIterations(numIterations)
+    .setRegParam(regParam)
+    .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a SVM object with default parameters
*/
@@ -81,15 +84,15 @@ object SVMWithSGD {
* the number of features in the data.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: SVMModel =
{
-    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
initialWeights)
}

@@ -105,14 +108,14 @@ object SVMWithSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double)
: SVMModel =
{
-    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}

/**
@@ -127,7 +130,7 @@ object SVMWithSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double)
@@ -146,7 +149,7 @@ object SVMWithSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
-      input: RDD[(Int, Array[Double])],
+      input: RDD[LabeledPoint],
numIterations: Int)
: SVMModel =
{
@@ -159,7 +162,7 @@ object SVMWithSGD {
System.exit(1)
}
val sc = new SparkContext(args(0), "SVM")
-    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
+    val data = MLUtils.loadLabeledData(sc, args(1))
val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)

sc.stop()
mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,15 +24,12 @@ import org.jblas.DoubleMatrix

import scala.collection.mutable.ArrayBuffer

-trait GradientDescent extends Optimizer {
+class GradientDescent(gradient: Gradient, updater: Updater) extends Optimizer {

-  val gradient: Gradient
-  val updater: Updater
-
-  var stepSize: Double
-  var numIterations: Int
-  var regParam: Double
-  var miniBatchFraction: Double
+  var stepSize: Double = 1.0
+  var numIterations: Int = 100
+  var regParam: Double = 0.0
+  var miniBatchFraction: Double = 1.0

/**
* Set the step size per-iteration of SGD. Default 1.0.
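With GradientDescent now a concrete class that takes its gradient and updater as constructor arguments, an optimizer can be assembled and tuned standalone. A sketch using the gradient/updater pair from the SVM code above (parameter values are arbitrary):

    import spark.mllib.optimization.{GradientDescent, HingeGradient, SquaredL2Updater}

    // Defaults per the declarations above: stepSize = 1.0, numIterations = 100,
    // regParam = 0.0, miniBatchFraction = 1.0. Each setter returns the
    // optimizer, so calls chain in builder style.
    val sgd = new GradientDescent(new HingeGradient(), new SquaredL2Updater())
      .setStepSize(0.1)
      .setNumIterations(200)
      .setRegParam(1.0)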