Skip to content

Commit

Permalink
Readme Updated and Spark Code Added
Browse files Browse the repository at this point in the history
  • Loading branch information
revantkumar committed Apr 24, 2015
1 parent 2d9f8b7 commit 41c3812
Show file tree
Hide file tree
Showing 14 changed files with 988 additions and 1 deletion.
File renamed without changes.
File renamed without changes.
File renamed without changes.
15 changes: 15 additions & 0 deletions SparkCode/SimpleApp/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name := "MRS"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.2.1" % "provided"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.2.1" % "provided"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"

resolvers += Resolver.sonatypeRepo("public")
Binary file added SparkCode/SimpleApp/hd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions SparkCode/SimpleApp/src/main/scala/5 14898471
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
5 14898471
10 20782149
20 32532654
50 69492644
60 79577044
100 133809487
53 changes: 53 additions & 0 deletions SparkCode/SimpleApp/src/main/scala/AbstractParams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



import scala.reflect.runtime.universe._

/**
* Abstract class for parameter case classes.
* This overrides the [[toString]] method to print all case class fields by name and value.
* @tparam T Concrete parameter class.
*/
abstract class AbstractParams[T: TypeTag] {

private def tag: TypeTag[T] = typeTag[T]

/**
* Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
* {
* [field name]:\t[field value]\n
* [field name]:\t[field value]\n
* ...
* }
*/
override def toString: String = {
val tpe = tag.tpe
val allAccessors = tpe.declarations.collect {
case m: MethodSymbol if m.isCaseAccessor => m
}
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(this)
allAccessors.map { f =>
val paramName = f.name.toString
val fieldMirror = instanceMirror.reflectField(f)
val paramValue = fieldMirror.get
s" $paramName:\t$paramValue"
}.mkString("{\n", ",\n", "\n}")
}
}
153 changes: 153 additions & 0 deletions SparkCode/SimpleApp/src/main/scala/Analytics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.graphx

import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.PartitionStrategy._

/**
* Driver program for running graph algorithms.
*/
object Analytics extends Logging {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println(
"Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
System.err.println("Supported 'taskType' as follows:")
System.err.println(" pagerank Compute PageRank")
System.err.println(" cc Compute the connected components of vertices")
System.err.println(" triangles Count the number of triangles")
System.exit(1)
}

val taskType = args(0)
val fname = args(1)
val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
val options = mutable.Map(optionsList: _*)

val conf = new SparkConf()
GraphXUtils.registerKryoClasses(conf)

val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
println("Set the number of edge partitions using --numEPart.")
sys.exit(1)
}
val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
.map(PartitionStrategy.fromString(_))
val edgeStorageLevel = options.remove("edgeStorageLevel")
.map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
val vertexStorageLevel = options.remove("vertexStorageLevel")
.map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)

taskType match {
case "pagerank" =>
val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
val outFname = options.remove("output").getOrElse("")
val numIterOpt = options.remove("numIter").map(_.toInt)

options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

println("======================================")
println("| PageRank |")
println("======================================")

val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)

val pr = (numIterOpt match {
case Some(numIter) => PageRank.run(graph, numIter)
case None => PageRank.runUntilConvergence(graph, tol)
}).vertices.cache()

println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))

if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}

sc.stop()

case "cc" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

println("======================================")
println("| Connected Components |")
println("======================================")

val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
sc.stop()

case "triangles" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

println("======================================")
println("| Triangle Count |")
println("======================================")

val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
val graph = GraphLoader.edgeListFile(sc, fname,
canonicalOrientation = true,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
// TriangleCount requires the graph to be partitioned
.partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
case (vid, data) => data.toLong
}.reduce(_ + _) / 3)
sc.stop()

case _ =>
println("Invalid task type.")
}
}
}
144 changes: 144 additions & 0 deletions SparkCode/SimpleApp/src/main/scala/LocalALS.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



import org.apache.commons.math3.linear._

/**
* Alternating least squares matrix factorization.
*
* This is an example implementation for learning how to use Spark. For more conventional use,
* please refer to org.apache.spark.mllib.recommendation.ALS
*/
object LocalALS {

// Parameters set through command line arguments
var M = 0 // Number of movies
var U = 0 // Number of users
var F = 0 // Number of features
var ITERATIONS = 0
val LAMBDA = 0.01 // Regularization coefficient

def generateR(): RealMatrix = {
val mh = randomMatrix(M, F)
val uh = randomMatrix(U, F)
mh.multiply(uh.transpose())
}

def rmse(targetR: RealMatrix, ms: Array[RealVector], us: Array[RealVector]): Double = {
val r = new Array2DRowRealMatrix(M, U)
for (i <- 0 until M; j <- 0 until U) {
r.setEntry(i, j, ms(i).dotProduct(us(j)))
}
val diffs = r.subtract(targetR)
var sumSqs = 0.0
for (i <- 0 until M; j <- 0 until U) {
val diff = diffs.getEntry(i, j)
sumSqs += diff * diff
}
math.sqrt(sumSqs / (M.toDouble * U.toDouble))
}

def updateMovie(i: Int, m: RealVector, us: Array[RealVector], R: RealMatrix) : RealVector = {
var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
var Xty: RealVector = new ArrayRealVector(F)
// For each user that rated the movie
for (j <- 0 until U) {
val u = us(j)
// Add u * u^t to XtX
XtX = XtX.add(u.outerProduct(u))
// Add u * rating to Xty
Xty = Xty.add(u.mapMultiply(R.getEntry(i, j)))
}
// Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
XtX.addToEntry(d, d, LAMBDA * U)
}
// Solve it with Cholesky
new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}

def updateUser(j: Int, u: RealVector, ms: Array[RealVector], R: RealMatrix) : RealVector = {
var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
var Xty: RealVector = new ArrayRealVector(F)
// For each movie that the user rated
for (i <- 0 until M) {
val m = ms(i)
// Add m * m^t to XtX
XtX = XtX.add(m.outerProduct(m))
// Add m * rating to Xty
Xty = Xty.add(m.mapMultiply(R.getEntry(i, j)))
}
// Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
XtX.addToEntry(d, d, LAMBDA * M)
}
// Solve it with Cholesky
new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}

def showWarning() {
System.err.println(
"""WARN: This is a naive implementation of ALS and is given as an example!
|Please use the ALS method found in org.apache.spark.mllib.recommendation
|for more conventional use.
""".stripMargin)
}

def main(args: Array[String]) {

args match {
case Array(m, u, f, iters) => {
M = m.toInt
U = u.toInt
F = f.toInt
ITERATIONS = iters.toInt
}
case _ => {
System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
System.exit(1)
}
}

showWarning()

println(s"Running with M=$M, U=$U, F=$F, iters=$ITERATIONS")

val R = generateR()

// Initialize m and u randomly
var ms = Array.fill(M)(randomVector(F))
var us = Array.fill(U)(randomVector(F))

// Iteratively update movies then users
for (iter <- 1 to ITERATIONS) {
println(s"Iteration $iter:")
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
println("RMSE = " + rmse(R, ms, us))
println()
}
}

private def randomVector(n: Int): RealVector =
new ArrayRealVector(Array.fill(n)(math.random))

private def randomMatrix(rows: Int, cols: Int): RealMatrix =
new Array2DRowRealMatrix(Array.fill(rows, cols)(math.random))

}
Loading

0 comments on commit 41c3812

Please sign in to comment.