add internals
JerryLead committed Aug 20, 2014
1 parent a65717a commit b115086
Showing 33 changed files with 1,001 additions and 38 deletions.
Binary file added .cache
Binary file not shown.
8 changes: 8 additions & 0 deletions .classpath
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.scala-ide.sdt.launching.SCALA_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
<classpathentry kind="con" path="org.eclipse.jdt.USER_LIBRARY/SparkLib-1.0.1"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
/bin
18 changes: 18 additions & 0 deletions .project
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>SparkLearning</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.scala-ide.sdt.core.scalabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.scala-ide.sdt.core.scalanature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
11 changes: 11 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,11 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.7
5 changes: 5 additions & 0 deletions .settings/org.scala-ide.sdt.core.prefs
@@ -0,0 +1,5 @@
eclipse.preferences.version=1
organizeimports.expandcollapse=expand
organizeimports.groups=java$scala$org$com
organizeimports.scalapackage=false
organizeimports.wildcards=scalaz$scalaz.Scalaz
Binary file added src/.DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions src/api/examples/Cartesian.scala
@@ -9,6 +9,7 @@ object Cartesian {
val x = sc.parallelize(List(1, 2, 3, 4, 5))
val y = sc.parallelize(List(6, 7, 8, 9, 10))

println(x ++ y ++ x)
val result = x.cartesian(y)
//result.collect
result.foreach(println)
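A note on the added println(x ++ y ++ x) line: ++ is an alias for union, which concatenates the inputs' partitions without a shuffle, and calling println on an RDD prints its toString (RDD type and id), not its elements. A minimal sketch, assuming the same x and y as above:

val u = x ++ y ++ x                 // equivalent to x.union(y).union(x)
println(u.partitions.length)        // sum of the three inputs' partition counts
u.collect().foreach(println)        // 1..5, then 6..10, then 1..5 again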
26 changes: 19 additions & 7 deletions src/api/examples/Cogroup.scala
@@ -7,28 +7,40 @@ object Cogroup {
def main(args: Array[String]) {
val sc = new SparkContext("local", "Cogroup Test")

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
b.foreach(println)
val a = sc.parallelize(List(1, 2, 1, 3), 2)
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
val d = a.map((_, "b"))
//b.foreach(println)
// output:
// (1,b)
// (2,b)
// (1,b)
// (3,b)
val c = a.map((_, "c"))
c.foreach(println)
val e = b.map((_, "c"))
//c.foreach(println)
// output:
// (1,c)
// (2,c)
// (1,c)
// (3,c)

val result = b.cogroup(c)
//val result = b.cogroup(c)
val result = d.cogroup(e, 4)
result.foreach(println)

println(result.toDebugString)
// output:
// (1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
// (3,(ArrayBuffer(b),ArrayBuffer(c)))
// (2,(ArrayBuffer(b),ArrayBuffer(c)))

/*
* MappedValuesRDD[5] at cogroup at Cogroup.scala:28 (3 partitions)
* CoGroupedRDD[4] at cogroup at Cogroup.scala:28 (3 partitions)
* MappedRDD[2] at map at Cogroup.scala:12 (2 partitions)
* ParallelCollectionRDD[0] at parallelize at Cogroup.scala:10 (2 partitions)
* MappedRDD[3] at map at Cogroup.scala:19 (3 partitions)
* ParallelCollectionRDD[1] at parallelize at Cogroup.scala:11 (3 partitions)
*
*/
}
}
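For reference, cogroup groups the values from both RDDs by key, one collection per input RDD, and the second argument fixes the number of output partitions; the MappedValuesRDD over CoGroupedRDD in the lineage comment is cogroup converting the raw co-grouped buffers into that per-RDD tuple. A minimal sketch of the result shape, assuming the Spark 1.0.x API:

// d: RDD[(Int, String)] built from a = {1, 2, 1, 3}; e: RDD[(Int, String)] from b = {1..6}
val grouped = d.cogroup(e, 4)   // RDD[(Int, (Iterable[String], Iterable[String]))], 4 partitions
// key 1 maps to (ArrayBuffer(b, b), ArrayBuffer(c)), since 1 appears twice in a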
12 changes: 6 additions & 6 deletions src/api/examples/GroupByKeyPair.scala
@@ -14,17 +14,17 @@ object GroupByKeyPair {
val pairs = d.keyBy(x => x % 10)

val result1 = pairs.groupByKey()
val result2 = pairs.groupByKey(3)
val result3 = pairs.groupByKey(new RangePartitioner(3, pairs))
//val result2 = pairs.groupByKey(3)
//val result3 = pairs.groupByKey(new RangePartitioner(3, pairs))

println("Result 1:")
result1.foreach(println)

println("Result 2:")
result2.foreach(println)
//println("Result 2:")
//result2.foreach(println)

println("Result 3:")
result3.foreach(println)
//println("Result 3:")
//result3.foreach(println)

}
}
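For reference, the three groupByKey overloads exercised in this file differ only in how the output partitioner is chosen; a sketch, assuming the standard Spark 1.0.x API:

pairs.groupByKey()                                // defaultPartitioner(pairs), a HashPartitioner
pairs.groupByKey(3)                               // shorthand for new HashPartitioner(3)
pairs.groupByKey(new RangePartitioner(3, pairs))  // keys split into 3 sorted ranges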
24 changes: 24 additions & 0 deletions src/api/examples/IntersectionTest.scala
@@ -0,0 +1,24 @@
package api.examples

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.RangePartitioner

object IntersectionTest {

def main(args: Array[String]) {

val sc = new SparkContext("local", "Intersection Test")
val a = sc.parallelize(List(1, 2, 3, 3, 4, 5), 3)
val b = sc.parallelize(List(1, 2, 5, 6), 2)
//val c = sc.parallelize(List(1, 2, 3), 1)

val r = a.intersection(b)
//r.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

println(r.toDebugString)
// [PartitionIndex 1] 1
// [PartitionIndex 2] 5
// [PartitionIndex 2] 2
}
}
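Note that intersection returns each element at most once, even when it is duplicated in an input; the distinct common elements of the two lists are 1, 2 and 5, matching the per-partition output recorded above. A minimal check, under the same setup:

// a = {1, 2, 3, 3, 4, 5}, b = {1, 2, 5, 6}
assert(a.intersection(b).collect().toSet == Set(1, 2, 5))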
55 changes: 55 additions & 0 deletions src/internals/IntersectionTest.scala
@@ -0,0 +1,55 @@
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.RangePartitioner

object IntersectionTest {

def main(args: Array[String]) {

val sc = new SparkContext("local", "Intersection Test")
val a = sc.parallelize(List(1, 2, 3, 3, 4, 5), 3)
val b = sc.parallelize(List(1, 2, 5, 6), 2)


val r = a.intersection(b)

a.foreachWith(i => i)((x, i) => println("[aIndex " + i + "] " + x))
b.foreachWith(i => i)((x, i) => println("[bIndex " + i + "] " + x))
r.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

println(r.toDebugString)

/*
[aIndex 0] 1
[aIndex 0] 2
[aIndex 1] 3
[aIndex 1] 3
[aIndex 2] 4
[aIndex 2] 5
[bIndex 0] 1
[bIndex 0] 2
[bIndex 1] 5
[bIndex 1] 6
[PartitionIndex 1] 1
[PartitionIndex 2] 5
[PartitionIndex 2] 2
MappedRDD[7] at intersection at IntersectionTest.scala:16 (3 partitions)
FilteredRDD[6] at intersection at IntersectionTest.scala:16 (3 partitions)
MappedValuesRDD[5] at intersection at IntersectionTest.scala:16 (3 partitions)
CoGroupedRDD[4] at intersection at IntersectionTest.scala:16 (3 partitions)
MappedRDD[2] at intersection at IntersectionTest.scala:16 (3 partitions)
ParallelCollectionRDD[0] at parallelize at IntersectionTest.scala:12 (3 partitions)
MappedRDD[3] at intersection at IntersectionTest.scala:16 (2 partitions)
ParallelCollectionRDD[1] at parallelize at IntersectionTest.scala:13 (2 partitions)
*/
}
}
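The lineage recorded above shows how intersection is built from other operators: each side is mapped to (element, null) pairs, the pairs are cogrouped, keys present on both sides are kept, and the keys are extracted (the MappedValuesRDD step is internal to cogroup). A sketch of that plan, assuming the Spark 1.0.x RDD API:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def intersectionSketch[T: ClassTag](left: RDD[T], right: RDD[T]): RDD[T] =
  left.map(v => (v, null))                                       // MappedRDD[2]
    .cogroup(right.map(v => (v, null)))                          // CoGroupedRDD[4] (+ MappedRDD[3])
    .filter { case (_, (ls, rs)) => ls.nonEmpty && rs.nonEmpty } // FilteredRDD[6]
    .keys                                                        // MappedRDD[7]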
24 changes: 24 additions & 0 deletions src/internals/RepartitionTest2.scala
@@ -0,0 +1,24 @@
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner

object RepartitionTest2 {
def main(args: Array[String]) {

val sc = new SparkContext("local", "repartition Test")
val data = Array[(Int, Char)]((3, 'a'), (2, 'b'),
(1, 'c'), (4, 'd'))
val pairs1 = sc.parallelize(data, 3).partitionBy(new HashPartitioner(2))

pairs1.foreachWith(i => i)((x, i) => println("[pairs1-Index " + i + "] " + x))
}
}
/*
[pairs1-Index 0] (3,a)
[pairs1-Index 0] (2,b)
[pairs1-Index 0] (1,c)
[pairs1-Index 1] (4,d)
*/
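For reference, partitionBy shuffles the pairs into the partitions chosen by the partitioner; a minimal sketch of the standard HashPartitioner assignment rule:

// HashPartitioner(n): partition = key.hashCode modulo n, forced non-negative
def hashPartitionSketch(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}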
38 changes: 38 additions & 0 deletions src/internals/broadcastTest.scala
@@ -0,0 +1,38 @@
package internals

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object broadcast {
def main(args: Array[String]) {

val bcName = "Http"
val blockSize = "4096"

System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
"BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
val sparkConf = new SparkConf().setAppName("Broadcast Test").setMaster("local")

val sc = new SparkContext(sparkConf)

val slices = 2
val num = 100

val arr1 = new Array[Int](num)

for (i <- 0 until arr1.length) {
arr1(i) = i
}

val data = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)

val barr1 = sc.broadcast(arr1)
val observedSizes = sc.parallelize(1 to 4, slices).map(_ => barr1.value.size)
// Collect the small RDD so we can print the observed sizes locally.
observedSizes.collect().foreach(i => println(i))

//println(barr1.value.size)
//barr1.value.collect
}
}
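The two system properties above select the broadcast implementation and its block size before the SparkContext is created; in Spark 1.0.x the built-in factories are HttpBroadcastFactory and TorrentBroadcastFactory, so switching is just a matter of the bcName prefix. A sketch of the alternative configuration:

// Hypothetical alternative run: BitTorrent-style broadcast instead of HTTP
System.setProperty("spark.broadcast.factory",
  "org.apache.spark.broadcast.TorrentBroadcastFactory")
System.setProperty("spark.broadcast.blockSize", "4096") // block size in KB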
55 changes: 55 additions & 0 deletions src/internals/cartesianTest.scala
@@ -0,0 +1,55 @@
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object cartesianTest {
def main(args: Array[String]) {

val sc = new SparkContext("local", "cartesian Test")
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'))
val pairs1 = sc.parallelize(data1, 2)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'))
val pairs2 = sc.parallelize(data2, 2)

val result = pairs1.cartesian(pairs2)

//pairs1.foreachWith(i => i)((x, i) => println("[pairs1-Index " + i + "] " + x))
//pairs2.foreachWith(i => i)((x, i) => println("[pairs2-Index " + i + "] " + x))
result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

//println(result.toDebugString)
}
}
/*
[pairs1-Index 0] (1,a)
[pairs1-Index 0] (2,b)
[pairs1-Index 1] (3,c)
[pairs1-Index 1] (4,d)
[pairs2-Index 0] (1,A)
[pairs2-Index 1] (2,B)
[PartitionIndex 0] ((1,a),(1,A))
[PartitionIndex 0] ((2,b),(1,A))
[PartitionIndex 1] ((1,a),(2,B))
[PartitionIndex 1] ((2,b),(2,B))
[PartitionIndex 2] ((3,c),(1,A))
[PartitionIndex 2] ((4,d),(1,A))
[PartitionIndex 3] ((3,c),(2,B))
[PartitionIndex 3] ((4,d),(2,B))
CartesianRDD[2] at cartesian at cartesianTest.scala:17 (4 partitions)
ParallelCollectionRDD[0] at parallelize at cartesianTest.scala:12 (2 partitions)
ParallelCollectionRDD[1] at parallelize at cartesianTest.scala:15 (2 partitions)
*/
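The recorded output also shows why the result has four partitions: CartesianRDD creates one output partition per pair of parent partitions and computes the cross product of that pair, so partition counts multiply. A minimal check, under the setup above:

// pairs1: 2 partitions, pairs2: 2 partitions => 2 * 2 = 4 output partitions;
// output partition i pairs parent partitions (i / 2, i % 2), matching the indices above
assert(result.partitions.length == pairs1.partitions.length * pairs2.partitions.length)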
