Skip to content

Commit

Permalink
Added hooks for statistics collection.
Browse files Browse the repository at this point in the history
  • Loading branch information
avitorovic committed Oct 20, 2013
1 parent f17ea03 commit 560102f
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 83 deletions.
3 changes: 0 additions & 3 deletions src/plan_runner/storage/AggregationStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
import plan_runner.utilities.SystemParameters;

public class AggregationStorage<V> extends KeyValueStore<Object, V> {
/**
*
*/
private static final long serialVersionUID = 1L;

private static Logger LOG = Logger.getLogger(AggregationStorage.class);
Expand Down
22 changes: 16 additions & 6 deletions src/plan_runner/storage/BerkeleyDBStoreSkewed.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;

import com.sleepycat.bind.tuple.DoubleBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.DatabaseEntry;
import org.apache.log4j.Logger;

import plan_runner.conversion.DateConversion;
import plan_runner.utilities.SystemParameters;

import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.DatabaseEntry;

/*
* Less duplicates/better performance when there is skew
* the work for put is smaller, as no huge strings have to be read
Expand All @@ -24,15 +24,25 @@
* Value = Value
*/
public class BerkeleyDBStoreSkewed<KeyType> extends BerkeleyDBStore<KeyType> {
private static Logger LOG = Logger.getLogger(BerkeleyDBStoreSkewed.class);

private final DateConversion _dc = new DateConversion();

private Random randomGen = new Random();
private final int DISPERSION = 10000;
private int DISPERSION = 10000;

public BerkeleyDBStoreSkewed(Class type, String storagePath) {
super(type, storagePath);
}

public BerkeleyDBStoreSkewed(Class type, String storagePath, Map conf) {
this(type, storagePath);
if(SystemParameters.isExisting(conf, "DIP_BDB_SKEW_DISPERSION")){
DISPERSION = SystemParameters.getInt(conf, "DIP_BDB_SKEW_DISPERSION");
LOG.info("BDB Skewed Dispersion set to " + DISPERSION);
}
}

@Override
public void put(KeyType key, String value) {
incrementSize();
Expand Down
73 changes: 38 additions & 35 deletions src/plan_runner/storm_components/StormDstBDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected void applyOperatorsAndSend(Tuple stormTupleRcv, List<String> tuple) {
LOG.info("First Storage: " + _firstRelationStorage.getStatistics()
+ "\nEnd of First Storage");
LOG.info("Second Storage: " + _secondRelationStorage.getStatistics()
+ "\nEnd of First Storage");
+ "\nEnd of Second Storage");
}

if (MyUtilities.isSending(_hierarchyPosition, _batchOutputMillis))
Expand All @@ -225,40 +225,43 @@ private void createStorage() {
storagePath = SystemParameters.getString(getConf(), "STORAGE_LOCAL_DIR");

// TODO This assumes that there is only one index !!
/*
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Uniform BDB!");
*/

if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Skewed BDB!");


if(MyUtilities.isBDBUniform(getConf())){
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Uniform BDB!");
}else if(MyUtilities.isBDBSkewed(getConf())){
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Skewed BDB!");
}else{
throw new RuntimeException("Unsupported BDB type!");
}

if (_joinPredicate != null)
_existIndexes = true;
else
Expand Down
70 changes: 36 additions & 34 deletions src/plan_runner/storm_components/StormThetaJoinBDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,40 +189,42 @@ private void createStorage() {
storagePath = SystemParameters.getString(getConf(), "STORAGE_LOCAL_DIR");

// TODO We assume that there is only one index !!
/*
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Uniform BDB!");
*/

if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Skewed BDB!");

if(MyUtilities.isBDBUniform(getConf())){
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStore(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Uniform BDB!");
}else if(MyUtilities.isBDBSkewed(getConf())){
if (_typeOfValueIndexed.get(0) instanceof Integer) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Integer.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Double) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Double.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof Date) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(Date.class, storagePath + "/second");
} else if (_typeOfValueIndexed.get(0) instanceof String) {
_firstRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/first");
_secondRelationStorage = new BerkeleyDBStoreSkewed(String.class, storagePath + "/second");
} else
throw new RuntimeException("non supported type");
LOG.info("Storage with Skewed BDB!");
}else{
throw new RuntimeException("Unsupported BDB type!");
}

if (_joinPredicate != null)
_existIndexes = true;
else
Expand Down
17 changes: 12 additions & 5 deletions src/plan_runner/utilities/MyUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -638,11 +638,18 @@ public static String tupleToString(List<String> tuple, Map conf) {
}

public static boolean isBDB(Map conf){
boolean isBDB = false;
if(SystemParameters.isExisting(conf, "DIP_IS_BDB")){
isBDB = SystemParameters.getBoolean(conf, "DIP_IS_BDB");
}
return isBDB;
return SystemParameters.isExisting(conf, "DIP_IS_BDB")
&& SystemParameters.getBoolean(conf, "DIP_IS_BDB");
}

public static boolean isBDBUniform(Map conf) {
return SystemParameters.isExisting(conf, "DIP_BDB_TYPE")
&& SystemParameters.getString(conf, "DIP_BDB_TYPE").equalsIgnoreCase("UNIFORM");
}

public static boolean isBDBSkewed(Map conf) {
return SystemParameters.isExisting(conf, "DIP_BDB_TYPE")
&& SystemParameters.getString(conf, "DIP_BDB_TYPE").equalsIgnoreCase("SKEWED");
}

public static List<String> listFilesForPath(String dir) {
Expand Down

0 comments on commit 560102f

Please sign in to comment.