Skip to content

Commit

Permalink
Added metrics instrumentation around manager to account for mutateMan…
Browse files Browse the repository at this point in the history
…y calls. Refactored metrics initialization.

(cherry picked from commit 278d8b9)

Conflicts:
	titan-core/src/main/java/com/thinkaurelius/titan/diskstorage/Backend.java
  • Loading branch information
mbroecheler committed Sep 4, 2015
1 parent c9272c8 commit d9c3baf
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.thinkaurelius.titan.diskstorage.util.BackendOperation;
import com.thinkaurelius.titan.diskstorage.util.MetricInstrumentedStore;
import com.thinkaurelius.titan.diskstorage.configuration.backend.KCVSConfiguration;
import com.thinkaurelius.titan.diskstorage.util.MetricInstrumentedStoreManager;
import com.thinkaurelius.titan.diskstorage.util.StandardBaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class Backend implements LockerProvider, AutoCloseable {

public static final String ID_STORE_NAME = "titan_ids";

public static final String METRICS_STOREMANAGER_NAME = "storeManager";
public static final String METRICS_MERGED_STORE = "stores";
public static final String METRICS_MERGED_CACHE = "caches";
public static final String METRICS_CACHE_SUFFIX = ".cache";
Expand Down Expand Up @@ -136,7 +138,12 @@ public class Backend implements LockerProvider, AutoCloseable {
public Backend(Configuration configuration) {
this.configuration = configuration;

storeManager = getStorageManager(configuration);
KeyColumnValueStoreManager manager = getStorageManager(configuration);
if (configuration.get(BASIC_METRICS)) {
storeManager = new MetricInstrumentedStoreManager(manager,METRICS_STOREMANAGER_NAME,configuration.get(METRICS_MERGE_STORES),METRICS_MERGED_STORE);
} else {
storeManager = manager;
}
indexes = getIndexes(configuration);
storeFeatures = storeManager.getFeatures();

Expand Down Expand Up @@ -214,13 +221,9 @@ public Locker getLocker(String lockerName) {
*/
public void initialize(Configuration config) {
try {
boolean reportMetrics = configuration.get(BASIC_METRICS);

//EdgeStore & VertexIndexStore
KeyColumnValueStore idStore = storeManager.openDatabase(ID_STORE_NAME);
if (reportMetrics) {
idStore = new MetricInstrumentedStore(idStore, getMetricsStoreName(ID_STORE_NAME));
}

idAuthority = null;
if (storeFeatures.isKeyConsistent()) {
idAuthority = new ConsistentKeyIDAuthority(idStore, storeManager, config);
Expand All @@ -231,11 +234,6 @@ public void initialize(Configuration config) {
KeyColumnValueStore edgeStoreRaw = storeManagerLocking.openDatabase(EDGESTORE_NAME);
KeyColumnValueStore indexStoreRaw = storeManagerLocking.openDatabase(INDEXSTORE_NAME);

if (reportMetrics) {
edgeStoreRaw = new MetricInstrumentedStore(edgeStoreRaw, getMetricsStoreName(EDGESTORE_NAME));
indexStoreRaw = new MetricInstrumentedStore(indexStoreRaw, getMetricsStoreName(INDEXSTORE_NAME));
}

//Configure caches
if (cacheEnabled) {
long expirationTime = configuration.get(DB_CACHE_TIME);
Expand All @@ -259,8 +257,8 @@ public void initialize(Configuration config) {
long edgeStoreCacheSize = Math.round(cacheSizeBytes * EDGESTORE_CACHE_PERCENT);
long indexStoreCacheSize = Math.round(cacheSizeBytes * INDEXSTORE_CACHE_PERCENT);

edgeStore = new ExpirationKCVSCache(edgeStoreRaw,getMetricsCacheName("edgeStore",reportMetrics),expirationTime,cleanWaitTime,edgeStoreCacheSize);
indexStore = new ExpirationKCVSCache(indexStoreRaw,getMetricsCacheName("indexStore",reportMetrics),expirationTime,cleanWaitTime,indexStoreCacheSize);
edgeStore = new ExpirationKCVSCache(edgeStoreRaw,getMetricsCacheName("edgeStore"),expirationTime,cleanWaitTime,edgeStoreCacheSize);
indexStore = new ExpirationKCVSCache(indexStoreRaw,getMetricsCacheName("indexStore"),expirationTime,cleanWaitTime,indexStoreCacheSize);
} else {
edgeStore = new NoKCVSCache(edgeStoreRaw);
indexStore = new NoKCVSCache(indexStoreRaw);
Expand Down Expand Up @@ -377,12 +375,8 @@ public KCVSConfiguration getUserConfiguration() {
return userConfig;
}

private String getMetricsStoreName(String storeName) {
return configuration.get(METRICS_MERGE_STORES) ? METRICS_MERGED_STORE : storeName;
}

private String getMetricsCacheName(String storeName, boolean reportMetrics) {
if (!reportMetrics) return null;
private String getMetricsCacheName(String storeName) {
if (!configuration.get(BASIC_METRICS)) return null;
return configuration.get(METRICS_MERGE_STORES) ? METRICS_MERGED_CACHE : storeName + METRICS_CACHE_SUFFIX;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.thinkaurelius.titan.diskstorage.util;

import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.StoreMetaData;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
import com.thinkaurelius.titan.util.stats.MetricManager;
import static com.thinkaurelius.titan.diskstorage.util.MetricInstrumentedStore.*;

import java.util.List;
import java.util.Map;

import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.METRICS_MERGE_STORES;

/**
* @author Matthias Broecheler ([email protected])
*/
public class MetricInstrumentedStoreManager implements KeyColumnValueStoreManager {

public static final String M_OPEN_DATABASE = "openDatabase";
public static final String M_START_TX = "startTransaction";
public static final String M_CLOSE_MANAGER = "closeManager";


public static final String GLOBAL_PREFIX = "global";

private final KeyColumnValueStoreManager backend;
private final boolean mergeStoreMetrics;
private final String mergedMetricsName;
private final String managerMetricsName;

public MetricInstrumentedStoreManager(KeyColumnValueStoreManager backend, String managerMetricsName,
boolean mergeStoreMetrics, String mergedMetricsName) {
this.backend = backend;
this.mergeStoreMetrics = mergeStoreMetrics;
this.mergedMetricsName = mergedMetricsName;
this.managerMetricsName = managerMetricsName;
}


private String getMetricsStoreName(String storeName) {
return mergeStoreMetrics ? mergedMetricsName : storeName;
}

@Override
public KeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
MetricManager.INSTANCE.getCounter(GLOBAL_PREFIX, managerMetricsName, M_OPEN_DATABASE, M_CALLS).inc();
return new MetricInstrumentedStore(backend.openDatabase(name, metaData),getMetricsStoreName(name));
}

@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
if (!txh.getConfiguration().hasGroupName()) {
backend.mutateMany(mutations,txh);
}
String prefix = txh.getConfiguration().getGroupName();

final MetricManager mgr = MetricManager.INSTANCE;
mgr.getCounter(prefix, managerMetricsName, M_MUTATE, M_CALLS).inc();
final Timer.Context tc = mgr.getTimer(prefix, managerMetricsName, M_MUTATE, M_TIME).time();

try {
backend.mutateMany(mutations,txh);
} catch (BackendException e) {
mgr.getCounter(prefix, managerMetricsName, M_MUTATE, M_EXCEPTIONS).inc();
throw e;
} catch (RuntimeException e) {
mgr.getCounter(prefix, managerMetricsName, M_MUTATE, M_EXCEPTIONS).inc();
throw e;
} finally {
tc.stop();
}
}

@Override
public StoreTransaction beginTransaction(BaseTransactionConfig config) throws BackendException {
MetricManager.INSTANCE.getCounter(GLOBAL_PREFIX, managerMetricsName, M_START_TX, M_CALLS).inc();
return backend.beginTransaction(config);
}

@Override
public void close() throws BackendException {
backend.close();
MetricManager.INSTANCE.getCounter(GLOBAL_PREFIX, managerMetricsName, M_CLOSE_MANAGER, M_CALLS).inc();
}

@Override
public void clearStorage() throws BackendException {
backend.clearStorage();
}

@Override
public StoreFeatures getFeatures() {
return backend.getFeatures();
}

@Override
public String getName() {
return backend.getName();
}

@Override
public List<KeyRange> getLocalKeyPartition() throws BackendException {
return backend.getLocalKeyPartition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,32 @@ public void testReadOperations(boolean cache) {

}


public static final List<String> STORE_NAMES =
ImmutableList.of("edgeStore", "vertexIndexStore", "edgeIndexStore", "idStore");
ImmutableList.of(Backend.EDGESTORE_NAME,Backend.INDEXSTORE_NAME,Backend.ID_STORE_NAME,Backend.METRICS_STOREMANAGER_NAME);

@Test
public void testSettingProperty() throws Exception {
metricsPrefix = "metrics1";

mgmt.makePropertyKey("foo").dataType(String.class).cardinality(Cardinality.SINGLE).make();
finishSchema();

TitanVertex v = tx.addVertex();
v.property("foo","bar");
tx.commit();


TitanTransaction tx = graph.buildTransaction().checkExternalVertexExistence(false).groupName(metricsPrefix).start();
v = tx.getVertex(v.longId());
v.property("foo","bus");
tx.commit();
printAllMetrics();
verifyStoreMetrics(STORE_NAMES.get(0));
verifyStoreMetrics(STORE_NAMES.get(1));
verifyStoreMetrics(STORE_NAMES.get(2));
verifyStoreMetrics(STORE_NAMES.get(3), ImmutableMap.of(M_MUTATE, 1l));
}


@Test
@Ignore //TODO: Ignore for now until everything is stable - then do the counting
Expand Down

0 comments on commit d9c3baf

Please sign in to comment.