Skip to content

Commit

Permalink
change AkkaStateQuery util to query periodically with a timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Jan 15, 2016
1 parent 58bc64e commit d31aef0
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 19 deletions.
3 changes: 3 additions & 0 deletions conf/benchmarkConf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ storm.ackers: 2

#Spark Specific
spark.batchtime: 2000

#Flink specific
group.id: "someString"
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package flink.benchmark.state;

import benchmark.common.advertising.RedisAdCampaignCache;
import flink.benchmark.Utils;
import flink.benchmark.utils.Utils;
import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;
import org.apache.flink.api.common.functions.FlatMapFunction;
Expand Down Expand Up @@ -47,14 +47,17 @@ public static void main(final String[] args) throws Exception {

// load yaml file
Yaml yml = new Yaml(new SafeConstructor());
Map ymlMap = (Map) yml.load(new FileInputStream(args[0]));
Map<String, String> ymlMap = (Map) yml.load(new FileInputStream(args[0]));
/* ymlMap.put("zookeeper.connect", "localhost:"+Integer.toString((Integer)ymlMap.get("zookeeper.port")) );
ymlMap.put("group.id", "abcaaak" + UUID.randomUUID());
ymlMap.put("bootstrap.servers", "localhost:9092");
ymlMap.put("auto.offset.reset", "earliest"); */
String zookeeper = Utils.getZookeeperServers(ymlMap);
ymlMap.put("zookeeper.connect", zookeeper); // set ZK connect for Kafka
ymlMap.put("bootstrap.servers", Utils.getKafkaBrokers(ymlMap));
for(Map.Entry e : ymlMap.entrySet()) {{
e.setValue(e.getValue().toString());
}}

ParameterTool parameters = ParameterTool.fromMap(ymlMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ public static void main(String[] args) throws Exception {
// periodically query
int campaignId = 0;
while(true) {

// query 5 seconds ago
Long time = System.currentTimeMillis() - 5000;
Future<Object> futureResult = Patterns.ask(
queryActor,
new QueryState<>(null, campaigns.get(campaignId++)),
new QueryState<>(time, campaigns.get(campaignId++)),
new Timeout(askTimeout));

Object result = Await.result(futureResult, askTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public void processElement(StreamRecord<Tuple2<String, Long>> streamRecord) thro
CountAndAccessTime previous = window.get(windowEnd);
if (previous == null) {
previous = new CountAndAccessTime();
window.put(windowEnd, previous);
previous.count = 1L;
previous.lastEventTime = timestamp;
} else {
previous.count++;
previous.lastEventTime = Math.max(previous.lastEventTime, timestamp);
}
previous.lastAccessTime = System.currentTimeMillis();
window.put(windowEnd, previous);
}
}

Expand Down Expand Up @@ -255,21 +255,23 @@ public String getValue(Long timestamp, String key) throws WrongKeyPartitionExcep

synchronized (windows) {
Map<Long, CountAndAccessTime> window = windows.get(key);
if (timestamp == null) {
if (window != null) {
// return the latency of the last window:
TreeMap<Long, CountAndAccessTime> orderedMap = new TreeMap<>(window);
Map.Entry<Long, CountAndAccessTime> first = orderedMap.lastEntry();
LOG.info("first {} of {}", first, window);
return Long.toString(first.getValue().lastAccessTime - first.getValue().lastEventTime);
} else {
return "Key is not known. Available campaign IDs " + windows.keySet().toString();
}
}
if (window == null) {
return "Key is not known. Available campaign IDs " + windows.keySet().toString();
}
// campaign id (key) and timestamp are set
if (timestamp == null) {
// return the latency of the last window:
TreeMap<Long, CountAndAccessTime> orderedMap = new TreeMap<>(window);
Map.Entry<Long, CountAndAccessTime> first = orderedMap.lastEntry();
return Long.toString(first.getValue().lastAccessTime - first.getValue().lastEventTime);
} else {
// query with timestamp:
CountAndAccessTime cat = window.get(timestamp % windowSize);
if(cat == null) {
return "Timestamp not available";
}
return Long.toString(cat.lastAccessTime - cat.lastEventTime);
}
/* // campaign id (key) and timestamp are set
long windowStart = timestamp - (timestamp % windowSize);
long windowEnd = windowStart + windowSize;
Expand All @@ -278,7 +280,7 @@ public String getValue(Long timestamp, String key) throws WrongKeyPartitionExcep
return "Campaign " + key + " has the following windows " + window.toString();
} else {
return "count of campaign: " + key + " in window (" + windowStart + "," + windowEnd + "): " + count.count + " latency " + (count.lastAccessTime - count.lastEventTime);
}
} */
}

}
Expand Down
152 changes: 152 additions & 0 deletions flink-benchmarks/src/main/java/flink/benchmark/utils/AnalyzeTool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package flink.benchmark.utils;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Parses a benchmark log file for latency and throughput measurements and
 * prints aggregate statistics: once over all hosts, then again with the two
 * hosts showing the highest mean latency excluded.
 */
public class AnalyzeTool {

	/** Number of worst-latency hosts excluded from the second analysis pass. */
	private static final int SLOWEST_HOSTS_TO_IGNORE = 2;

	/**
	 * Aggregated statistics extracted from one log file: overall latency and
	 * throughput distributions plus per-host breakdowns.
	 */
	public static class Result {

		DescriptiveStatistics latencies;
		SummaryStatistics throughputs;
		Map<String, DescriptiveStatistics> perHostLat;
		Map<String, SummaryStatistics> perHostThr;

		public Result(DescriptiveStatistics latencies, SummaryStatistics throughputs, Map<String, DescriptiveStatistics> perHostLat, Map<String, SummaryStatistics> perHostThr) {
			this.latencies = latencies;
			this.throughputs = throughputs;
			this.perHostLat = perHostLat;
			this.perHostThr = perHostThr;
		}
	}

	/**
	 * Scans the given log file line by line, collecting latency samples
	 * ("Latency &lt;n&gt; ms") and throughput samples
	 * ("That's &lt;x&gt; elements/second/core"). Each sample is attributed to
	 * the most recently seen host header line (YARN container header or
	 * ZooKeeper client environment line).
	 *
	 * @param file     path of the log file to parse
	 * @param toIgnore host names whose samples are skipped, or {@code null} to keep all hosts
	 * @return aggregated overall and per-host statistics
	 * @throws FileNotFoundException if {@code file} does not exist
	 */
	public static Result analyze(String file, List<String> toIgnore) throws FileNotFoundException {
		Pattern latencyPattern = Pattern.compile(".*Latency ([0-9]+) ms.*");
		Pattern throughputPattern = Pattern.compile(".*That's ([0-9.]+) elements\\/second\\/core.*");
		// host header emitted by the YARN log aggregator
		Pattern hostPattern = Pattern.compile("Container: .* on (.+).c.astral-sorter-757..*");
		// host line emitted by the ZooKeeper client (Storm runs)
		Pattern stormHostPattern = Pattern.compile(".*Client environment:host.name=(.+).c.astral-sorter-757..*");

		DescriptiveStatistics latencies = new DescriptiveStatistics();
		SummaryStatistics throughputs = new SummaryStatistics();
		String currentHost = null;
		Map<String, DescriptiveStatistics> perHostLat = new HashMap<String, DescriptiveStatistics>();
		Map<String, SummaryStatistics> perHostThr = new HashMap<String, SummaryStatistics>();

		// try-with-resources: the original never closed the Scanner, leaking the file handle
		try (Scanner sc = new Scanner(new File(file))) {
			while (sc.hasNextLine()) {
				String l = sc.nextLine();

				// ---------- host ---------------
				Matcher hostMatcher = hostPattern.matcher(l);
				if (hostMatcher.matches()) {
					currentHost = hostMatcher.group(1);
					System.err.println("Setting host to " + currentHost);
				}
				Matcher stormHostMatcher = stormHostPattern.matcher(l);
				if (stormHostMatcher.matches()) {
					currentHost = stormHostMatcher.group(1);
					System.err.println("Setting host to " + currentHost + " (storm)");
				}

				// skip samples from excluded hosts
				if (toIgnore != null && toIgnore.contains(currentHost)) {
					continue;
				}

				// ---------- latency ---------------
				Matcher latencyMatcher = latencyPattern.matcher(l);
				if (latencyMatcher.matches()) {
					int latency = Integer.parseInt(latencyMatcher.group(1));
					latencies.addValue(latency);

					DescriptiveStatistics perHost = perHostLat.get(currentHost);
					if (perHost == null) {
						perHost = new DescriptiveStatistics();
						perHostLat.put(currentHost, perHost);
					}
					perHost.addValue(latency);
				}

				// ---------- throughput ---------------
				Matcher tpMatcher = throughputPattern.matcher(l);
				if (tpMatcher.matches()) {
					double eps = Double.parseDouble(tpMatcher.group(1));
					throughputs.addValue(eps);

					SummaryStatistics perHost = perHostThr.get(currentHost);
					if (perHost == null) {
						perHost = new SummaryStatistics();
						perHostThr.put(currentHost, perHost);
					}
					perHost.addValue(eps);
				}
			}
		}

		return new Result(latencies, throughputs, perHostLat, perHostThr);
	}

	/**
	 * Analyzes the log file given as {@code args[0]} twice: first over all
	 * hosts, then with the (up to) two hosts with the highest mean latency
	 * excluded. Machine-readable summary lines go to stdout, diagnostics to
	 * stderr.
	 *
	 * @param args {@code args[0]} is the path of the log file to analyze
	 * @throws FileNotFoundException if the log file does not exist
	 */
	public static void main(String[] args) throws FileNotFoundException {
		Result r1 = analyze(args[0], null);
		DescriptiveStatistics latencies = r1.latencies;
		SummaryStatistics throughputs = r1.throughputs;
		// columns: lat-mean;lat-median;lat-90p;lat-95p;lat-99p;thr-mean;thr-max;lat-count;thr-count
		System.out.println("all-machines;" + latencies.getMean() + ";" + latencies.getPercentile(50) + ";" + latencies.getPercentile(90) + ";" + latencies.getPercentile(95) + ";" + latencies.getPercentile(99) + ";" + throughputs.getMean() + ";" + throughputs.getMax() + ";" + latencies.getN() + ";" + throughputs.getN());

		System.err.println("================= Latency (" + r1.perHostLat.size() + " reports ) =====================");
		List<Map.Entry<String, DescriptiveStatistics>> orderedPerHostLatency = new ArrayList<Map.Entry<String, DescriptiveStatistics>>();

		for (Map.Entry<String, DescriptiveStatistics> entry : r1.perHostLat.entrySet()) {
			System.err.println("====== " + entry.getKey() + " (entries: " + entry.getValue().getN() + ") =======");
			System.err.println("Mean latency " + entry.getValue().getMean());
			System.err.println("Median latency " + entry.getValue().getPercentile(50));
			orderedPerHostLatency.add(entry);
		}

		System.err.println("================= Throughput (" + r1.perHostThr.size() + " reports ) =====================");
		for (Map.Entry<String, SummaryStatistics> entry : r1.perHostThr.entrySet()) {
			System.err.println("====== " + entry.getKey() + " (entries: " + entry.getValue().getN() + ")=======");
			System.err.println("Mean throughput " + entry.getValue().getMean());
		}

		// sort hosts by mean latency, slowest first. The original comparator
		// never returned 0 for equal means, violating the Comparator contract
		// (TimSort may throw); Double.compare satisfies it.
		Collections.sort(orderedPerHostLatency, new Comparator<Map.Entry<String, DescriptiveStatistics>>() {
			@Override
			public int compare(Map.Entry<String, DescriptiveStatistics> o1, Map.Entry<String, DescriptiveStatistics> o2) {
				return Double.compare(o2.getValue().getMean(), o1.getValue().getMean());
			}
		});

		// exclude the slowest hosts; bounded by the number of hosts actually
		// seen (the original subList(0, 2) threw IndexOutOfBoundsException
		// when fewer than two hosts appeared in the log)
		int ignoreCount = Math.min(SLOWEST_HOSTS_TO_IGNORE, orderedPerHostLatency.size());
		List<Map.Entry<String, DescriptiveStatistics>> statsToIgnore = orderedPerHostLatency.subList(0, ignoreCount);
		List<String> toIgnore = new ArrayList<String>();
		System.err.println("============= HOSTS TO IGNORE (num: " + statsToIgnore.size() + ") ============== ");
		for (Map.Entry<String, DescriptiveStatistics> entry : statsToIgnore) {
			System.err.println("====== " + entry.getKey() + " (entries: " + entry.getValue().getN() + ") =======");
			System.err.println("Mean latency " + entry.getValue().getMean());
			System.err.println("Median latency " + entry.getValue().getPercentile(50));
			toIgnore.add(entry.getKey());
		}

		// second pass with the slowest hosts filtered out
		Result finalResult = analyze(args[0], toIgnore);
		latencies = finalResult.latencies;
		throughputs = finalResult.throughputs;

		System.out.println("-2-machines;" + latencies.getMean() + ";" + latencies.getPercentile(50) + ";" + latencies.getPercentile(90) + ";" + latencies.getPercentile(95) + ";" + latencies.getPercentile(99) + ";" + throughputs.getMean() + ";" + throughputs.getMax() + ";" + latencies.getN() + ";" + throughputs.getN());
	}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package flink.benchmark;
package flink.benchmark.utils;

import java.util.List;
import java.util.Map;
Expand Down

0 comments on commit d31aef0

Please sign in to comment.