Skip to content

Commit

Permalink
https://github.com/Kademi/kademi-dev/issues/12408
Browse files Browse the repository at this point in the history
  • Loading branch information
brad committed Aug 26, 2021
1 parent fb8d58f commit 72233f1
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 108 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>hashsplit4j</groupId>
<artifactId>hashsplit4j-lib</artifactId>
<version>2.5.1</version>
<version>2.5.2</version>
<packaging>jar</packaging>
<name>hashsplit4j-lib</name>
<distributionManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.hashsplit4j.runnables;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.hashsplit4j.api.BlobImpl;
import org.hashsplit4j.store.SimpleFileDb;
import org.slf4j.LoggerFactory;

/**
 * Background consumer that drains enqueued blobs into a {@link SimpleFileDb}.
 * Producers call {@link #add(String, byte[])} (non-blocking); a single worker
 * thread runs {@link #run()} and persists blobs one at a time.
 *
 * @author dylan
 */
public class SimpleFileDbQueueRunnable implements Runnable {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(SimpleFileDbQueueRunnable.class);

    /** If more than this many put() failures occur, stop accepting new blobs. */
    public static final int MAX_ERRORS = 100;

    private final SimpleFileDb db;
    private final BlockingQueue<BlobImpl> queue;
    private final long maxFileSize;

    // Count of failed db.put() attempts; once past MAX_ERRORS, add() refuses work.
    private int errors;

    /**
     * @param db            file-backed store that queued blobs are written to
     * @param queueCapacity maximum number of blobs that may be pending at once
     * @param maxFileSize   stop enqueuing once the values file reaches this size, in bytes
     */
    public SimpleFileDbQueueRunnable(final SimpleFileDb db, final int queueCapacity, long maxFileSize) {
        this.db = db;
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.maxFileSize = maxFileSize;
    }

    public long getMaxFileSize() {
        return maxFileSize;
    }

    /** Current size of the underlying values file, in bytes. */
    public long getFileSize() {
        return db.getValuesFileSize();
    }

    public int getQueueSize() {
        return queue.size();
    }

    public int getErrors() {
        return errors;
    }

    /**
     * Inserts a blob into this queue if it is possible to do so immediately
     * without violating capacity restrictions, returning true upon success and
     * false if for any reason the blob is not enqueued (eg no queue space is
     * currently available, too many previous write failures, or the backing
     * file has reached its maximum size).
     *
     * @param hash  content hash identifying the blob
     * @param bytes blob payload
     * @return true upon success and false if the blob was not enqueued
     */
    public boolean add(String hash, byte[] bytes) {
        if (errors > MAX_ERRORS) {
            // Message fixed: this runnable saves to the local file DB, not MCS
            log.warn("add: Too many errors, will not try to save to file DB");
            return false;
        }
        if (db.getValuesFileSize() >= maxFileSize) {
            log.info("Cache file has exceeded max size, will not add to cache {}", hash);
            return false;
        }
        log.info("Enqueuing blob={} size={}", hash, bytes.length);
        return this.queue.offer(new BlobImpl(hash, bytes));
    }

    /**
     * Drains the queue forever, writing each blob to the db. Exits cleanly
     * (with the thread's interrupt status restored) when interrupted; any
     * other failure is counted and logged, and draining continues.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks until an element is available and never
                // returns null, so no null check is required
                BlobImpl blob = queue.take();
                db.put(blob.getHash(), blob.getBytes());
            } catch (InterruptedException ex) {
                // Interruption is a shutdown request, not an error: restore
                // the interrupt flag and stop, instead of wrapping in a
                // RuntimeException and incrementing the error counter
                log.warn("Queue consumer interrupted, stopping; pending={}", queue.size());
                Thread.currentThread().interrupt();
                return;
            } catch (Exception ex) {
                errors++;
                log.error("Exception inserting blob into DB: Msg: {}", ex.getMessage(), ex);
            }
        }
    }

}
131 changes: 131 additions & 0 deletions src/main/java/org/hashsplit4j/store/AbstractFileDbBlobStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
*/
package org.hashsplit4j.store;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hashsplit4j.runnables.SimpleFileDbQueueRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class holding the shared plumbing for file-backed blob stores: the
 * list of attached {@link SimpleFileDb}s, the merged key index, cache
 * hit/miss statistics, and the optional single-threaded background writer
 * used when adds are enabled.
 *
 * NOTE(review): stats counters and maps are updated without synchronization;
 * confirm callers confine mutation to one thread or accept approximate stats.
 *
 * @author brad
 */
public class AbstractFileDbBlobStore {

    private static final Logger log = LoggerFactory.getLogger(AbstractFileDbBlobStore.class);

    protected final List<SimpleFileDb> dbs = new ArrayList<>();
    protected final Set<String> dbNames = new HashSet<>();
    protected final Map<String, SimpleFileDb.DbItem> mapOfItems = new HashMap<>();

    protected boolean enableAdd;
    protected SimpleFileDbQueueRunnable queueRunnable; // if adds are enabled
    // 5GB default. The long literal matters: 5 * 1000 * 1000000 evaluated in
    // int arithmetic overflows to 705032704 (~705MB) before widening to long.
    protected long maxFileSize = 5L * 1000 * 1000000;
    protected ExecutorService exService;

    private long hits;
    private long misses;
    private long adds;
    private long hitDurationMillis;
    private long missDurationMillis;

    /**
     * Snapshot of cache statistics: hit/miss/add counts and average
     * hit/miss durations in milliseconds (averages omitted when the
     * corresponding count is zero, to avoid division by zero).
     */
    public Map<String, Object> getCacheStats() {
        Map<String, Object> map = new HashMap<>();
        map.put("hits", hits);
        if (hits > 0) {
            map.put("hitAvgMs", hitDurationMillis / hits);
        }
        map.put("misses", misses);
        if (misses > 0) {
            map.put("missAvgMs", missDurationMillis / misses);
        }
        map.put("adds", adds);
        return map;
    }

    public boolean isEnableAdd() {
        return enableAdd;
    }

    public void setEnableAdd(boolean enableAdd) {
        this.enableAdd = enableAdd;
    }

    public long getMaxFileSize() {
        return maxFileSize;
    }

    public void setMaxFileSize(long maxFileSize) {
        this.maxFileSize = maxFileSize;
    }

    /**
     * Registers a db: its name and key index are merged in. When adds are
     * enabled, the FIRST db added becomes the write target — a single
     * background writer is started for it and later dbs are read-only here.
     */
    public void addDb(SimpleFileDb db) {
        dbNames.add(db.getName());
        dbs.add(db);
        mapOfItems.putAll(db.getMapOfItems());
        if (enableAdd) {
            if (queueRunnable == null) {
                queueRunnable = new SimpleFileDbQueueRunnable(db, 1000, maxFileSize);
                exService = Executors.newSingleThreadExecutor();
                exService.submit(this.queueRunnable);
            }
        }
    }

    /**
     * Stops the background writer thread, if one was started by addDb.
     * Blobs still queued but not yet written may be dropped.
     */
    public void shutdown() {
        if (exService != null) {
            exService.shutdownNow();
            exService = null;
        }
    }

    /**
     * Asynchronously persists a blob via the queue runnable (no-op when adds
     * are not enabled). Enqueue failures are not reported to the caller.
     */
    protected void saveToDb(String hash, byte[] bytes) {
        if (queueRunnable != null) {
            adds = incrementLong(adds, 1);
            queueRunnable.add(hash, bytes);
        }
    }

    public boolean containsDb(String name) {
        return dbNames.contains(name);
    }

    public int getNumDbs() {
        return dbs.size();
    }

    /** Number of distinct keys across all registered dbs. */
    public int size() {
        return mapOfItems.size();
    }

    public long getMisses() {
        return misses;
    }

    public long getHits() {
        return hits;
    }

    /** Records a cache hit and its duration, measured from startTime (millis). */
    protected void recordHit(long startTime) {
        long durationMillis = System.currentTimeMillis() - startTime;
        hits = incrementLong(hits, 1);
        hitDurationMillis = incrementLong(hitDurationMillis, durationMillis);
    }

    /** Records a cache miss and its duration, measured from startTime (millis). */
    protected void recordMiss(long startTime) {
        long durationMillis = System.currentTimeMillis() - startTime;
        misses = incrementLong(misses, 1);
        missDurationMillis = incrementLong(missDurationMillis, durationMillis);
    }

    /**
     * Adds amount to val, wrapping back to zero when val approaches
     * Long.MAX_VALUE so the statistics counters never overflow.
     */
    protected long incrementLong(long val, long amount) {
        if (val > Long.MAX_VALUE - 10000) {
            val = 0;
        } else {
            val = val + amount;
        }
        return val;
    }

}
9 changes: 9 additions & 0 deletions src/main/java/org/hashsplit4j/store/SimpleFileDb.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ public SimpleFileDb(String name, File keysFile, File valuesFile) {
this.valuesFile = valuesFile;
}


/**
 * Size in bytes of the backing keys file.
 */
public long getKeysFileSize() {
    return this.keysFile.length();
}

/**
 * Size in bytes of the backing values file.
 */
public long getValuesFileSize() {
    return this.valuesFile.length();
}

/**
 * Name identifying this db instance.
 */
public String getName() {
    return this.name;
}
Expand Down
68 changes: 14 additions & 54 deletions src/main/java/org/hashsplit4j/store/SimpleFileDbBlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
package org.hashsplit4j.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.hashsplit4j.api.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,26 +11,13 @@
*
* @author brad
*/
public class SimpleFileDbBlobStore implements BlobStore {
public class SimpleFileDbBlobStore extends AbstractFileDbBlobStore implements BlobStore {

private static final Logger log = LoggerFactory.getLogger(SimpleFileDbBlobStore.class);

static long incrementLong(long val) {
if (val > Long.MAX_VALUE - 100) {
val = 0;
} else {
val = val + 1;
}
return val;
}

private final BlobStore wrapped;
private final List<SimpleFileDb> dbs = new ArrayList<>();
private final Set<String> dbNames = new HashSet<>();
private final Map<String, SimpleFileDb.DbItem> mapOfItems = new HashMap<>();

private long hits;
private long misses;


public SimpleFileDbBlobStore(BlobStore wrapped) {
this.wrapped = wrapped;
Expand All @@ -50,47 +31,35 @@ public String getBlobKey(String hash) {
return "b-" + hash;
}

public void addDb(SimpleFileDb db) {
dbNames.add(db.getName());
dbs.add(db);
mapOfItems.putAll(db.getMapOfItems());
}

public boolean containsDb(String name) {
return dbNames.contains(name);
}

public int getNumDbs() {
return dbs.size();
}

public int size() {
return mapOfItems.size();
}

/**
 * Delegates the write straight through to the wrapped store; the local
 * file cache is not updated here.
 *
 * @param hash  content hash for the blob
 * @param bytes blob payload
 */
@Override
public void setBlob(String hash, byte[] bytes) {
    this.wrapped.setBlob(hash, bytes);
}

@Override
public byte[] getBlob(String hash) {
long nanos = System.nanoTime();
long startTime = System.currentTimeMillis();
String key = getBlobKey(hash);
SimpleFileDb.DbItem item = mapOfItems.get(key);
if (item != null) {
try {
hits = incrementLong(hits);
return item.data();
} catch (IOException ex) {
log.warn("Exception looking up blob {} from simplefiledb: {}", hash, ex);
} finally {
nanos = System.nanoTime() - nanos;
log.info("getBlob: duration={} nanos", nanos);
recordHit(startTime);
}
}
misses = incrementLong(misses);
return wrapped.getBlob(hash);

startTime = System.currentTimeMillis();
byte[] bytes = wrapped.getBlob(hash);
recordMiss(startTime);
if (enableAdd && bytes != null) {
// save to the simple DB unless exceeded size
saveToDb(hash, bytes);
}
return bytes;

}

@Override
Expand All @@ -101,13 +70,4 @@ public boolean hasBlob(String hash) {
}
return wrapped.hasBlob(hash);
}

public long getMisses() {
return misses;
}

public long getHits() {
return hits;
}

}
Loading

0 comments on commit 72233f1

Please sign in to comment.