Skip to content

Commit

Permalink
Lookup cache refactoring (the main part of apache#3667) (apache#3697)
Browse files Browse the repository at this point in the history
* Lookup cache refactoring (the main part of apache#3667)

* Use PowerMock's static methods in NamespaceLookupExtractorFactoryTest

* Fix KafkaLookupExtractorFactoryTest

* Use VisibleForTesting annotation instead of Javadoc comment

* Create a NamespaceExtractionCacheManager separately for each test in NamespaceExtractionCacheManagersTest

* Rename CacheScheduler.NoCache.ENTRY_DISPOSED to ENTRY_CLOSED

* Reduce visibility of NamespaceExtractionCacheManager.cacheCount() and monitor() implementations, and don't run NamespaceExtractionCacheManagerExecutorsTest with off-heap cache (it didn't before)

* In NamespaceLookupExtractorFactory, use safer idiom to check if CacheState is NoCache or VersionedCache

* More logging in CacheHandler constructor and close(), VersionedCache.close()

* PR comments addressed

* Make CacheScheduler.EntryImpl AutoCloseable, avoid 'dispose' verb in comments, logging and naming in CacheScheduler in favor of 'close'

* More Javadoc comments to CacheScheduler

* Fix NPE

* Remove logging in OnHeapNamespaceExtractionCacheManager.expungeCollectedCaches()

* Make NamespaceExtractionCacheManagersTest.testRacyCreation() to have similar load to what it be before the refactoring

* Unwrap NamespaceExtractionCacheManager.scheduledExecutorService from unneeded MoreExecutors.listeningDecorator() and specify that this is ScheduledThreadPoolExecutor, which ensures happens-before between periodic runs of the tasks

* More comments on MapDbCacheDisposer.disposed

* Replace concat with Long.toString()

* Comment on why NamespaceExtractionCacheManager.scheduledExecutorService() returns ScheduledThreadPoolExecutor

* Place logging statements in VersionedCache.close() and CacheHandler.close() after actual closing logic, because logging may fail

* Make JDBCExtractionNamespaceCacheFactory and StaticMapExtractionNamespaceCacheFactory to try to close newly created VersionedCache if population has failed, as it is done already in URIExtractionNamespaceCacheFactory

* Don't close the whole CacheScheduler.Entry, if the cache update task failed

* Replace AtomicLong updateCounter and firstRunLatch with Phaser-based UpdateCounter in CacheScheduler.EntryImpl
  • Loading branch information
leventov authored and b-slim committed Dec 24, 2016
1 parent 0e5bd8b commit 76cb06a
Show file tree
Hide file tree
Showing 27 changed files with 1,766 additions and 1,648 deletions.
10 changes: 10 additions & 0 deletions extensions-core/kafka-extraction-namespace/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,15 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import io.druid.concurrent.Execs;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.server.lookup.namespace.cache.CacheHandler;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
Expand Down Expand Up @@ -82,6 +82,7 @@ public String fromBytes(byte[] bytes)
private final String factoryId;
private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);
private final AtomicBoolean started = new AtomicBoolean(false);
private CacheHandler cacheHandler;

private volatile ConsumerConnector consumerConnector;
private volatile ListenableFuture<?> future = null;
Expand Down Expand Up @@ -182,7 +183,8 @@ public boolean start()
kafkaProperties.setProperty("group.id", factoryId);
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
final Map<String, String> map = cacheManager.getCacheMap(factoryId);
cacheHandler = cacheManager.createCache();
final Map<String, String> map = cacheHandler.getCache();
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
Expand Down Expand Up @@ -280,7 +282,7 @@ public void onFailure(Throwable t)
LOG.warn("Could not cancel kafka listening thread");
}
LOG.error(e, "Failed to start kafka extraction factory");
cacheManager.delete(factoryId);
cacheHandler.close();
return false;
}

Expand Down Expand Up @@ -319,10 +321,7 @@ public boolean close()
return false;
}
}
if (!cacheManager.delete(factoryId)) {
LOG.error("Error removing [%s] for topic [%s] from cache", factoryId, getKafkaTopic());
return false;
}
cacheHandler.close();
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.StringUtils;
import io.druid.server.lookup.namespace.cache.CacheHandler;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
Expand All @@ -41,6 +41,11 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -53,14 +58,30 @@

import static io.druid.query.lookup.KafkaLookupExtractorFactory.DEFAULT_STRING_DECODER;

@RunWith(PowerMockRunner.class)
@PrepareForTest({
NamespaceExtractionCacheManager.class,
CacheHandler.class
})
@PowerMockIgnore({
"javax.management.*",
"javax.net.ssl.*",
"org.apache.logging.*",
"org.slf4j.*",
"com.sun.*",
"javax.script.*",
"jdk.*"
})
public class KafkaLookupExtractorFactoryTest
{
private static final String TOPIC = "some_topic";
private static final Map<String, String> DEFAULT_PROPERTIES = ImmutableMap.of(
"some.property", "some.value"
);
private final ObjectMapper mapper = new DefaultObjectMapper();
final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
private final NamespaceExtractionCacheManager cacheManager = PowerMock.createStrictMock(NamespaceExtractionCacheManager.class);
private final CacheHandler cacheHandler = PowerMock.createStrictMock(CacheHandler.class);


@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand Down Expand Up @@ -258,9 +279,9 @@ public void testStopWithoutStart()
@Test
public void testStartStop()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
Expand All @@ -270,10 +291,12 @@ public void testStartStop()
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
cacheHandler.close();
EasyMock.expectLastCall();

final AtomicBoolean threadWasInterrupted = new AtomicBoolean(false);
consumerConnector.shutdown();
Expand All @@ -286,7 +309,7 @@ public Object answer() throws Throwable {
}
}).times(2);

EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
Expand All @@ -307,18 +330,20 @@ ConsumerConnector buildConnector(Properties properties)
Assert.assertTrue(factory.getFuture().isDone());
Assert.assertFalse(threadWasInterrupted.get());

EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager, cacheHandler);
}


@Test
public void testStartFailsFromTimeout() throws Exception
{
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
EasyMock.replay(cacheManager);
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
cacheHandler.close();
EasyMock.expectLastCall();
PowerMock.replay(cacheManager, cacheHandler);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
Expand All @@ -343,56 +368,15 @@ ConsumerConnector buildConnector(Properties properties)
Assert.assertFalse(factory.start());
Assert.assertTrue(factory.getFuture().isDone());
Assert.assertTrue(factory.getFuture().isCancelled());
EasyMock.verify(cacheManager);
}

@Test
public void testStopDeleteError()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
EasyMock.eq(
DEFAULT_STRING_DECODER),
EasyMock.eq(DEFAULT_STRING_DECODER)
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(false).once();
consumerConnector.shutdown();
EasyMock.expectLastCall().anyTimes();

EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("zookeeper.connect", "localhost")
)
{
@Override
ConsumerConnector buildConnector(Properties properties)
{
return consumerConnector;
}
};
Assert.assertTrue(factory.start());
Assert.assertFalse(factory.close());
EasyMock.verify(cacheManager, kafkaStream, consumerConnector, consumerIterator);
PowerMock.verify(cacheManager, cacheHandler);
}


@Test
public void testStartStopStart()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
Expand All @@ -402,13 +386,15 @@ public void testStartStopStart()
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
consumerConnector.shutdown();
EasyMock.expectLastCall().times(2);
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
Expand All @@ -424,15 +410,15 @@ ConsumerConnector buildConnector(Properties properties)
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertFalse(factory.start());
EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager, cacheHandler);
}

@Test
public void testStartStartStop()
{
final KafkaStream<String, String> kafkaStream = EasyMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = EasyMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = EasyMock.createStrictMock(ConsumerConnector.class);
final KafkaStream<String, String> kafkaStream = PowerMock.createStrictMock(KafkaStream.class);
final ConsumerIterator<String, String> consumerIterator = PowerMock.createStrictMock(ConsumerIterator.class);
final ConsumerConnector consumerConnector = PowerMock.createStrictMock(ConsumerConnector.class);
EasyMock.expect(consumerConnector.createMessageStreamsByFilter(
EasyMock.anyObject(TopicFilter.class),
EasyMock.anyInt(),
Expand All @@ -442,13 +428,15 @@ public void testStartStartStop()
)).andReturn(ImmutableList.of(kafkaStream)).once();
EasyMock.expect(kafkaStream.iterator()).andReturn(consumerIterator).anyTimes();
EasyMock.expect(consumerIterator.hasNext()).andAnswer(getBlockingAnswer()).anyTimes();
EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
.andReturn(new ConcurrentHashMap<String, String>())
EasyMock.expect(cacheManager.createCache())
.andReturn(cacheHandler)
.once();
EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<String, String>()).once();
cacheHandler.close();
EasyMock.expectLastCall().once();
consumerConnector.shutdown();
EasyMock.expectLastCall().times(3);
EasyMock.replay(cacheManager, kafkaStream, consumerConnector, consumerIterator);
PowerMock.replay(cacheManager, cacheHandler, kafkaStream, consumerConnector, consumerIterator);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
Expand All @@ -467,54 +455,54 @@ ConsumerConnector buildConnector(Properties properties)
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager, cacheHandler);
}

@Test
public void testStartFailsOnMissingConnect()
{
expectedException.expectMessage("zookeeper.connect required property");
EasyMock.replay(cacheManager);
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.<String, String>of()
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager);
}

@Test
public void testStartFailsOnGroupID()
{
expectedException.expectMessage(
"Cannot set kafka property [group.id]. Property is randomly generated for you. Found");
EasyMock.replay(cacheManager);
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("group.id", "make me fail")
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager);
}

@Test
public void testStartFailsOnAutoOffset()
{
expectedException.expectMessage(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found ");
EasyMock.replay(cacheManager);
PowerMock.replay(cacheManager);
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("auto.offset.reset", "make me fail")
);
Assert.assertTrue(factory.start());
Assert.assertTrue(factory.close());
EasyMock.verify(cacheManager);
PowerMock.verify(cacheManager);
}

@Test
Expand All @@ -531,7 +519,7 @@ public void testFailsGetNotStarted()
@Test
public void testSerDe() throws Exception
{
final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
final NamespaceExtractionCacheManager cacheManager = PowerMock.createStrictMock(NamespaceExtractionCacheManager.class);
final String kafkaTopic = "some_topic";
final Map<String, String> kafkaProperties = ImmutableMap.of("some_key", "some_value");
final long connectTimeout = 999;
Expand Down
10 changes: 10 additions & 0 deletions extensions-core/lookups-cached-global/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,15 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 76cb06a

Please sign in to comment.