Skip to content

Commit

Permalink
Improve performance of coGroups.
Browse files Browse the repository at this point in the history
Rework the implementation of coGroup to improve its performance.
In addition, change the API: keep only the 'coGroup' that takes two
DLists, and include the key in the resulting DList.
  • Loading branch information
espringe authored and blever committed Mar 13, 2012
1 parent c8c19d2 commit 7f00ce5
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 392 deletions.
1 change: 0 additions & 1 deletion examples/averageAge/project/project/build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ object Plugins extends Build {
uri("git://github.com/NICTA/sbt-scoobi.git#master")
)
}

2 changes: 1 addition & 1 deletion src/main/scala/com/nicta/scoobi/WireFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ object WireFormat {
}
}

implicit def LeftFmt[T1, T2](implicit wt1: WireFormat[T1]) = new WireFormat[Left[T1, T2]] {
implicit def LeftFmt[T1, T2](implicit wt1: WireFormat[T1]) = new WireFormat[Left[T1, T2]] {
def toWire(x: Left[T1, T2], out: DataOutput) = wt1.toWire(x.a, out)
def fromWire(in: DataInput): Left[T1, T2] = Left[T1, T2](wt1.fromWire(in))
def show(x: Left[T1, T2]) = x.toString
Expand Down
313 changes: 0 additions & 313 deletions src/main/scala/com/nicta/scoobi/lib/CoGroup.scala

This file was deleted.

37 changes: 33 additions & 4 deletions src/main/scala/com/nicta/scoobi/lib/Join.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import com.nicta.scoobi.WireFormat._
import com.nicta.scoobi.Grouping
import com.nicta.scoobi.DoFn
import com.nicta.scoobi.Emitter
import scala.collection.mutable.MutableList
import scala.collection.mutable.ArrayBuffer

object Join {

private def innerJoin[T, A, B] = new DoFn[((T, Boolean), Iterable[Either[A, B]]), (T, (A, B))] {
override def setup() {}
override def process(input: ((T, Boolean), Iterable[Either[A, B]]), emitter: Emitter[(T, (A, B))]) {
var alist: MutableList[A] = new MutableList()
var alist = new ArrayBuffer[A]

for (v <- input._2) {
v match {
Expand All @@ -46,7 +46,7 @@ object Join {
private def rightOuterJoin[T, A, B, A2](has: (T, A, B) => A2, notHas: (T, B) => A2) = new DoFn[((T, Boolean), Iterable[Either[A, B]]), (T, (A2, B))] {
override def setup() {}
override def process(input: ((T, Boolean), Iterable[Either[A, B]]), emitter: Emitter[(T, (A2, B))]) {
var alist: MutableList[A] = new MutableList()
var alist = new ArrayBuffer[A]

for (v <- input._2) {
v match {
Expand Down Expand Up @@ -89,7 +89,13 @@ object Join {

/**
 * Order two tagged keys for sorting.
 *
 * The keys are first compared with `groupCompare`; only when that reports them as equal
 * does the Boolean tag decide the order.
 *
 * @param a first (key, tag) pair
 * @param b second (key, tag) pair
 * @return the non-zero result of `groupCompare` if there is one; otherwise -1 when `a` is
 *         tagged `true` and `b` is tagged `false`, 1 for the reverse, and 0 when the tags match
 */
override def sortCompare(a: (K, Boolean), b: (K, Boolean)): Int = {
  val n = groupCompare(a, b)
  if (n != 0)
    n
  else (a._2, b._2) match {
    // Spelled out explicitly: within a group, `true` sorts before `false`
    // (the standard Ordering[Boolean] would order them the other way round).
    case (true, false) => -1
    case (false, true) => 1
    case _ => 0
  }
}
}

Expand Down Expand Up @@ -135,4 +141,27 @@ object Join {
B : Manifest : WireFormat]
(d1: DList[(K, A)], d2: DList[(K, B)])
: DList[(K, (A, Option[B]))] = joinRight(d2, d1).map(v => (v._1, v._2.swap))


/**
 * Co-group two (2) distributed lists on their common key.
 *
 * Values from `d1` are tagged as `Left` and values from `d2` as `Right`; the tagged lists
 * are concatenated and grouped by key, and each group is then split back into its `A`
 * values and its `B` values.
 *
 * @param d1 keyed list supplying the `A` values
 * @param d2 keyed list supplying the `B` values
 * @return for each key of the grouped list, the key paired with the `A` values and the
 *         `B` values collected for it
 */
def coGroup[K : Manifest : WireFormat : Grouping,
            A : Manifest : WireFormat,
            B : Manifest : WireFormat]
    (d1: DList[(K, A)], d2: DList[(K, B)])
  : DList[(K, (Iterable[A], Iterable[B]))] = {
  val lefts: DList[(K, Either[A, B])] = d1.map(kv => (kv._1, Left(kv._2)))
  val rights: DList[(K, Either[A, B])] = d2.map(kv => (kv._1, Right(kv._2)))

  val grouped = (lefts ++ rights).groupByKey

  grouped map { group =>
    val fromFirst = new VectorBuilder[A]()
    val fromSecond = new VectorBuilder[B]()

    // Walk the grouped values exactly once, routing each one by its tag.
    for (tagged <- group._2) tagged match {
      case Left(a) => fromFirst += a
      case Right(b) => fromSecond += b
    }

    (group._1, (fromFirst.result().toIterable, fromSecond.result().toIterable))
  }
}
}
Loading

0 comments on commit 7f00ce5

Please sign in to comment.