Skip to content

Commit

Permalink
Simple state working.
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Feb 15, 2015
1 parent 9d08833 commit b01e288
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
25 changes: 21 additions & 4 deletions ephgraph/src/main/scala/Drv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.log4j.Logger
import org.apache.log4j.Level


object Drv {
def main(args : Array[String]) {
// Create SparkContext -- boilerplate
val conf = new SparkConf().setAppName("graphMod")
val conf = new SparkConf().setAppName("Drv")
val sc = new SparkContext(conf)

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
/* Read graph data from input file, and create raw RDD.
* Input graph format: one edge per line, where each line is a
* string "srcId dstId weight".
* 'file' is set to a MappedRDD object.
*/
val file = sc.textFile("/user/tree1024.gr")
val file = sc.textFile("/user/akshay/tree1024.gr")


// Map 'file' to EdgeRDD, by parsing records appropriately.
Expand All @@ -28,9 +33,21 @@ object Drv {
var gr = Graph.fromEdges(edrdd, 0)

val gmod = new ephGraph()
val grInit = gr.mapVertices{ (vid,vattr) => gmod.Memo(Double.MaxValue,Map[VertexId,Double]())}

/// pregel ///
// val initMsg = Map[VertexId, Double]()
val grInit = gr.mapVertices{ (vid,vattr) =>
val newDist = if (vid == 0) 0 else Double.MaxValue
gmod.Memo(newDist,Map[VertexId,Double]())}
//var g = grInit.mapVertices( (vid,vattr) => gmod.vertexProgram(vid,vattr,initMsg))
// var msg = g.mapReduceTriplets(gmod.sendMessage, gmod.mergeMsgs)
//g.vertices.saveAsTextFile("/user/akshay/delta/v")
//g.edges.saveAsTextFile("/user/akshay/delta/e")
//msg.saveAsTextFile("/user/akshay/delta/m")

//val grInit = gr.mapVertices{ (vid,vattr) => gmod.Memo(Double.MaxValue,Map[VertexId,Double]())}
val grSD = gmod.run(grInit)
gmod.saveToText("/user/delta/", grSD)
gmod.saveToText("/user/akshay/delta/d", grSD)

//grSD.vertices.map{ case (vid,vattr) => (vid,vattr.distSoFar)}.saveAsTextFile("/user/akshay/delta/r")

Expand Down
16 changes: 10 additions & 6 deletions ephgraph/src/main/scala/ephGraph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ephGraph extends java.io.Serializable {
* Takes two MsgDigests, and merges them. Note that the '++' operator adds new keys if they
* don't exist, or updates values if keys exist.
*/
private def mergeMsgs(memoMsg : MsgDigest, newMsg : MsgDigest) : MsgDigest =
def mergeMsgs(memoMsg : MsgDigest, newMsg : MsgDigest) : MsgDigest =
memoMsg ++ newMsg

private def computeState(msgs : MsgDigest) : Double =
Expand All @@ -115,7 +115,7 @@ class ephGraph extends java.io.Serializable {
*/
def vertexProgram(id : VertexId, memo : Memo, messages : MsgDigest) : Memo = {
val newMsgs = mergeMsgs(memo.msgs, messages)
val newDist = computeState(newMsgs)
val newDist = if (newMsgs.isEmpty) memo.dist else computeState(newMsgs)
return Memo(newDist, newMsgs)
}

Expand All @@ -127,11 +127,15 @@ class ephGraph extends java.io.Serializable {
* else, not.
*/
def sendMessage(edge: EdgeTriplet[Memo,Double]) : Iterator[(VertexId, MsgDigest)] = {
val potentialMsg = edge.srcAttr.dist + edge.attr
if (edge.dstAttr.msgs.contains(edge.srcId) && (edge.dstAttr.msgs(edge.srcId) == potentialMsg))
if (edge.srcAttr.dist == Double.MaxValue)
return Iterator.empty
else
return Iterator((edge.dstId, Map(edge.srcId -> potentialMsg)))
else {
val potentialMsg = edge.srcAttr.dist + edge.attr
if (edge.dstAttr.msgs.contains(edge.srcId) && (edge.dstAttr.msgs(edge.srcId) == potentialMsg))
return Iterator.empty
else
return Iterator((edge.dstId, Map(edge.srcId -> potentialMsg)))
}
}


Expand Down

0 comments on commit b01e288

Please sign in to comment.