Skip to content
This repository has been archived by the owner on Aug 16, 2024. It is now read-only.

Commit

Permalink
port streamingbench from 5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonwang committed Nov 7, 2016
1 parent fa95533 commit 2efad75
Show file tree
Hide file tree
Showing 87 changed files with 4,843 additions and 69 deletions.
127 changes: 77 additions & 50 deletions autogen/pom.xml
Original file line number Diff line number Diff line change
@@ -1,56 +1,83 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.intel.hibench</groupId>
<artifactId>hibench</artifactId>
<version>6.0-SNAPSHOT</version>
</parent>

<artifactId>autogen</artifactId>
<packaging>jar</packaging>
<parent>
<groupId>com.intel.hibench</groupId>
<name>HiBench data generation tools</name>
<artifactId>hibench</artifactId>
<version>6.0-SNAPSHOT</version>
</parent>

<dependencies>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.uncommons.maths</groupId>
<artifactId>uncommons-maths</artifactId>
<version>${uncommons-maths.version}</version>
</dependency>
</dependencies>
<artifactId>autogen</artifactId>
<packaging>jar</packaging>
<groupId>com.intel.hibench</groupId>
<name>HiBench data generation tools</name>

<dependencies>
<dependency>
<groupId>com.intel.hibench</groupId>
<artifactId>hibench-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.uncommons.maths</groupId>
<artifactId>uncommons-maths</artifactId>
<version>${uncommons-maths.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.mr2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.mr2.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
126 changes: 126 additions & 0 deletions autogen/src/main/java/com/intel/hibench/streambench/DataGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.streambench;

import com.intel.hibench.common.HiBenchConfig;
import com.intel.hibench.streambench.common.ConfigLoader;
import com.intel.hibench.streambench.common.StreamBenchConfig;
import com.intel.hibench.streambench.util.DataGeneratorConfig;
import com.intel.hibench.streambench.util.KafkaSender;
import com.intel.hibench.streambench.util.RecordSendTask;

import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DataGenerator {

public static void main(String[] args) {
if (args.length < 5) {
System.err.println("args: <ConfigFile> <userVisitsFile> <userVisitsFileOffset> <kMeansFile> <kMeansFileOffset> need to be specified!");
System.exit(1);
}

// initialize variable from configuration and input parameters.
ConfigLoader configLoader = new ConfigLoader(args[0]);

String userVisitsFile = args[1];
long userVisitsFileOffset = Long.parseLong(args[2]);
String kMeansFile = args[3];
long kMeansFileOffset = Long.parseLong(args[4]);

// load properties from config file
String testCase = configLoader.getProperty(StreamBenchConfig.TESTCASE).toLowerCase();
String topic = configLoader.getProperty(StreamBenchConfig.KAFKA_TOPIC);
String brokerList = configLoader.getProperty(StreamBenchConfig.KAFKA_BROKER_LIST);
int intervalSpan = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_INTERVAL_SPAN));
long recordsPerInterval = Long.parseLong(configLoader.getProperty(StreamBenchConfig.DATAGEN_RECORDS_PRE_INTERVAL));
long totalRecords = Long.parseLong(configLoader.getProperty(StreamBenchConfig.DATAGEN_TOTAL_RECORDS));
int totalRounds = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_TOTAL_ROUNDS));
int recordLength = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_RECORD_LENGTH));
String dfsMaster = configLoader.getProperty(HiBenchConfig.DFS_MASTER);
boolean debugMode = Boolean.getBoolean(configLoader.getProperty(StreamBenchConfig.DEBUG_MODE));

DataGeneratorConfig dataGeneratorConf = new DataGeneratorConfig(testCase, brokerList, kMeansFile, kMeansFileOffset,
userVisitsFile, userVisitsFileOffset, dfsMaster, recordLength, intervalSpan, topic, recordsPerInterval,
totalRounds, totalRecords, debugMode);

// Create thread pool and submit producer task
int producerNumber = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_PRODUCER_NUMBER));
ExecutorService pool = Executors.newFixedThreadPool(producerNumber);
for(int i = 0; i < producerNumber; i++) {
pool.execute(new DataGeneratorJob(dataGeneratorConf));
}

// Print out some basic information
System.out.println("============ StreamBench Data Generator ============");
System.out.println(" Interval Span : " + intervalSpan + " ms");
System.out.println(" Record Per Interval : " + recordsPerInterval);
System.out.println(" Record Length : " + recordLength + " bytes");
System.out.println(" Producer Number : " + producerNumber);
if(totalRecords == -1) {
System.out.println(" Total Records : -1 [Infinity]");
} else {
System.out.println(" Total Records : " + totalRecords);
}

if (totalRounds == -1) {
System.out.println(" Total Rounds : -1 [Infinity]");
} else {
System.out.println(" Total Rounds : " + totalRounds);
}
System.out.println(" Kafka Topic : " + topic);
System.out.println("====================================================");
System.out.println("Estimated Speed : ");
long recordsPreSecond = recordsPerInterval * 1000 * producerNumber / intervalSpan ;
System.out.println(" " + recordsPreSecond + " records/second");
double mbPreSecond = (double)recordsPreSecond * recordLength / 1000000;
System.out.println(" " + mbPreSecond + " Mb/second");
System.out.println("====================================================");

pool.shutdown();
}

static class DataGeneratorJob implements Runnable {
DataGeneratorConfig conf;

// Constructor
public DataGeneratorJob(DataGeneratorConfig conf) {
this.conf = conf;
}

@Override
public void run() {
// instantiate KafkaSender
KafkaSender sender;
if(conf.getTestCase().contains("statistics")) {
sender = new KafkaSender(conf.getBrokerList(), conf.getkMeansFile(), conf.getkMeansFileOffset(),
conf.getDfsMaster(), conf.getRecordLength(), conf.getIntervalSpan());
} else {
sender = new KafkaSender(conf.getBrokerList(), conf.getUserVisitsFile(), conf.getUserVisitsFileOffset(),
conf.getDfsMaster(), conf.getRecordLength(), conf.getIntervalSpan());
}

// Schedule timer task
Timer timer = new Timer();
timer.scheduleAtFixedRate(
new RecordSendTask(sender, conf.getTopic(), conf.getRecordsPerInterval(),
conf.getTotalRounds(), conf.getTotalRecords(), conf.getDebugMode(), timer), 0, conf.getIntervalSpan());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.streambench.util;

import org.apache.hadoop.conf.Configuration;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Cache the total records in memory.
*/
public class CachedData {

private volatile static CachedData cachedData;

private List<String> data;

private int next;
private int totalRecords;

public static CachedData getInstance(String seedFile, long fileOffset, String dfsMaster) {
if(cachedData == null) {
synchronized (CachedData.class) {
if (cachedData == null) {
cachedData = new CachedData(seedFile, fileOffset, dfsMaster);
}
}
}
return cachedData;
}

private CachedData(String seedFile, long fileOffset, String dfsMaster){
Configuration dfsConf = new Configuration();
dfsConf.set("fs.default.name", dfsMaster);

// read records from seedFile and cache into "data"
data = new ArrayList<String>();
BufferedReader reader = SourceFileReader.getReader(dfsConf, seedFile, fileOffset);
String line = null;
try {
while ((line = reader.readLine()) != null) {
data.add(line);
}
} catch (IOException e) {
System.err.println("Failed read records from Path: " + seedFile);
e.printStackTrace();
}

this.next = 0;
this.totalRecords = data.size();
}

/**
* Loop get record.
* @return the record.
*/
public String getRecord() {
int current = next;
next = (next + 1) % totalRecords;
return data.get(current);
}
}
Loading

0 comments on commit 2efad75

Please sign in to comment.