Skip to content

Commit

Permalink
Add option to throttle request rate per thread.
Browse files Browse the repository at this point in the history
Summary:
Simulate poisson process with inter-arrival times exponentially
distributed. Includes unit tests for request rate functionality.

Also work on consolidating random number generation.  The random number
generation in the original implementation creates many Random() instances,
each of which is seeded with the current time at construction time.
This is not really ideal as the Java RNG is fast but not particularly
good quality and the stream of numbers from a single RNG is likely to
be better quality than that from many RNGs with correlated seeds.

Also working towards deterministic random number generation, where
a single initial seed determines the entire request workload. This
allows workloads and tests to be reproducible.  That seed is used for
a high-quality but slow RNG, which then generates seeds for new
faster RNGs that can be passed to threads in such a way that the calls to
each RNG always occur in the same order.  The request workload is now
deterministic except for the id1 selection algorithm in
RealDistribution, which is inherently non-deterministic because it
mutates arrays shared between threads.

Test Plan:
ant test

To check determinism, ran a small request workload multiple times and checked that the count of each type of operation stayed the same.

Reviewers: dhruba

Reviewed By: dhruba

CC: vamsi

Differential Revision: https://reviews.facebook.net/D4695
  • Loading branch information
Tim Armstrong committed Aug 17, 2012
1 parent d2c72be commit 9259d55
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 118 deletions.
11 changes: 11 additions & 0 deletions config/LinkConfigMysql.properties
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,20 @@ randomid2max = 0
# always use this configuration (1) when using HBaseGeneralAtomicityTesting;
id2gen_config = 0

# seed for initial data load random number generation (optional)
# load_random_seed = 12345

# seed for request random number generation (optional)
# request_random_seed = 12345

# read + write requests per thread
requests = 500000

# request rate per thread. <= 0 means unthrottled requests, > 0 limits
# the average request rate to that number of requests per second per thread,
# with the inter-request intervals governed by an exponential distribution
requestrate = 0

# max duration in seconds for the 2nd part of benchmark (1st part is load)
maxtime = 3600

Expand Down
53 changes: 49 additions & 4 deletions src/java/com/facebook/LinkBench/LinkBenchDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -136,6 +140,8 @@ void load() throws IOException, InterruptedException, Throwable {
logger.info("Starting loaders " + nloaders);
logger.debug("Bulk Load setting: " + bulkLoad);

Random masterRandom = createMasterRNG(props, "load_random_seed");

LoadProgress loadTracker = new LoadProgress(logger, maxid1 - startid1);
for (int i = 0; i < nloaders; i++) {
LinkStore store = initStore(Phase.LOAD, i);
Expand All @@ -145,7 +151,8 @@ void load() throws IOException, InterruptedException, Throwable {
loaders.add(l);
}

enqueueLoadWork(chunk_q, startid1, maxid1, nloaders);
enqueueLoadWork(chunk_q, startid1, maxid1, nloaders,
new Random(masterRandom.nextLong()));

// run loaders
loadTracker.startTimer();
Expand All @@ -169,8 +176,43 @@ void load() throws IOException, InterruptedException, Throwable {

}

/**
 * Create a new master random number generator, optionally seeded with a
 * known value from the config file. If no seed value is provided, one is
 * derived from the current nano time and the config key. In either case
 * the seed is logged so that a run can be reproduced later.
 *
 * @param props configuration properties that may contain the seed
 * @param configKey config key for the seed value
 * @return the seeded master generator; note that one value has already
 *         been drawn from it for the debug log message
 * @throws NumberFormatException if the configured seed is not a valid long
 */
private Random createMasterRNG(Properties props, String configKey) {
  long seed;
  // getProperty (unlike containsKey) also consults Properties defaults,
  // so the presence check and the lookup cannot disagree.
  String configuredSeed = props.getProperty(configKey);
  if (configuredSeed != null) {
    // Trailing whitespace in a properties file is kept as part of the
    // value; trim it so that it does not break parsing.
    seed = Long.parseLong(configuredSeed.trim());
    logger.info("Using configured random seed " + configKey + "=" + seed);
  } else {
    // Mix in the key's hash so that different master generators created
    // at nearly the same time still get different seeds.
    seed = System.nanoTime() ^ (long)configKey.hashCode();
    logger.info("Using random seed " + seed + " since " + configKey
        + " not specified");
  }

  SecureRandom masterRandom;
  try {
    masterRandom = SecureRandom.getInstance("SHA1PRNG");
  } catch (NoSuchAlgorithmException e) {
    // NOTE(review): a SecureRandom built with the default constructor is
    // self-seeded and setSeed() only supplements that seed, so the number
    // stream will presumably differ between runs - confirm on target JVM.
    logger.warn("SHA1PRNG not available, defaulting to default SecureRandom" +
        " implementation; random numbers may not be reproducible from" +
        " the seed");
    masterRandom = new SecureRandom();
  }
  // Seed before any number is requested so that the generator does not
  // seed itself first.
  masterRandom.setSeed(ByteBuffer.allocate(8).putLong(seed).array());

  // Can be used to check that rng is behaving as expected. The value is
  // drawn whether or not debug logging is enabled, so the stream seen by
  // callers does not depend on the log level.
  logger.debug("First number generated by master " + configKey +
      ": " + masterRandom.nextLong());
  return masterRandom;
}

private void enqueueLoadWork(BlockingQueue<LoadChunk> chunk_q, long startid1,
long maxid1, int nloaders) {
long maxid1, int nloaders, Random rng) {
// Enqueue work chunks. Do it in reverse order as a heuristic to improve
// load balancing, since queue is FIFO and later chunks tend to be larger

Expand All @@ -180,7 +222,7 @@ private void enqueueLoadWork(BlockingQueue<LoadChunk> chunk_q, long startid1,
ArrayList<LoadChunk> stack = new ArrayList<LoadChunk>();
for (long id1 = startid1; id1 < maxid1; id1 += chunkSize) {
stack.add(new LoadChunk(chunk_num, id1,
Math.min(id1 + chunkSize, maxid1)));
Math.min(id1 + chunkSize, maxid1), rng));
chunk_num++;
}

Expand Down Expand Up @@ -212,11 +254,14 @@ void sendrequests() throws IOException, InterruptedException, Throwable {

RequestProgress progress = LinkBenchRequest.createProgress(logger, props);

Random masterRandom = createMasterRNG(props, "request_random_seed");

// create requesters
for (int i = 0; i < nrequesters; i++) {
final int id = i;
LinkStore store = initStore(Phase.REQUEST, i);
LinkBenchRequest l = new LinkBenchRequest(store, props, latencyStats,
progress, i, nrequesters);
progress, new Random(masterRandom.nextLong()), i, nrequesters);
requesters.add(l);
}

Expand Down
5 changes: 3 additions & 2 deletions src/java/com/facebook/LinkBench/LinkBenchDriverMR.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -319,7 +320,7 @@ public void map(IntWritable loaderid,

LinkBenchLoad loader = new LinkBenchLoad(store, props, latencyStats,
loaderid.get(), maxid1 == startid1 + 1,
nloaders.get(), prog_tracker);
nloaders.get(), prog_tracker, new Random());

LinkedList<LinkBenchLoad> tasks = new LinkedList<LinkBenchLoad>();
tasks.add(loader);
Expand Down Expand Up @@ -358,7 +359,7 @@ public void map(IntWritable requesterid,
progress.startTimer();
final LinkBenchRequest requester =
new LinkBenchRequest(store, props, latencyStats, progress,
requesterid.get(), nrequesters.get());
new Random(), requesterid.get(), nrequesters.get());


// Wrap in runnable to handle error
Expand Down
58 changes: 25 additions & 33 deletions src/java/com/facebook/LinkBench/LinkBenchLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class LinkBenchLoad implements Runnable {

private final Logger logger = Logger.getLogger(ConfigUtil.LINKBENCH_LOGGER);

private long randomid2max; // whether id2 should be generated randomly

private long maxid1; // max id1 to generate
Expand All @@ -55,11 +55,6 @@ public class LinkBenchLoad implements Runnable {
long diffShuffle;
long linksloaded;

// Random generators
Random random_data;
Random random_links = new Random(); // for #links
private Random random_id2; // random number generator for id2 if needed

/**
* special case for single hot row benchmark. If singleAssoc is set,
* then make this method not print any statistics message, all statistics
Expand All @@ -81,12 +76,12 @@ public class LinkBenchLoad implements Runnable {
*/
public LinkBenchLoad(LinkStore store, Properties props,
LinkBenchLatency latencyStats, int loaderID, boolean singleAssoc,
int nloaders, LoadProgress prog_tracker) {
int nloaders, LoadProgress prog_tracker, Random rng) {
this(store, props, latencyStats, loaderID, singleAssoc,
new ArrayBlockingQueue<LoadChunk>(2), prog_tracker);

// Just add a single chunk to the queue
chunk_q.add(new LoadChunk(loaderID, startid1, maxid1));
chunk_q.add(new LoadChunk(loaderID, startid1, maxid1, rng));
chunk_q.add(LoadChunk.SHUTDOWN);
}

Expand All @@ -107,7 +102,7 @@ public LinkBenchLoad(LinkStore store,
this.singleAssoc = singleAssoc;
this.chunk_q = chunk_q;
this.prog_tracker = prog_tracker;


/*
* Load settings from properties
Expand Down Expand Up @@ -156,13 +151,6 @@ public LinkBenchLoad(LinkStore store,
*/
// random number generator for id2 if needed
randomid2max = Long.parseLong(props.getProperty("randomid2max"));
random_id2 = (randomid2max > 0) ? (new Random()) : null;

// generate data as a sequence of random characters from 'a' to 'd'
random_data = new Random();

// Random number generators for #links
random_links = new Random();
}

public long getLinksLoaded() {
Expand All @@ -171,7 +159,8 @@ public long getLinksLoaded() {

// Gets the #links to generate for an id1 based on distribution specified
// by nlinks_func, nlinks_config and nlinks_default.
public static long getNlinks(long id1, long startid1, long maxid1,
public static long getNlinks(Random rng,
long id1, long startid1, long maxid1,
int nlinks_func, int nlinks_config,
int nlinks_default) {
long nlinks = nlinks_default; // start with nlinks_default
Expand All @@ -182,7 +171,7 @@ public static long getNlinks(long id1, long startid1, long maxid1,
return nlinks;
case -2 :
// real distribution
nlinks = RealDistribution.getNlinks(id1, startid1, maxid1);
nlinks = RealDistribution.getNlinks(rng, id1, startid1, maxid1);
break;

case -1 :
Expand Down Expand Up @@ -300,7 +289,8 @@ private void processChunk(LoadChunk chunk, boolean bulkLoad,
long nlinks = 0;
long id1;
if (nlinks_func == -2) {
long res[] = RealDistribution.getId1AndNLinks(i, startid1, maxid1);
long res[] = RealDistribution.getId1AndNLinks(chunk.rng, i,
startid1, maxid1);
id1 = res[0];
nlinks = res[1];
if (id1 == i) {
Expand All @@ -310,7 +300,7 @@ private void processChunk(LoadChunk chunk, boolean bulkLoad,
}
} else {
id1 = i;
nlinks = getNlinks(id1, startid1, maxid1,
nlinks = getNlinks(chunk.rng, id1, startid1, maxid1,
nlinks_func, nlinks_config, nlinks_default);
}

Expand All @@ -321,9 +311,8 @@ private void processChunk(LoadChunk chunk, boolean bulkLoad,
" nlinks = " + nlinks);
}

createOutLinks(link, loadBuffer, countLoadBuffer,
id1, nlinks, singleAssoc,
bulkLoad, bulkLoadBatchSize);
createOutLinks(chunk.rng, link, loadBuffer, countLoadBuffer,
id1, nlinks, singleAssoc, bulkLoad, bulkLoadBatchSize);

if (!singleAssoc) {
long nloaded = (i - chunk.start) / chunk.step;
Expand Down Expand Up @@ -352,7 +341,8 @@ private void processChunk(LoadChunk chunk, boolean bulkLoad,
* @param bulkLoad
* @param bulkLoadBatchSize
*/
private void createOutLinks(Link link, ArrayList<Link> loadBuffer,
private void createOutLinks(Random rng,
Link link, ArrayList<Link> loadBuffer,
ArrayList<LinkCount> countLoadBuffer,
long id1, long nlinks, boolean singleAssoc, boolean bulkLoad,
int bulkLoadBatchSize) {
Expand All @@ -366,7 +356,7 @@ private void createOutLinks(Link link, ArrayList<Link> loadBuffer,
// Can't reuse link object
link = initLink();
}
constructLink(link, id1, j, singleAssoc);
constructLink(rng, link, id1, j, singleAssoc);

if (bulkLoad) {
loadBuffer.add(link);
Expand Down Expand Up @@ -421,8 +411,8 @@ private Link initLink() {
* id1
* @param singleAssoc whether we are in singleAssoc mode
*/
private void constructLink(Link link, long id1, long outlink_ix,
boolean singleAssoc) {
private void constructLink(Random rng, Link link, long id1,
long outlink_ix, boolean singleAssoc) {
link.id1 = id1;

// Using random number generator for id2 means we won't know
Expand All @@ -434,12 +424,12 @@ private void constructLink(Link link, long id1, long outlink_ix,
} else {
link.id2 = (randomid2max == 0 ?
(maxid1 + id1 + outlink_ix) :
random_id2.nextInt((int)randomid2max));
rng.nextInt((int)randomid2max));
link.time = System.currentTimeMillis();
// generate data as a sequence of random characters from 'a' to 'd'
link.data = new byte[datasize];
for (int k = 0; k < datasize; k++) {
link.data[k] = (byte)('a' + Math.abs(random_data.nextInt()) % 4);
link.data[k] = (byte)('a' + Math.abs(rng.nextInt(4)));
}
}

Expand Down Expand Up @@ -552,27 +542,29 @@ private void loadCounts(ArrayList<LinkCount> loadBuffer) {
*/
public static class LoadChunk {
public static LoadChunk SHUTDOWN = new LoadChunk(true,
0, 0, 0, 1);
0, 0, 0, 1, null);

public LoadChunk(long id, long start, long end) {
this(false, id, start, end, 1);
public LoadChunk(long id, long start, long end, Random rng) {
this(false, id, start, end, 1, rng);
}
public LoadChunk(boolean shutdown,
long id, long start, long end, long step) {
long id, long start, long end, long step, Random rng) {
super();
this.shutdown = shutdown;
this.id = id;
this.start = start;
this.end = end;
this.step = step;
this.size = (end - start) / step;
this.rng = rng;
}
public final boolean shutdown;
public final long id;
public final long start;
public final long end;
public final long step;
public final long size;
public Random rng;

public String toString() {
if (shutdown) {
Expand Down
Loading

0 comments on commit 9259d55

Please sign in to comment.