Batch union rdd.
root committed Feb 23, 2015
1 parent e3624ef commit 96305b0
Showing 1 changed file with 50 additions and 12 deletions.
62 changes: 50 additions & 12 deletions ephgraph/src/main/scala/ephGraph.scala
@@ -38,7 +38,6 @@ class ephGraph extends java.io.Serializable {
   * post-init graph.
   * *return* : new graph with updated edge weight.
   */
-
  def updateEdge(edge:String, gr:Graph[Memo,Double]) : Graph[Memo,Double] = {
    // parse edge into edge components
    val comps = edge.split(" ")
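
The rest of updateEdge is collapsed in this view. As a minimal sketch only (assuming the Graph[Memo,Double] shape used throughout this file; this is not the committed body, and updateEdgeSketch is a hypothetical name), the parsed "src dst wt" triple could be applied with GraphX's mapEdges:

    // Hypothetical sketch, not the committed code: rewrite the weight of the
    // matching edge and leave every other edge attribute untouched.
    def updateEdgeSketch(edge: String, gr: Graph[Memo, Double]): Graph[Memo, Double] = {
      val comps = edge.split(" ")
      val (src, dst, wt) = (comps(0).toLong, comps(1).toLong, comps(2).toDouble)
      gr.mapEdges(e => if (e.srcId == src && e.dstId == dst) wt else e.attr)
    }
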
@@ -59,29 +58,35 @@ class ephGraph extends java.io.Serializable {
   * gr : Post-first run graph
   * *return* : Updated graph
   */
-
  def updateEdgeBatch(edges:RDD[String], gr:Graph[Memo,Double]) : Graph[Memo,Double] = {
    var g = gr
    // This is wasteful, figure out correct partitioning strategy.
    val edArray = edges.collect()
    // TODO: Get rid of 'for', probably a bad way of doing things.
    for (line <- edArray)
      g = updateEdge(line,g)
    return g
  }

  /*
   * Create edge -- to be used when the edge does not exist.
-  * edge : String -- src,dst,wt
-  * gr : Post-first run graph, type Memo,Double
+  * fileName : String, file-path on hdfs that contains a list of updates.
+  * sc : SparkContext
+  * gr : Post-first run graph, type [Memo,Double]
+  * *return* : New [Memo,Double] graph with updated edge applied.
   */
- def ceateEdge(edge:String, gr:Graph[Memo,Double]) : Graph[Memo,Double] = {
-   val src = edge.split(" ")(0).toLong
-   val dst = edge.split(" ")(1).toLong
-   val wt = edge.split(" ")(2).toDouble
-   val newEd = sc.parallelize(Array(Edge(src,dst,wt)))
+ def ceateEdges(fileName:String, sc : SparkContext, gr:Graph[Memo,Double]) : Graph[Memo,Double] = {
+   // Read hdfs file to RDD[String]
+   val edString = sc.textFile(fileName)
+
+   // convert to RDD[Edge[Double]]
+   val edRdd = mapStringToEdge(edString)
+
+   // Define default vertex property, in case of new vertices.
    val defaultVertexProp = Memo(Double.MaxValue, Map[VertexId,Double]())
-   return Graph(gr.vertices, gr.edges ++ newEd, defaultVertexProp)
+
+   // Create new graph.
+   return Graph(gr.vertices, gr.edges ++ edRdd, defaultVertexProp)
  }

  /****
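
The TODO above flags the var-plus-for pattern in updateEdgeBatch. A minimal sketch of one alternative, folding updateEdge over the collected batch (this still collects to the driver, as the committed code does; updateEdgeBatchFold is a hypothetical name):

    // Sketch: thread the graph through the batch with foldLeft
    // instead of mutating a var inside a for loop.
    def updateEdgeBatchFold(edges: RDD[String], gr: Graph[Memo, Double]): Graph[Memo, Double] =
      edges.collect().foldLeft(gr)((g, line) => updateEdge(line, g))
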
@@ -153,11 +158,44 @@ class ephGraph extends java.io.Serializable {
    }
  }

+ /* Convert a string-of-edges-RDD to an RDD-of-edges.
+    edString : Edges in string form ("src dst wt"), of type RDD[String]
+    *return* : Above mapped to RDD[Edge[Double]]
+ */
+ def mapStringToEdge(edString : RDD[String]) : RDD[Edge[Double]] = {
+   return edString.map{ ed =>
+     val src = ed.split(" ")(0).toLong
+     val dst = ed.split(" ")(1).toLong
+     val wt = ed.split(" ")(2).toDouble
+     new Edge(src,dst,wt) }
+ }
+
+ /* Read graph off a text file on hdfs.
+  * fileName : String, path of input file on hdfs. Input file
+  *            contains one edge per line, where an edge is a string
+  *            "src dst wt".
+  * sc : Spark context for this app.
+  * *return* : Graph of type [Memo, Double]
+  */
+ def readGraphFromFile(fileName : String, sc : SparkContext) : Graph[Memo,Double] = {
+   // Read file. edString is of type RDD[String]
+   val edString = sc.textFile(fileName)
+
+   // Convert to RDD[Edge[Double]]
+   val edRdd = mapStringToEdge(edString)
+
+   // Create and return graph from RDD[Edge[Double]]
+   return initAttr(Graph.fromEdges(edRdd,0))
+ }
+

  /* Initialize vertices and convert Graph[Int,Double] to Graph[Memo,Double]
   */
- def initAttr(gr : Graph[Int,Double]) : Graph[Memo,Double] =
-   return gr.mapVertices( (vid,attr) => Memo(Double.MaxValue, Map[VertexId, Double]()) )
+ def initAttr(gr : Graph[Int,Double]) : Graph[Memo,Double] = {
+   return gr.mapVertices{ (vid,vattr) =>
+     val newDist = if (vid == 0) 0 else Double.MaxValue
+     gmod.Memo(newDist,Map[VertexId,Double]())}
+ }

  def run(gr : Graph[Memo,Double]) : Graph[Memo,Double] = {
    val initMsg = Map[VertexId, Double]()
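
The revised initAttr seeds vertex 0 with distance 0 and every other vertex with Double.MaxValue, the usual single-source shortest-path initialization before a Pregel-style run. A minimal sketch of a parameterized variant, assuming the same Memo(dist, map) shape (initAttrFrom is a hypothetical name):

    // Sketch: same initialization, with the source vertex id as a
    // parameter instead of the hard-coded 0.
    def initAttrFrom(gr: Graph[Int, Double], srcId: VertexId): Graph[Memo, Double] =
      gr.mapVertices { (vid, _) =>
        Memo(if (vid == srcId) 0.0 else Double.MaxValue, Map[VertexId, Double]())
      }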
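Taken together, the new entry points let a driver load a graph from HDFS, union in a batch of edges, and rerun the computation. A hypothetical usage sketch (the paths and app name are illustrative, not from this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc  = new SparkContext(new SparkConf().setAppName("ephGraphDemo"))
    val eph = new ephGraph()
    val g0  = eph.readGraphFromFile("hdfs:///data/edges.txt", sc)   // initial graph
    val g1  = eph.ceateEdges("hdfs:///data/updates.txt", sc, g0)    // batch union of new edges
    val res = eph.run(g1)                                           // recompute on the updated graph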

0 comments on commit 96305b0
