[QTL] Implement LookupExtractorFactory of namespaced lookup (apache#2926)

* Support LookupReferencesManager registration of namespaced lookups and eliminate static lookup configuration from the namespaced lookup extensions

- druid-namespace-lookup and druid-kafka-extraction-namespace are modified
- However, druid-namespace-lookup still has configuration for on-heap/off-heap
  cache manager selection, which is a node-wide rather than a namespace-wide
  configuration, since multiple namespaces share the same cache manager

* Update KafkaExtractionNamespaceTest to reflect argument signature changes

* Add more synchronization functionality to NamespaceLookupExtractorFactory

* Remove old way of using extraction namespaces

* resolve compile error by supporting LookupIntrospectHandler

* Remove kafka lookups

* Remove unused stuff

* Fix start and stop behavior to be consistent with new javadocs

* Remove unused strings

* Add timeout option

* Address comments on configurations and improve docs

* Add more options and update hash key and replaces

* Move monitoring to the overriding classes

* Add better start/stop logging

* Remove old docs about namespace names

* Fix bad comma

* Add `@JsonIgnore` to lookup factory

* Address code review comments

* Remove ExtractionNamespace from module json registration

* Fix problems with naming and initialization. Add tests

* Optimize imports / reformat

* Fix future not being properly cancelled on failed initial scheduling

* Fix delete returns

* Add more docs about whole-map introspection

* Add `/version` introspection point for lookups

* Add more tests and address comments

* Add StaticMap extraction namespace for testing. Also add a bunch of tests

* Move cache system property to `druid.lookup.namespace.cache.type`

* Make VERSION lower case

* Change poll period to 0ms for StaticMap

* Move cache key to bytebuffer

* Change hashCode and equals on static map extraction fn

* Add more comments on StaticMap

* Address comments

* Make scheduleAndWait use a latch

* Sanity renames and fix imports

* Remove extra info in docs

* Fix review comments

* Strengthen failure on start from warn to error

* Address comments

* Rename namespace-lookup to lookups-cached-global

* Fix injective mis-naming
* Also add serde test
drcrallen authored and fjy committed May 24, 2016
1 parent 0ac1b27 commit 8024b91
Showing 43 changed files with 2,128 additions and 1,355 deletions.
2 changes: 1 addition & 1 deletion distribution/pom.xml
@@ -85,7 +85,7 @@
<argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-namespace-lookup</argument>
<argument>io.druid.extensions:druid-lookups-cached-global</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>
<argument>-c</argument>
@@ -8,7 +8,7 @@ layout: doc_page
Lookups are an <a href="../experimental.html">experimental</a> feature.
</div>

Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` and `druid-kafka-extraction-namespace` as an extension.
Make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-global` and `druid-kafka-extraction-namespace` as extensions.

If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and message is the desired new value (both in UTF-8) as a LookupExtractorFactory.
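
A minimal sketch of such a factory declaration, using the JSON field names visible on `KafkaLookupExtractorFactory` in this commit (`kafkaTopic`, `kafkaProperties`, `connectTimeout`, `injective`); the topic name and consumer properties below are hypothetical placeholders:

```json
{
  "type": "kafka",
  "kafkaTopic": "some_rename_topic",
  "kafkaProperties": {
    "zookeeper.connect": "localhost:2181"
  },
  "connectTimeout": 0,
  "injective": false
}
```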

@@ -8,19 +8,25 @@ layout: doc_page
Lookups are an <a href="../experimental.html">experimental</a> feature.
</div>

Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` as an extension.
Make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-global` as an extension.

## Configuration
<div class="note caution">
Static configuration is no longer supported. Only cluster wide configuration is supported
</div>

Cached namespace lookups are appropriate for lookups which are not possible to pass at query time due to their size,
or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers,
and are small enough to reasonably populate on a node. This usually means tens to tens of thousands of entries per lookup.

Namespaced lookups are appropriate for lookups which are not possible to pass at query time due to their size,
or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers.
Namespaced lookups can be specified as part of the runtime properties file. The property is a list of the namespaces
described as per the sections on this page. For example:
Cached namespace lookups all draw from the same cache pool, allowing each node to have a fixed cache pool that can be used by namespace lookups.

Cached namespace lookups can be specified as part of the [cluster wide config for lookups](../../querying/lookups.html) as a type of `cachedNamespace`

```json
druid.query.extraction.namespace.lookups=
[
{
{
"type": "cachedNamespace",
"extractionNamespace": {
"type": "uri",
"namespace": "some_uri_lookup",
"uri": "file:/tmp/prefix/",
@@ -33,7 +39,14 @@ described as per the sections on this page. For example:
},
"pollPeriod": "PT5M"
},
{
"firstCacheTimeout": 0
}
```

```json
{
"type": "cachedNamespace",
"extractionNamespace": {
"type": "jdbc",
"namespace": "some_jdbc_lookup",
"connectorConfig": {
@@ -46,12 +59,21 @@ described as per the sections on this page. For example:
"keyColumn": "mykeyColumn",
"valueColumn": "MyValueColumn",
"tsColumn": "timeColumn"
}
]
},
"firstCacheTimeout": 120000,
"injective":true
}
```

The parameters are as follows
|Property|Description|Required|Default|
|--------|-----------|--------|-------|
|`extractionNamespace`|Specifies how to populate the local cache. See below|Yes|-|
|`firstCacheTimeout`|How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait|No|`60000` (1 minute)|
|`injective`|If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to `true`|No|`false`|
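
Setting `injective` to `true` tells Druid the mapping can be safely reversed (each value has exactly one key), which is what permits the internal optimizations mentioned above; only set it when that one-to-one property genuinely holds, as shown in the JDBC example earlier.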

Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes:
`druid-namespace-lookup`
`druid-lookups-cached-global`

## Cache Settings

@@ -60,11 +82,15 @@ setting namespaces (broker, peon, historical)

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.extraction.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
|`druid.lookup.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|

The cache is populated in different ways depending on the settings below. In general, most namespaces employ
a `pollPeriod` at the end of which time they poll the remote resource of interest for updates.

`onHeap` uses `ConcurrentMap`s in the java heap, and thus affects garbage collection and heap sizing.
`offHeap` uses a 10MB on-heap buffer and MapDB using memory-mapped files in the java temporary directory.
So if total `cachedNamespace` lookup size is in excess of 10MB, the extra will be kept in memory as page cache, and paged in and out by general OS tunings.
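
For example, a node that should keep its namespace caches off-heap would set `druid.lookup.namespace.cache.type=offHeap` in its runtime properties. Note this is a node-wide setting rather than a per-lookup one, since every `cachedNamespace` lookup on the node shares the same cache manager.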

# Supported Lookups

For additional lookups, please see our [extensions list](../extensions.html).
@@ -76,27 +102,25 @@ The remapping values for each namespaced lookup can be specified by a json objec
```json
{
"type":"uri",
"namespace":"some_lookup",
"uri": "s3://bucket/some/key/prefix/renames-0003.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
"pollPeriod":"PT5M",
"pollPeriod":"PT5M"
}
```

```json
{
"type":"uri",
"namespace":"some_lookup",
"uriPrefix": "s3://bucket/some/key/prefix/",
"fileRegex":"renames-[0-9]*\\.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
"pollPeriod":"PT5M",
"pollPeriod":"PT5M"
}
```
|Property|Description|Required|Default|
@@ -250,3 +274,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
"pollPeriod":600000
}
```
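
Conceptually, each poll issues something like `SELECT keyColumn, valueColumn FROM <the configured table>`, and when `tsColumn` is set the poll can limit itself to rows whose `tsColumn` is newer than the previous poll. This is a sketch of the behavior, not the literal SQL Druid generates.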

# Introspection

Cached namespace lookups have introspection points at `/keys` and `/values` which return a complete set of the keys and values (respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup.
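
As a sketch (the exact response serialization is not spelled out here), a lookup currently holding the map

```json
{
  "foo": "bar",
  "baz": "bat"
}
```

would return that entire map from `/`, its keys (`foo`, `baz`) from `/keys`, and its values (`bar`, `bat`) from `/values`.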
2 changes: 1 addition & 1 deletion docs/content/development/extensions.md
@@ -27,7 +27,7 @@ Core extensions are maintained by Druid committers.
|druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
|druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)|
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
|druid-namespace-lookup|Required module for [lookups](../querying/lookups.html).|[link](../development/extensions-core/namespaced-lookup.html)|
|druid-lookups-cached-global|Required module for [lookups](../querying/lookups.html).|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
|postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|
2 changes: 1 addition & 1 deletion examples/conf/druid/_common/common.runtime.properties
@@ -23,7 +23,7 @@

# This is not the full list of Druid extensions, but common ones that people often use. You may need to change this list
# based on your particular setup.
druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-namespace-lookup", "mysql-metadata-storage"]
druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "mysql-metadata-storage"]

# If you have a different version of Hadoop, place your Hadoop client jar files in your hadoop-dependencies directory
# and uncomment the line below to point to your directory.
2 changes: 1 addition & 1 deletion extensions-core/kafka-extraction-namespace/pom.xml
@@ -48,7 +48,7 @@
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-namespace-lookup</artifactId>
<artifactId>druid-lookups-cached-global</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
@@ -37,7 +37,18 @@
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.ws.rs.GET;
import javax.ws.rs.core.Response;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -52,16 +63,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.ws.rs.GET;
import javax.ws.rs.core.Response;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
@@ -95,15 +96,15 @@ public String fromBytes(byte[] bytes)
private final long connectTimeout;

@JsonProperty
private final boolean isOneToOne;
private final boolean injective;

@JsonCreator
public KafkaLookupExtractorFactory(
@JacksonInject NamespaceExtractionCacheManager cacheManager,
@JsonProperty("kafkaTopic") final String kafkaTopic,
@JsonProperty("kafkaProperties") final Map<String, String> kafkaProperties,
@JsonProperty("connectTimeout") @Min(0) long connectTimeout,
@JsonProperty("isOneToOne") boolean isOneToOne
@JsonProperty("injective") boolean injective
)
{
this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
@@ -114,7 +115,7 @@ public KafkaLookupExtractorFactory(
));
this.cacheManager = cacheManager;
this.connectTimeout = connectTimeout;
this.isOneToOne = isOneToOne;
this.injective = injective;
}

public KafkaLookupExtractorFactory(
@@ -141,9 +142,9 @@ public long getConnectTimeout()
return connectTimeout;
}

public boolean isOneToOne()
public boolean isInjective()
{
return isOneToOne;
return injective;
}

@Override
@@ -335,7 +336,7 @@ public boolean replaces(@Nullable LookupExtractorFactory other)
return !(getKafkaTopic().equals(that.getKafkaTopic())
&& getKafkaProperties().equals(that.getKafkaProperties())
&& getConnectTimeout() == that.getConnectTimeout()
&& isOneToOne() == that.isOneToOne()
&& isInjective() == that.isInjective()
);
}

@@ -351,7 +352,7 @@ public LookupExtractor get()
{
final Map<String, String> map = Preconditions.checkNotNull(mapRef.get(), "Not started");
final long startCount = doubleEventCount.get();
return new MapLookupExtractor(map, isOneToOne())
return new MapLookupExtractor(map, isInjective())
{
@Override
public byte[] getCacheKey()
@@ -28,7 +28,7 @@
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
@@ -72,7 +72,7 @@ public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
if ("io.druid.server.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
if ("io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
return cacheManager;
} else {
return null;
@@ -507,6 +507,31 @@ public void testFailsGetNotStarted()
).get();
}

@Test
public void testSerDe() throws Exception
{
final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
final String kafkaTopic = "some_topic";
final Map<String, String> kafkaProperties = ImmutableMap.of("some_key", "some_value");
final long connectTimeout = 999;
final boolean injective = true;
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
kafkaTopic,
kafkaProperties,
connectTimeout,
injective
);
final KafkaLookupExtractorFactory otherFactory = mapper.readValue(
mapper.writeValueAsString(factory),
KafkaLookupExtractorFactory.class
);
Assert.assertEquals(kafkaTopic, otherFactory.getKafkaTopic());
Assert.assertEquals(kafkaProperties, otherFactory.getKafkaProperties());
Assert.assertEquals(connectTimeout, otherFactory.getConnectTimeout());
Assert.assertEquals(injective, otherFactory.isInjective());
}

@Test
public void testDefaultDecoder()
{
@@ -32,7 +32,7 @@
import com.metamx.common.logger.Logger;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.server.namespace.NamespacedExtractionModule;
import io.druid.server.lookup.namespace.NamespaceExtractionModule;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
@@ -249,7 +249,7 @@ public void configure(Binder binder)
}
},
// These injections fail under IntelliJ but are required for maven
new NamespacedExtractionModule(),
new NamespaceExtractionModule(),
new KafkaExtractionNamespaceModule()
)
);
@@ -22,8 +22,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-namespace-lookup</artifactId>
<name>druid-namespace-lookup</name>
<artifactId>druid-lookups-cached-global</artifactId>
<name>druid-lookups-cached-global</name>
<description>Extension to rename Druid dimension values using namespaces</description>

<parent>
@@ -77,5 +77,10 @@
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -30,7 +30,7 @@
/**
* Simple class that takes a `ByteSource` and uses a `Parser<K, V>` to populate a `Map<K, V>`
* The `ByteSource` must be UTF-8 encoded
*
* <p>
* If this is handy for other use cases please move this class into a common module
*/
public class MapPopulator<K, V>