
Commit 72ecfd0

MrBago authored and jkbradley committed
[SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds
## What changes were proposed in this pull request?

`runParallelPersonalizedPageRank` in GraphX checks that `sources` are <= `Int.MaxValue.toLong`, but this is not actually required. This check seems to have been added because the implementation uses sparse vectors, and sparse vectors cannot be indexed by values > `Int.MaxValue`. However, we never index the sparse vector by the source vertexIds, so this isn't an issue. I've added a test with large vertexIds to confirm this works as expected.

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#22139 from MrBago/remove-veretexId-check-pppr.

Authored-by: Bago Amirbekian <[email protected]>
Signed-off-by: Joseph K. Bradley <[email protected]>
1 parent 99d2e4e commit 72ecfd0
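
The argument above hinges on one detail: each source's activation lives in a sparse vector of length `sources.size`, indexed by the source's *position* in the `sources` array (an `Int`), never by the vertex id itself, so 64-bit vertex ids are harmless. A minimal standalone sketch of that indexing scheme, assuming only the public `org.apache.spark.ml.linalg.Vectors` API (the patched code additionally converts to Breeze via the internal `asBreeze` helper; the local `VertexId` alias stands in for `org.apache.spark.graphx.VertexId`):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

object PositionIndexedSources {
  type VertexId = Long // matches org.apache.spark.graphx.VertexId

  def main(args: Array[String]): Unit = {
    // Vertex ids well beyond Int.MaxValue: safe, because they are only
    // ever used as map keys, never as sparse-vector indices.
    val sources: Array[VertexId] = Array(Int.MaxValue.toLong + 1, Int.MaxValue.toLong + 5)

    // One-hot vector per source: the 1.0 sits at the source's position in
    // the array (0, 1, ...), which is always a valid Int index.
    val sourcesInitMap: Map[VertexId, Vector] = sources.zipWithIndex.map { case (vid, i) =>
      vid -> Vectors.sparse(sources.length, Array(i), Array(1.0))
    }.toMap

    sourcesInitMap.foreach { case (vid, v) => println(s"$vid -> $v") }
    // prints e.g. 2147483648 -> (2,[0],[1.0])
  }
}
```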

File tree

2 files changed: +35 -25 lines changed


graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala (+10 -18)
```diff
@@ -184,25 +184,23 @@ object PageRank extends Logging {
    * indexed by the position of nodes in the sources list) and
    * edge attributes the normalized edge weight
    */
-  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
-    numIter: Int, resetProb: Double = 0.15,
-    sources: Array[VertexId]): Graph[Vector, Double] = {
+  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED],
+      numIter: Int,
+      resetProb: Double = 0.15,
+      sources: Array[VertexId]): Graph[Vector, Double] = {
     require(numIter > 0, s"Number of iterations must be greater than 0," +
       s" but got ${numIter}")
     require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
       s" to [0, 1], but got ${resetProb}")
     require(sources.nonEmpty, s"The list of sources must be non-empty," +
       s" but got ${sources.mkString("[", ",", "]")}")
 
-    // TODO if one sources vertex id is outside of the int range
-    // we won't be able to store its activations in a sparse vector
-    require(sources.max <= Int.MaxValue.toLong,
-      s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}")
     val zero = Vectors.sparse(sources.size, List()).asBreeze
-    val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-      val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
-      (vid, v)
-    }.toMap
+    // map of vid -> vector where for each vid, the _position of vid in source_ is set to 1.0
+    val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i =>
+      Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
+    }
     val sc = graph.vertices.sparkContext
     val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
     // Initialize the PageRank graph with each edge attribute having
@@ -212,13 +210,7 @@ object PageRank extends Logging {
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
       // Set the weight on the edges based on the degree
       .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
-      .mapVertices { (vid, attr) =>
-        if (sourcesInitMapBC.value contains vid) {
-          sourcesInitMapBC.value(vid)
-        } else {
-          zero
-        }
-      }
+      .mapVertices((vid, _) => sourcesInitMapBC.value.getOrElse(vid, zero))
 
     var i = 0
     while (i < numIter) {
```
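
With the range check gone, the method can be driven directly with 64-bit source ids. A hypothetical driver sketch, assuming a local `SparkContext` and the usual `spark-graphx` dependency; the object name, the local master string, and the toy chain are illustrative only, not part of the patch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.lib.PageRank

object LargeIdPppr {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "LargeIdPppr")
    val offset = Int.MaxValue.toLong + 1
    // A 4-vertex chain whose ids all exceed Int.MaxValue.
    val edges = sc.parallelize((0L until 3L).map(i => (offset + i, offset + i + 1)))
    val graph = Graph.fromEdgeTuples(edges, 1.0)

    // Before this patch, the (now removed) require rejected these ids with
    // "only works for source vertex ids at most 2147483647".
    val ranks = PageRank.runParallelPersonalizedPageRank(
      graph, numIter = 10, resetProb = 0.15, sources = Array(offset))
    ranks.vertices.collect().foreach(println)
    sc.stop()
  }
}
```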

graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala (+25 -7)
```diff
@@ -203,24 +203,42 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Chain PersonalizedPageRank") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x + 1) )
+      // Check that implementation can handle large vertexIds, SPARK-25149
+      val vertexIdOffset = Int.MaxValue.toLong + 1
+      val sourceOffest = 4
+      val source = vertexIdOffset + sourceOffest
+      val numIter = 10
+      val vertices = vertexIdOffset until vertexIdOffset + numIter
+      val chain1 = vertices.zip(vertices.tail)
       val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
       val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
       val resetProb = 0.15
       val tol = 0.0001
-      val numIter = 10
       val errorTol = 1.0e-1
 
-      val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
-      val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
+      val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffest))
+      // We expect the rank to decay as (1 - resetProb) ^ distance
+      val expectedRanks = sc.parallelize(vertices).map { vid =>
+        val rank = if (vid < source) {
+          0.0
+        } else {
+          a * Math.pow(1 - resetProb, vid - source)
+        }
+        vid -> rank
+      }
+      val expected = VertexRDD(expectedRanks)
+
+      val staticRanks = chain.staticPersonalizedPageRank(source, numIter, resetProb).vertices
+      assert(compareRanks(staticRanks, expected) < errorTol)
 
-      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+      val dynamicRanks = chain.personalizedPageRank(source, tol, resetProb).vertices
+      assert(compareRanks(dynamicRanks, expected) < errorTol)
 
       val parallelStaticRanks = chain
-        .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
+        .staticParallelPersonalizedPageRank(Array(source), numIter, resetProb).mapVertices {
           case (vertexId, vector) => vector(0)
         }.vertices.cache()
-      assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
+      assert(compareRanks(parallelStaticRanks, expected) < errorTol)
     }
   }
```
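
The expected ranks in the new test follow from the comment in the diff: on a directed chain, rank mass can only flow away from the source, decaying by a factor of (1 - resetProb) per hop. A plausible reading of the normalizing constant `a`, with α = `resetProb` and D = `numIter - sourceOffest` (the number of vertices from the source to the end of the chain, here 6), is that it makes the reachable ranks sum to 1:

```latex
% r(d) = a (1 - \alpha)^d at distance d >= 0 downstream of the source,
% and r(d) = 0 upstream. Choosing a so the D reachable ranks sum to 1:
\sum_{d=0}^{D-1} a (1-\alpha)^d
  = a \cdot \frac{1 - (1-\alpha)^{D}}{\alpha} = 1
\quad\Longrightarrow\quad
a = \frac{\alpha}{1 - (1-\alpha)^{D}}
```

This matches `val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffest))` in the test above.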
