forked from szilard/benchm-ml
-
Notifications
You must be signed in to change notification settings - Fork 0
/
5-spark.txt
80 lines (56 loc) · 2.06 KB
/
5-spark.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
spark-1.3.0-bin-hadoop2.4/bin/spark-shell --driver-memory 30G --executor-memory 30G
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
/**
 * Reads a comma-separated numeric text file into an RDD of sparse labeled points.
 *
 * Every field of every line is parsed with `toDouble`, so a header row or any
 * non-numeric field makes the job fail with a NumberFormatException. The first
 * column becomes the label; the remaining columns become the features, of which
 * only the non-zero entries are stored in the sparse vector.
 *
 * @param fname path of the CSV file, handed to `sc.textFile`
 * @return one LabeledPoint per input line
 */
def csv_to_sparse_labpoint(fname:String) : org.apache.spark.rdd.RDD[LabeledPoint] = {
  sc.textFile(fname).map { line =>
    val vv = line.split(',').map(_.toDouble)
    val label = vv(0)
    val X = vv.slice(1, vv.size)
    // Pair each feature value with its column index and keep the non-zero ones.
    // The original order is preserved, so the indices remain strictly increasing.
    val nonZero = X.zipWithIndex.filter { case (value, _) => value != 0 }
    val features = Vectors.sparse(X.length, nonZero.map(_._2), nonZero.map(_._1))
    LabeledPoint(label, features)
  }
}
// Load the training and test sets as RDDs of sparse LabeledPoints.
val d_train_0 = csv_to_sparse_labpoint("spark-train-0.1m.csv")
val d_test = csv_to_sparse_labpoint("spark-test-0.1m.csv")
// Bare expression: the shell echoes the partition count of the freshly loaded training RDD.
d_train_0.partitions.size
// Redistribute the training data over 32 partitions (the test set is left as loaded).
val d_train = d_train_0.repartition(32)
// Echo the partition count again, now for the repartitioned RDD.
d_train.partitions.size
// Mark both RDDs for caching.
d_train.cache()
d_test.cache()
// Run an action on each RDD; presumably this materializes the caches before the
// timed training run below -- NOTE(review): confirm that is the intent.
d_test.count()
d_train.count()
// Start from MLlib's default boosting parameters for classification, then override them.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 100
boostingStrategy.learningRate = 0.01
boostingStrategy.treeStrategy.maxDepth = 16
// Empty map: no feature is declared categorical.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
// Wall-clock timing of the training call only; keep these three statements adjacent.
val now = System.nanoTime
val model = GradientBoostedTrees.train(d_train, boostingStrategy)
// Bare expression: the shell echoes the elapsed time in seconds (nanoseconds / 1e9).
( System.nanoTime - now )/1e9
// Pair every test point's continuous ensemble score with its true label.
val scoreAndLabels = d_test.map { point =>
  // model.predict(point.features) is not used because it returns a hard 0/1
  // class; the score is instead the weighted sum of the per-tree predictions.
  val score = model.trees.zip(model.treeWeights).map { case (tree, weight) =>
    tree.predict(point.features) * weight
  }.sum
  (score, point.label)
}
// Compute and report the area under the ROC curve for those scores.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)