Skip to content

Commit

Permalink
Move off-heap QTL global cache delete lock outside of subclass lock (a…
Browse files Browse the repository at this point in the history
…pache#3597)

* Move off-heap QTL global cache delete lock outside of subclass lock

* Make `delete` thread safe
  • Loading branch information
drcrallen authored and nishantmonu51 committed Oct 27, 2016
1 parent 0799640 commit 78159d7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.query.lookup.namespace.ExtractionNamespace;
import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;

import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
Expand All @@ -49,6 +50,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
*
Expand All @@ -71,16 +73,17 @@ public NamespaceImplData(
final ListenableFuture<?> future;
final ExtractionNamespace namespace;
final String name;
final Object changeLock = new Object();
final AtomicBoolean enabled = new AtomicBoolean(false);
final CountDownLatch firstRun = new CountDownLatch(1);
final AtomicReference<String> latestVersion = new AtomicReference<>(null);
}

private static final Logger log = new Logger(NamespaceExtractionCacheManager.class);
private final ListeningScheduledExecutorService listeningScheduledExecutorService;
protected final ConcurrentMap<String, NamespaceImplData> implData = new ConcurrentHashMap<>();
protected final AtomicLong tasksStarted = new AtomicLong(0);
protected final ServiceEmitter serviceEmitter;
private final ConcurrentHashMap<String, String> lastVersion = new ConcurrentHashMap<>();
private final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceCacheFactory<?>> namespaceFunctionFactoryMap;

public NamespaceExtractionCacheManager(
Expand Down Expand Up @@ -148,10 +151,8 @@ protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws Interrupt
}


protected <T extends ExtractionNamespace> Runnable getPostRunnable(
protected Runnable getPostRunnable(
final String id,
final T namespace,
final ExtractionNamespaceCacheFactory<T> factory,
final String cacheId
)
{
Expand All @@ -165,17 +166,20 @@ public void run()
// was removed
return;
}
synchronized (namespaceDatum.enabled) {
try {
try {
if (!namespaceDatum.enabled.get()) {
// skip because it was disabled
return;
}
synchronized (namespaceDatum.enabled) {
if (!namespaceDatum.enabled.get()) {
// skip because it was disabled
return;
}
swapAndClearCache(id, cacheId);
}
finally {
namespaceDatum.firstRun.countDown();
}
}
finally {
namespaceDatum.firstRun.countDown();
}
}
};
Expand Down Expand Up @@ -221,7 +225,10 @@ public boolean scheduleOrUpdate(
if (log.isDebugEnabled()) {
log.debug("Namespace [%s] needs updated to [%s]", implDatum.namespace, namespace);
}
removeNamespaceLocalMetadata(implDatum);
// Ensure it is not changing state right now.
synchronized (implDatum.changeLock) {
removeNamespaceLocalMetadata(implDatum);
}
schedule(id, namespace);
return true;
}
Expand Down Expand Up @@ -257,59 +264,59 @@ public boolean scheduleAndWait(
return success;
}

@GuardedBy("implDatum.changeLock")
private void cancelFuture(final NamespaceImplData implDatum)
{
synchronized (implDatum.enabled) {
final CountDownLatch latch = new CountDownLatch(1);
final ListenableFuture<?> future = implDatum.future;
Futures.addCallback(
future, new FutureCallback<Object>()
final CountDownLatch latch = new CountDownLatch(1);
final ListenableFuture<?> future = implDatum.future;
Futures.addCallback(
future, new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
@Override
public void onSuccess(Object result)
{
latch.countDown();
}
latch.countDown();
}

@Override
public void onFailure(Throwable t)
{
// Expect CancellationException
latch.countDown();
if (!(t instanceof CancellationException)) {
log.error(t, "Error in namespace [%s]", implDatum.name);
}
@Override
public void onFailure(Throwable t)
{
// Expect CancellationException
latch.countDown();
if (!(t instanceof CancellationException)) {
log.error(t, "Error in namespace [%s]", implDatum.name);
}
}
);
if (!future.isDone()
&& !future.cancel(true)) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up
throw new ISE("Future for namespace [%s] was not able to be canceled", implDatum.name);
}
try {
latch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
);
if (!future.isDone()
&& !future.cancel(true)) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up
throw new ISE("Future for namespace [%s] was not able to be canceled", implDatum.name);
}
try {
latch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}

// Not thread safe
@GuardedBy("implDatum.changeLock")
private boolean removeNamespaceLocalMetadata(final NamespaceImplData implDatum)
{
if (implDatum == null) {
return false;
}
synchronized (implDatum.enabled) {
if (!implDatum.enabled.compareAndSet(true, false)) {
return false;
}
if (!implDatum.future.isDone()) {
cancelFuture(implDatum);
}
return implData.remove(implDatum.name, implDatum);
// "Leader" election for doing the deletion
if (!implDatum.enabled.compareAndSet(true, false)) {
return false;
}
if (!implDatum.future.isDone()) {
cancelFuture(implDatum);
}
return implData.remove(implDatum.name, implDatum);
}

// Optimistic scheduling of updates to a namespace.
Expand All @@ -321,7 +328,7 @@ public <T extends ExtractionNamespace> ListenableFuture<?> schedule(final String
throw new ISE("Cannot find factory for namespace [%s]", namespace);
}
final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString());
return schedule(id, namespace, factory, getPostRunnable(id, namespace, factory, cacheId), cacheId);
return schedule(id, namespace, factory, getPostRunnable(id, cacheId), cacheId);
}

// For testing purposes this is protected
Expand All @@ -336,7 +343,7 @@ protected <T extends ExtractionNamespace> ListenableFuture<?> schedule(
log.debug("Trying to update namespace [%s]", id);
final NamespaceImplData implDatum = implData.get(id);
if (implDatum != null) {
synchronized (implDatum.enabled) {
synchronized (implDatum.changeLock) {
if (implDatum.enabled.get()) {
// We also check at the end of the function, but fail fast here
throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace.toString());
Expand All @@ -345,6 +352,8 @@ protected <T extends ExtractionNamespace> ListenableFuture<?> schedule(
}
final long updateMs = namespace.getPollMs();
final CountDownLatch startLatch = new CountDownLatch(1);
// Must be set before leader election occurs or else runnable will fail
final AtomicReference<NamespaceImplData> implDataAtomicReference = new AtomicReference<>(null);

final Runnable command = new Runnable()
{
Expand All @@ -354,8 +363,13 @@ public void run()
try {
startLatch.await(); // wait for "election" to leadership or cancellation
if (!Thread.currentThread().isInterrupted()) {
final NamespaceImplData implData = implDataAtomicReference.get();
if (implData == null) {
// should never happen
throw new NullPointerException(String.format("No data for namespace [%s]", id));
}
final Map<String, String> cache = getCacheMap(cacheId);
final String preVersion = lastVersion.get(id);
final String preVersion = implData.latestVersion.get();
final Callable<String> runnable = factory.getCachePopulator(id, namespace, preVersion, cache);

tasksStarted.incrementAndGet();
Expand All @@ -364,7 +378,9 @@ public void run()
throw new CancellationException(String.format("Version `%s` already exists", preVersion));
}
if (newVersion != null) {
lastVersion.put(id, newVersion);
if (!implData.latestVersion.compareAndSet(preVersion, newVersion)) {
log.wtf("Somehow multiple threads are updating the same implData for [%s]", id);
}
}
postRunnable.run();
log.debug("Namespace [%s] successfully updated", id);
Expand Down Expand Up @@ -392,7 +408,9 @@ public void run()
future = listeningScheduledExecutorService.schedule(command, 0, TimeUnit.MILLISECONDS);
}

// Do not need to synchronize here as we haven't set enabled to true yet, and haven't released startLatch
final NamespaceImplData me = new NamespaceImplData(future, namespace, id);
implDataAtomicReference.set(me);
final NamespaceImplData other = implData.putIfAbsent(id, me);
if (other != null) {
if (!future.isDone() && !future.cancel(true)) {
Expand Down Expand Up @@ -433,8 +451,6 @@ public void run()

/**
* Clears out resources used by the namespace such as threads. Implementations may override this and call super.delete(...) if they have resources of their own which need cleared.
* <p/>
* This particular method is NOT thread safe, and any impl which is intended to be thread safe should safe-guard calls to this method.
*
* @param ns The namespace to be deleted
*
Expand All @@ -445,25 +461,31 @@ public void run()
public boolean delete(final String ns)
{
final NamespaceImplData implDatum = implData.get(ns);
final boolean deleted = removeNamespaceLocalMetadata(implDatum);
// At this point we have won leader election on canceling this implDatum
if (deleted) {
log.info("Deleting namespace [%s]", ns);
lastVersion.remove(implDatum.name);
return true;
} else {
log.debug("Did not delete namespace [%s]", ns);
if (implDatum == null) {
log.debug("Found no running cache for [%s]", ns);
return false;
}
synchronized (implDatum.changeLock) {
if (removeNamespaceLocalMetadata(implDatum)) {
log.info("Deleted namespace [%s]", ns);
return true;
} else {
log.debug("Did not delete namespace [%s]", ns);
return false;
}
}
}

public String getVersion(String namespace)
{
if (namespace == null) {
return null;
} else {
return lastVersion.get(namespace);
}
final NamespaceImplData implDatum = implData.get(namespace);
if (implDatum == null) {
return null;
}
return implDatum.latestVersion.get();
}

public Collection<String> getKnownIDs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,21 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)
@Override
public boolean delete(final String namespaceKey)
{
// `super.delete` has a synchronization in it, don't call it in the lock.
if (!super.delete(namespaceKey)) {
return false;
}
final Lock lock = nsLocks.get(namespaceKey);
lock.lock();
try {
if (super.delete(namespaceKey)) {
final String mmapDBkey = currentNamespaceCache.remove(namespaceKey);
if (mmapDBkey != null) {
final long pre = tmpFile.length();
mmapDB.delete(mmapDBkey);
log.debug("MapDB file size: pre %d post %d", pre, tmpFile.length());
return true;
} else {
return false;
}
} else {
final String mmapDBkey = currentNamespaceCache.remove(namespaceKey);
if (mmapDBkey == null) {
return false;
}
final long pre = tmpFile.length();
mmapDB.delete(mmapDBkey);
log.debug("MapDB file size: pre %d post %d", pre, tmpFile.length());
return true;
}
finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ public ConcurrentMap<String, String> getCacheMap(String namespaceOrCacheKey)
@Override
public boolean delete(final String namespaceKey)
{
// `super.delete` has a synchronization in it, don't call it in the lock.
if (!super.delete(namespaceKey)) {
return false;
}
final Lock lock = nsLocks.get(namespaceKey);
lock.lock();
try {
return super.delete(namespaceKey) && mapMap.remove(namespaceKey) != null;
return mapMap.remove(namespaceKey) != null;
}
finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,12 @@ lifecycle, new NoopServiceEmitter(),
)
{
@Override
protected <T extends ExtractionNamespace> Runnable getPostRunnable(
protected Runnable getPostRunnable(
final String id,
final T namespace,
final ExtractionNamespaceCacheFactory<T> factory,
final String cacheId
)
{
final Runnable runnable = super.getPostRunnable(id, namespace, factory, cacheId);
final Runnable runnable = super.getPostRunnable(id, cacheId);
cacheUpdateAlerts.putIfAbsent(id, new Object());
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(id);
return new Runnable()
Expand Down

0 comments on commit 78159d7

Please sign in to comment.