forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding Java versions of Pi and LogQuery
- Loading branch information
Showing
3 changed files
with
161 additions
and
0 deletions.
There are no files selected for viewing
113 changes: 113 additions & 0 deletions
113
examples/src/main/java/spark/examples/JavaLogQuery.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package spark.examples; | ||
|
||
import com.google.common.collect.Lists; | ||
import scala.Tuple2; | ||
import scala.Tuple3; | ||
import spark.api.java.JavaPairRDD; | ||
import spark.api.java.JavaRDD; | ||
import spark.api.java.JavaSparkContext; | ||
import spark.api.java.function.Function2; | ||
import spark.api.java.function.PairFunction; | ||
|
||
import java.io.Serializable; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Executes a roll up-style query against Apache logs. | ||
*/ | ||
public class JavaLogQuery { | ||
|
||
public static List<String> exampleApacheLogs = Lists.newArrayList( | ||
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " + | ||
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; " + | ||
".NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " + | ||
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" " + | ||
"62.24.11.25 images.com 1358492167 - Whatup", | ||
"10.10.10.10 - \"FRED\" [18/Jan/2013:18:02:37 +1100] \"GET http://images.com/2013/Generic.jpg " + | ||
"HTTP/1.1\" 304 306 \"http:/referall.com\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; " + | ||
"GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR " + | ||
"3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " + | ||
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " + | ||
"0 73.23.2.15 images.com 1358492557 - Whatup"); | ||
|
||
|
||
public static Pattern apacheLogRegex = Pattern.compile("^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*"); | ||
|
||
/** Tracks the total query count and number of aggregate bytes for a particular group. */ | ||
public static class Stats implements Serializable { | ||
|
||
private int count; | ||
private int numBytes; | ||
|
||
public Stats(int count, int numBytes) { | ||
this.count = count; | ||
this.numBytes = numBytes; | ||
} | ||
public Stats merge(Stats other) { | ||
return new Stats(count + other.count, numBytes + other.numBytes); | ||
} | ||
|
||
public String toString() { | ||
return String.format("bytes=%s\tn=%s", numBytes, count); | ||
} | ||
} | ||
|
||
public static Tuple3<String, String, String> extractKey(String line) { | ||
Matcher m = apacheLogRegex.matcher(line); | ||
List<String> key = Collections.emptyList(); | ||
if (m.find()) { | ||
String ip = m.group(1); | ||
String user = m.group(3); | ||
String query = m.group(5); | ||
if (!user.equalsIgnoreCase("-")) { | ||
return new Tuple3<String, String, String>(ip, user, query); | ||
} | ||
} | ||
return new Tuple3<String, String, String>(null, null, null); | ||
} | ||
|
||
public static Stats extractStats(String line) { | ||
Matcher m = apacheLogRegex.matcher(line); | ||
if (m.find()) { | ||
int bytes = Integer.parseInt(m.group(7)); | ||
return new Stats(1, bytes); | ||
} | ||
else | ||
return new Stats(1, 0); | ||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
if (args.length == 0) { | ||
System.err.println("Usage: JavaLogQuery <master> [logFile]"); | ||
System.exit(1); | ||
} | ||
|
||
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery", | ||
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); | ||
|
||
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs); | ||
|
||
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() { | ||
@Override | ||
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception { | ||
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s)); | ||
} | ||
}); | ||
|
||
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() { | ||
@Override | ||
public Stats call(Stats stats, Stats stats2) throws Exception { | ||
return stats.merge(stats2); | ||
} | ||
}); | ||
|
||
List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect(); | ||
for (Tuple2 t : output) { | ||
System.out.println(t._1 + "\t" + t._2); | ||
} | ||
System.exit(0); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package spark.examples; | ||
|
||
import spark.api.java.JavaRDD; | ||
import spark.api.java.JavaSparkContext; | ||
import spark.api.java.function.Function; | ||
import spark.api.java.function.Function2; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** Computes an approximation to pi */ | ||
public class JavaSparkPi { | ||
|
||
public static void main(String[] args) throws Exception { | ||
if (args.length == 0) { | ||
System.err.println("Usage: JavaLogQuery <master> [slices]"); | ||
System.exit(1); | ||
} | ||
|
||
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery", | ||
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); | ||
|
||
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2; | ||
int n = 100000 * slices; | ||
List<Integer> l = new ArrayList<Integer>(n); | ||
for (int i = 0; i < n; i++) | ||
l.add(i); | ||
|
||
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); | ||
|
||
int count = dataSet.map(new Function<Integer, Integer>() { | ||
@Override | ||
public Integer call(Integer integer) throws Exception { | ||
double x = Math.random() * 2 - 1; | ||
double y = Math.random() * 2 - 1; | ||
return (x * x + y * y < 1) ? 1 : 0; | ||
} | ||
}).reduce(new Function2<Integer, Integer, Integer>() { | ||
@Override | ||
public Integer call(Integer integer, Integer integer2) throws Exception { | ||
return integer + integer2; | ||
} | ||
}); | ||
|
||
System.out.println("Pi is roughly " + 4.0 * count / n); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters