Commit 480166a

it works with dimsum

Aggharta committed Jul 12, 2016
1 parent d14ca61 commit 480166a
Showing 3 changed files with 125 additions and 120 deletions.
6 changes: 3 additions & 3 deletions pom.xml
@@ -64,11 +64,11 @@
<artifactId>apache</artifactId>
<version>8</version>
</dependency>-->
-<dependency>
+<!--<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library-all</artifactId>
-<version>2.12.0-M4</version>
-</dependency>
+<version>2.10.0-RC2</version>
+</dependency>-->
</dependencies>
<build>
<plugins>
233 changes: 119 additions & 114 deletions src/main/java/de/hpi/mmds/Main.java
@@ -20,21 +20,23 @@
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
-import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
-import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
-import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+import org.apache.spark.mllib.linalg.distributed.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
+//import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD;
+//import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._;

import java.io.File;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

+import static de.hpi.mmds.transpose.*;


public class Main {
private static String reviewPath = "resources/reviews";
@@ -95,16 +97,19 @@ public static void main(String args[]) {
return new Tuple2<>(new Match(vectors, template), a._2);
});

-JavaPairRDD<Match, Integer> repartitionedVectorRDD = vectorRDD.repartition(CPUS);
+JavaPairRDD<Tuple2<Match, Integer>, Long> repartitionedVectorRDD = vectorRDD.repartition(CPUS).zipWithIndex();

+JavaPairRDD<Long, Tuple2<Match, Integer>> swappedRepartitionedVectorRDD = repartitionedVectorRDD.mapToPair(Tuple2::swap);

-repartitionedVectorRDD.cache();
-JavaRDD<Vector> vectors = repartitionedVectorRDD.map( tuple -> tuple._1().getVectors().get(0).vector);
+//repartitionedVectorRDD.cache();
+JavaPairRDD<Vector, Long> indexedVectors = repartitionedVectorRDD.mapToPair((Tuple2<Tuple2<Match, Integer>, Long> tuple) -> (new Tuple2<Vector, Long>(tuple._1()._1().getVectors().get(0).vector, tuple._2())));
+JavaRDD<IndexedRow> rows = indexedVectors.map(tuple -> new IndexedRow(tuple._2(), tuple._1()));

-IndexedRowMatrix mat = new IndexedRowMatrix(vectors.rdd());
+IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());
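Pairing each row with a stable index via zipWithIndex before building the matrix is a standard MLlib pattern; a minimal sketch of the same idea on toy data (variable names here are illustrative, context is the JavaSparkContext already used elsewhere in Main.java, imports as at the top of the file):

    // Assign each vector a stable row index, wrap the pairs as IndexedRows,
    // and build an IndexedRowMatrix whose row identities survive shuffles.
    JavaRDD<Vector> vecs = context.parallelize(Arrays.asList(
            Vectors.dense(1.0, 0.0), Vectors.dense(0.0, 1.0)));
    JavaRDD<IndexedRow> indexedRows = vecs.zipWithIndex()
            .map(t -> new IndexedRow(t._2(), t._1()));
    IndexedRowMatrix matrix = new IndexedRowMatrix(indexedRows.rdd());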

System.out.println("Transposing Matrix");

-RowMatrix mat2 = transpose.transposeRowMatrix(mat);
+RowMatrix mat2 = transposeRowMatrix(mat);

System.out.println("computing similarities");

@@ -115,26 +120,13 @@
JavaPairRDD<Long, Long> asd = graphops.getConnectedComponents(entries.filter(matrixEntry -> matrixEntry.value() > threshold).rdd()).
mapToPair((Tuple2<Object, Object> tuple) -> new Tuple2<Long, Long>(Long.parseLong(tuple._1().toString()), Long.parseLong(tuple._2().toString())));

-Map<Long, Set<Long>> x = asd.aggregate(new HashMap<Long, Set<Long>>(),
-(HashMap<Long, Set<Long>> map, Tuple2<Long, Long> tuple) -> {
-if (!map.containsKey(tuple._2())) {
-Set<Long> set = new HashSet<Long>();
-set.add(tuple._1());
-map.put(tuple._2(), set);
-}else {
-map.get(tuple._2()).add(tuple._1());
-}
-return map;
-},
-(HashMap<Long, Set<Long>> map1, HashMap<Long, Set<Long>> map2) -> {
-map1.putAll(map2);
-return map1;
-});
-System.out.println(x);
+JavaPairRDD<Long, Match> h = asd.join(swappedRepartitionedVectorRDD).mapToPair(tuple -> new Tuple2<Long, Match>(tuple._2()._1(), tuple._2()._2()._1()));

+JavaPairRDD<Long, Iterable<Match>> i2 = h.groupByKey();
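The join replaces the old driver-side aggregate: component ids from the connected-components step are joined against the indexed matches and grouped per component, keeping the work distributed. The same pattern on toy data (all names illustrative, assuming the JavaSparkContext context):

    // (rowId -> componentId) joined with (rowId -> item), re-keyed by component,
    // then grouped so each connected component carries all of its members.
    JavaPairRDD<Long, Long> componentOfRow = context.parallelizePairs(Arrays.asList(
            new Tuple2<>(0L, 7L), new Tuple2<>(1L, 7L), new Tuple2<>(2L, 9L)));
    JavaPairRDD<Long, String> itemOfRow = context.parallelizePairs(Arrays.asList(
            new Tuple2<>(0L, "a"), new Tuple2<>(1L, "b"), new Tuple2<>(2L, "c")));
    JavaPairRDD<Long, Iterable<String>> clusters = componentOfRow.join(itemOfRow)
            .mapToPair(t -> new Tuple2<>(t._2()._1(), t._2()._2()))  // key by componentId
            .groupByKey();                                           // {7=[a, b], 9=[c]}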

//System.out.println(i.collectAsMap());


//asd.collect().stream().forEach(tuple -> System.out.println(tuple));


// List<MatrixEntry> e = entries.filter(matrixEntry -> matrixEntry.value()>threshold).collect();
@@ -245,95 +237,108 @@ public static void main(String args[]) {
// );
//
// System.out.println(clusters.size());

+JavaRDD<MergedVector> mergedVectorRDD = i2.map(value -> {
+Set<NGramm> ngrams = new HashSet<NGramm>();
+value._2().iterator().forEachRemaining(it -> ngrams.add(it.getNGramm()));
+Match mv = value._2().iterator().next();
+return new MergedVector(mv.getVectors(), mv.template, ngrams, ngrams.size());
+});
-//
-// JavaRDD<MergedVector> mergedVectorRDD = context.parallelize(clusters, CPUS);
-// JavaPairRDD<MergedVector, Integer> unsortedClustersRDD = mergedVectorRDD.mapToPair(
-// (t) -> new Tuple2<>(t, t.count));
-//
-// JavaPairRDD<MergedVector, Integer> sortedClustersRDD = unsortedClustersRDD.mapToPair(Tuple2::swap)
-// .sortByKey(false).mapToPair(Tuple2::swap);
-//
-// JavaRDD<MergedVector> finalClusterRDD = sortedClustersRDD.map(Tuple2::_1);
-//
-// Set<String> features1 = new HashSet<>();
-// Set<String> descriptions1 = new HashSet<>();
-// Map<List<TaggedWord>, String> featureMap = new HashMap<>();
-//
-// JavaRDD<MergedVector> top25ClusterRDD = context.parallelize(finalClusterRDD.take(25));
-// top25ClusterRDD.foreach((MergedVector t) -> {
-// List<TaggedWord> representation = t.getNGramm().taggedWords;
-// String feature = t.feature;
-// features1.add(feature);
-// descriptions1.addAll(t.descriptions);
-// t.ngrams.forEach((NGramm listOfTaggedWords) -> {
-// featureMap.put(listOfTaggedWords.taggedWords, feature);
-// });
-// System.out.println(representation.toString() + ": " + t.count.toString() + " | " + t.ngrams.stream()
-// .map(n -> n.taggedWords.stream().map(tw -> tw.word()).collect(Collectors.joining(", ")))
-// .collect(Collectors.joining(" + ")));
-// System.out.println("Feature: " + template.getFeature(representation));
-//
-// List<String> fbc = new ArrayList<String>(t.descriptions);
-// JavaRDD<LabeledPoint> points = tagRDD.map((Tuple2<List<TaggedWord>, Float> rating) -> {
-// //Map<List<TaggedWord>, String> fmp = descriptionMapBroadcast.getValue();
-// double[] v = new double[fbc.size()];
-// List<NGramm> output = new LinkedList<NGramm>();
-// BigramThesis.findKGramsEx(3, rating._1, template).forEach(result ->
-// output.add(new NGramm(result._1(), template)));
-// //List<TaggedWord> output = rating._1().stream().filter((TaggedWord tw) -> (fbc.contains(tw.word()))).collect(Collectors.toList());
-// Boolean foundOne = false;
-// for (NGramm ngram : output) {
-// String description = ngram.template.getDescription(ngram.taggedWords);
-// foundOne = true;
-// int index = fbc.indexOf(description);
-// if (index >= 0) {
-// v[index] += 1;
-// }
-// }
-// if (foundOne) {
-// return new LabeledPoint((double) (rating._2), Vectors.dense(v));
-// } else return null;
-// }).filter(point -> point != null);
-//
-// DataFrame training = sqlContext.createDataFrame(points, LabeledPoint.class);
-//
-//
-// org.apache.spark.ml.regression.LinearRegression lr = new org.apache.spark.ml.regression.LinearRegression();
-//
-// lr.setMaxIter(20)
-// .setRegParam(0.05);
-//
-// LinearRegressionModel model1 = lr.train(training);
-//
-// System.out.println("Model 1 was fit using parameters: " + model1.coefficients());
-//
-// Map<String, Double> map = new HashMap<>();
-// double[] coeffs = model1.coefficients().toArray();
-// for (int i = 0; i < coeffs.length; i++) {
-// map.put(fbc.get(i), coeffs[i]);
-// /*featureMap.entrySet().stream().filter((Map.Entry<List<TaggedWord>, String> entry) -> (features.indexOf(entry.getValue()) == index))
-// .forEach((Map.Entry<List<TaggedWord>, String> entry2) -> {
-// map.put(entry2.getValue(), coeffs[index]);
-// });*/ // a beauty of its own quality
-//
-// }
-//
-// Iterator<Map.Entry<String, Double>> i = Utility.valueIterator(map);
-// Set<String> features3 = new HashSet<>(featureMap.values());
-// //for (String feature : features3){
-// Map<String, Double> scores = new HashMap<>();
-// for (Map.Entry<List<TaggedWord>, String> entry : featureMap.entrySet()) {
-// if (entry.getValue().equals(feature)) {
-// for (TaggedWord word : entry.getKey()) {
-// if (map.containsKey(word.word())) {
-// scores.put(word.word(), map.get(word.word()));
-// }
-// }
-// }
-// }
-// System.out.println(feature + ": ");
-// System.out.println(scores);
-// });
+//JavaRDD<MergedVector> mergedVectorRDD = context.parallelize(clusters, CPUS);
+JavaPairRDD<MergedVector, Integer> unsortedClustersRDD = mergedVectorRDD.mapToPair(
+(t) -> new Tuple2<>(t, t.count));

+JavaPairRDD<MergedVector, Integer> sortedClustersRDD = unsortedClustersRDD.mapToPair(Tuple2::swap)
+.sortByKey(false).mapToPair(Tuple2::swap);

+JavaRDD<MergedVector> finalClusterRDD = sortedClustersRDD.map(Tuple2::_1);

+finalClusterRDD.take(25).forEach((t) -> {
+List<TaggedWord> representation = t.getNGramm().taggedWords;
+System.out.println(representation.toString() + ": " + t.count.toString() + " | " + t.ngrams.stream()
+.map(n -> n.taggedWords.stream().map(tw -> tw.word()).collect(Collectors.joining(", ")))
+.collect(Collectors.joining(" + ")));
+System.out.println("Feature: " + template.getFeature(representation));
+});
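JavaPairRDD has no sortByValue, hence the swap/sortByKey(false)/swap idiom above; in isolation it looks like this (toy data, illustrative names):

    JavaPairRDD<String, Integer> counts = context.parallelizePairs(Arrays.asList(
            new Tuple2<>("battery", 2), new Tuple2<>("screen", 5)));
    JavaPairRDD<String, Integer> byCountDesc = counts
            .mapToPair(Tuple2::swap)   // key on the count
            .sortByKey(false)          // sort descending
            .mapToPair(Tuple2::swap);  // restore (word, count); "screen" now first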

+Set<String> features1 = new HashSet<>();
+Set<String> descriptions1 = new HashSet<>();
+Map<List<TaggedWord>, String> featureMap = new HashMap<>();

+finalClusterRDD.take(25).forEach((MergedVector cluster) -> {
+String feature = cluster.feature;
+features1.add(feature);
+descriptions1.addAll(cluster.descriptions);
+cluster.ngrams.forEach((NGramm listOfTaggedWords) -> {
+featureMap.put(listOfTaggedWords.taggedWords, feature);
+});
+}
+);
+List<String> descriptions = new ArrayList<>(descriptions1);
+Broadcast<List<String>> descriptionBroadcast = context.broadcast(descriptions);
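Broadcasting the description list ships it to each executor once instead of serializing it into every task closure. The same pattern in isolation (toy data, illustrative names):

    Broadcast<List<String>> lookup = context.broadcast(Arrays.asList("battery", "screen"));
    JavaRDD<Integer> positions = context.parallelize(Arrays.asList("screen", "price"))
            .map(word -> lookup.getValue().indexOf(word)); // executors read the shared copy: [1, -1]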
+JavaRDD<LabeledPoint> points = tagRDD.map((Tuple2<List<TaggedWord>, Float> rating) -> {
+List<String> fbc = descriptionBroadcast.getValue();
+//Map<List<TaggedWord>, String> fmp = descriptionMapBroadcast.getValue();
+double[] v = new double[fbc.size()];
+List<NGramm> output = new LinkedList<NGramm>();
+BigramThesis.findKGramsEx(3, rating._1, template).forEach(result ->
+output.add(new NGramm(result._1(), template)));
+//List<TaggedWord> output = rating._1().stream().filter((TaggedWord tw) -> (fbc.contains(tw.word()))).collect(Collectors.toList());
+Boolean foundOne = false;
+for (NGramm ngram : output) {
+String description = ngram.template.getDescription(ngram.taggedWords);
+foundOne = true;
+int index = fbc.indexOf(description);
+if (index >= 0) {
+v[index] = 1;
+}
+}
+if (foundOne) {
+return new LabeledPoint((double) (rating._2), Vectors.dense(v));
+}else return null;
+}).filter(point -> point!= null);

+DataFrame training = sqlContext.createDataFrame(points, LabeledPoint.class);


+org.apache.spark.ml.regression.LinearRegression lr = new org.apache.spark.ml.regression.LinearRegression();

+lr.setMaxIter(20)
+.setRegParam(0.05);

+LinearRegressionModel model1 = lr.train(training);

+System.out.println("Model 1 was fit using parameters: " + model1.coefficients());

+Map<String, Double> map = new HashMap<>();
+double[] coeffs = model1.coefficients().toArray();
+for (int i = 0; i < coeffs.length; i++) {
+int index = i;
+map.put(descriptions.get(i), coeffs[i]);
+/*featureMap.entrySet().stream().filter((Map.Entry<List<TaggedWord>, String> entry) -> (features.indexOf(entry.getValue()) == index))
+.forEach((Map.Entry<List<TaggedWord>, String> entry2) -> {
+map.put(entry2.getValue(), coeffs[index]);
+});*/ // a beauty of its own quality

+}

+Iterator<Map.Entry<String, Double>> i = Utility.valueIterator(map);
+Set<String> features3 = new HashSet<>(featureMap.values());
+for (String feature : features3){
+Map<String, Double> scores = new HashMap<>();
+for (Map.Entry<List<TaggedWord>, String> entry : featureMap.entrySet()) {
+if (entry.getValue().equals(feature)) {
+for (TaggedWord word : entry.getKey()) {
+if (map.containsKey(word.word())) {
+scores.put(word.word(), map.get(word.word()));
+}
+}
+}
+}
+System.out.println(feature + ": ");
+System.out.println(scores);
+}



6 changes: 3 additions & 3 deletions src/main/java/de/hpi/mmds/transpose.scala
@@ -2,13 +2,13 @@ package de.hpi.mmds

import org.apache.spark.mllib.linalg.{Vectors, DenseVector}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
/**
* Taken and slightly adapted from http://stackoverflow.com/a/31862441
*/
object transpose {
-  def transposeRowMatrix(m: RowMatrix): RowMatrix = {
-    val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)}
+  def transposeRowMatrix(m: IndexedRowMatrix): RowMatrix = {
+    val transposedRowsRDD = m.rows.map{case (row: IndexedRow) => rowToTransposedTriplet(row.vector, row.index)}
.flatMap(x => x) // now we have triplets (newRowIndex, (newColIndex, value))
.groupByKey()
.sortByKey().map(_._2) // sort rows and remove row indexes
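transposeRowMatrix works by re-keying every matrix entry: row (r, values) becomes triplets (c, (r, value)), so the old column index is the new row index; grouping by that index and sorting reassembles the transposed rows. A hedged Java rendering of the same idea (illustrative names; assumes a JavaRDD<IndexedRow> indexedRows like the one sketched earlier, and the Spark 2.x flatMapToPair signature returning an Iterator):

    // Each entry (r, c, v) of the input becomes (c, (r, v)):
    // the column index becomes the row index of the transpose.
    final int numInputRows = (int) indexedRows.count(); // length of each transposed row
    JavaPairRDD<Long, Tuple2<Long, Double>> triplets = indexedRows.flatMapToPair(row -> {
        List<Tuple2<Long, Tuple2<Long, Double>>> out = new ArrayList<>();
        double[] values = row.vector().toArray();
        for (int c = 0; c < values.length; c++) {
            out.add(new Tuple2<>((long) c, new Tuple2<>(row.index(), values[c])));
        }
        return out.iterator();
    });
    JavaRDD<Vector> transposedRows = triplets.groupByKey().sortByKey(true).map(group -> {
        double[] dense = new double[numInputRows];
        group._2().forEach(p -> dense[p._1().intValue()] = p._2()); // place each value at its new column
        return Vectors.dense(dense);
    });
    RowMatrix transposed = new RowMatrix(transposedRows.rdd());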
