Skip to content

Commit

Permalink
fix cache populate incorrect content when numBackgroundThreads>1 (apa…
Browse files Browse the repository at this point in the history
…che#3943)

* fix cache populate incorrect content when numBackgroundThreads>1

* simplify code by using Futures.allAsList and use CountDownLatch in UT

* fix test code style and assert countDownLatch.await()
  • Loading branch information
kaijianding authored and nishantmonu51 committed Feb 17, 2017
1 parent 797488a commit a029b33
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 18 deletions.
33 changes: 18 additions & 15 deletions server/src/main/java/io/druid/client/CachingQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.guava.BaseSequence;
Expand All @@ -47,7 +47,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -152,7 +151,6 @@ public void cleanup(Iterator<T> iterFromMake)
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.<ListenableFuture<?>>newLinkedList());
if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final List<Object> cacheResults = Lists.newLinkedList();

return Sequences.withEffect(
Sequences.map(
Expand All @@ -162,17 +160,23 @@ public void cleanup(Iterator<T> iterFromMake)
@Override
public T apply(final T input)
{
cacheFutures.add(
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
cacheResults.add(cacheFn.apply(input));
}
final SettableFuture<Object> future = SettableFuture.create();
cacheFutures.add(future);
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
future.set(cacheFn.apply(input));
}
catch (Exception e) {
// if there is exception, should setException to quit the caching processing
future.setException(e);
}
)
}
}
);
return input;
}
Expand All @@ -184,8 +188,7 @@ public void run()
public void run()
{
try {
Futures.allAsList(cacheFutures).get();
CacheUtil.populate(cache, mapper, key, cacheResults);
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
}
catch (Exception e) {
log.error(e, "Error while getting future for cache task");
Expand Down
80 changes: 77 additions & 3 deletions server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheStats;
import io.druid.client.cache.MapCache;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -56,6 +58,8 @@
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -64,10 +68,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@RunWith(Parameterized.class)
public class CachingQueryRunnerTest
{
@Parameterized.Parameters(name = "numBackgroundThreads={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0));
}

private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
Expand All @@ -83,6 +98,17 @@ public class CachingQueryRunnerTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};

private ExecutorService backgroundExecutorService;

public CachingQueryRunnerTest(int numBackgroundThreads)
{
if (numBackgroundThreads > 0) {
backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
} else {
backgroundExecutorService = MoreExecutors.sameThreadExecutor();
}
}

@Test
public void testCloseAndPopulate() throws Exception
{
Expand Down Expand Up @@ -183,7 +209,52 @@ public void after(boolean isDone, Throwable thrown) throws Exception
}
);

Cache cache = MapCache.create(1024 * 1024);
final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1);
Cache cache = new Cache()
{
private final Map<NamedKey, byte[]> baseMap = new ConcurrentHashMap<>();

@Override
public byte[] get(NamedKey key)
{
return baseMap.get(key);
}

@Override
public void put(NamedKey key, byte[] value)
{
baseMap.put(key, value);
cacheMustBePutOnce.countDown();
}

@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
return null;
}

@Override
public void close(String namespace)
{
}

@Override
public CacheStats getStats()
{
return null;
}

@Override
public boolean isLocal()
{
return true;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
}
};

String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
Expand All @@ -203,7 +274,7 @@ public Sequence run(Query query, Map responseContext)
return resultSeq;
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override
Expand Down Expand Up @@ -237,6 +308,9 @@ public boolean isUseCache()
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes.toString(), results.toString());

// wait for background caching finish
// wait at most 10 seconds to fail the test to avoid block overall tests
Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10, TimeUnit.SECONDS));
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);

Expand Down Expand Up @@ -293,7 +367,7 @@ public Sequence run(Query query, Map responseContext)
return Sequences.empty();
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override
Expand Down

0 comments on commit a029b33

Please sign in to comment.