improvements to coordinator lookups management (apache#3855)
* coordinator lookups mgmt improvements

* revert replaces removal, deprecate it instead

* convert and use older specs stored in db

* more tests and updates

* review comments

* add behavior for 0.10.0 to 0.9.2 downgrade

* incorporating more review comments

* remove explicit lock and use LifecycleLock in LookupReferencesManager. use LifecycleLock in LookupCoordinatorManager as well

* wip on LookupCoordinatorManager

* lifecycle lock

* refactor thread creation into utility method

* more review comments addressed

* support smooth roll back of lookup snapshots from 0.10.0 to 0.9.2

* correctly use LifecycleLock in LookupCoordinatorManager and remove synchronization from start/stop

* run lookup mgmt on leader coordinator only

* wip: changes to do multiple start() and stop() on LookupCoordinatorManager

* lifecycleLock fix usage in LookupReferencesManagerTest

* add LifecycleLock back

* fix license hdr

* some fixes

* make LookupReferencesManager.getAllLookupsState() consistent while still being lockless

* address review comments

* addressing leventov's comments

* address charle's comments

* add IOE.java

* for safety in LookupReferencesManager mainThread check for lifecycle started state on each loop in addition to interrupt

* move thread creation utility method to Execs

* fix names

* add tests for LookupCoordinatorManager.lookupManagementLoop()

* add further tests for figuring out toBeLoaded and toBeDropped on LookupCoordinatorManager

* address leventov comments

* remove LookupsStateWithMap and parameterize LookupsState

* address review comments

* address more review comments

* misc fixes
himanshug authored Apr 28, 2017
1 parent b9fd30e commit 5a5a274
Showing 37 changed files with 3,155 additions and 2,279 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/io/druid/concurrent/Execs.java
@@ -19,6 +19,8 @@

package io.druid.concurrent;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.annotation.Nullable;
@@ -83,9 +85,21 @@ public static ThreadFactory makeThreadFactory(@NotNull String nameFormat, @Nullable Integer priority)
    if (priority != null) {
      builder.setPriority(priority);
    }

    return builder.build();
  }

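  /**
   * Creates (but does not start) a new {@link Thread} running the given {@link Runnable},
   * with the given name and daemon flag. Throws if the name is null/empty or the runnable is null.
   */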
  public static Thread makeThread(String name, Runnable runnable, boolean isDaemon)
  {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "name null/empty");
    Preconditions.checkNotNull(runnable, "null runnable");

    Thread t = new Thread(runnable);
    t.setName(name);
    t.setDaemon(isDaemon);
    return t;
  }

  /**
   * @param nameFormat name format for the threadFactory
   * @param capacity   maximum capacity after which the executorService will block on accepting new tasks
267 changes: 134 additions & 133 deletions docs/content/querying/lookups.md
@@ -64,86 +64,123 @@ These endpoints will return one of the following results:

## Configuration propagation behavior
The configuration is propagated to the query serving nodes (broker / router / peon / historical) by the coordinator.
The query serving nodes have an internal API for managing lookups on the node, and those are used by the coordinator.
The coordinator periodically checks whether any of the nodes need to load or drop lookups and updates them appropriately.
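As a rough sketch of that bookkeeping: this patch models each node's lookup state as the lookups it currently has, those still to be loaded, and those to be dropped (the `LookupsState` class in this commit). The shape below is illustrative only; `some_lookup` and `some_old_lookup` are hypothetical names, and the exact wire format is internal and may differ:

```json
{
  "current": {
    "some_lookup": {
      "version": "v1",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "77483": "United States"
        }
      }
    }
  },
  "toLoad": {},
  "toDrop": ["some_old_lookup"]
}
```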

## Bulk update
Lookups can be updated in bulk by posting a JSON object to `/druid/coordinator/v1/lookups`. The format of the json object is as follows:

```json
{
  "<tierName>": {
    "<lookupName>": {
      "version": "<version>",
      "lookupExtractorFactory": {
        "type": "<someExtractorFactoryType>",
        "<someExtractorField>": "<someExtractorValue>"
      }
    }
  }
}
```

Note that `"version"` is an arbitrary string assigned by the user; when updating an existing lookup, the user must specify a lexicographically higher version (for example, `"v1"` after `"v0"`).

For example, a config might look something like:

```json
{
  "__default": {
    "country_code": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "77483": "United States"
        }
      }
    },
    "site_id": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "cachedNamespace",
        "extractionNamespace": {
          "type": "jdbc",
          "connectorConfig": {
            "createTables": true,
            "connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
            "user": "druid",
            "password": "diurd"
          },
          "table": "lookupTable",
          "keyColumn": "country_id",
          "valueColumn": "country_name",
          "tsColumn": "timeColumn"
        },
        "firstCacheTimeout": 120000,
        "injective": true
      }
    },
    "site_id_customer1": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "847632": "Internal Use Only"
        }
      }
    },
    "site_id_customer2": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "AHF77": "Home"
        }
      }
    }
  },
  "realtime_customer1": {
    "country_code": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "77483": "United States"
        }
      }
    },
    "site_id_customer1": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "847632": "Internal Use Only"
        }
      }
    }
  },
  "realtime_customer2": {
    "country_code": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "77483": "United States"
        }
      }
    },
    "site_id_customer2": {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "map",
        "map": {
          "AHF77": "Home"
        }
      }
    }
  }
}
```

@@ -156,8 +193,13 @@ For example, a post to `/druid/coordinator/v1/lookups/realtime_customer1/site_id_customer1` might contain the following:

```json
{
  "version": "v1",
  "lookupExtractorFactory": {
    "type": "map",
    "map": {
      "847632": "Internal Use Only"
    }
  }
}
```

@@ -170,8 +212,13 @@ Using the prior example, a `GET` to `/druid/coordinator/v1/lookups/realtime_customer2/site_id_customer2` should return:

```json
{
  "version": "v1",
  "lookupExtractorFactory": {
    "type": "map",
    "map": {
      "AHF77": "Home"
    }
  }
}
```

@@ -189,24 +236,26 @@ A `GET` to `/druid/coordinator/v1/lookups/{tier}` will return a list of known lookup names of that tier.
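
Using the bulk update example above, a `GET` for the `__default` tier would return something like:

```json
["country_code", "site_id", "site_id_customer1", "site_id_customer2"]
```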

# Internal API

The Peon, Router, Broker, and Historical nodes all have the ability to consume lookup configuration.
There is an internal API these nodes use to list/load/drop their lookups starting at `/druid/listen/v1/lookups`.
These follow the same convention for return values as the cluster wide dynamic configuration. The following endpoints can be used for debugging purposes, but should not be relied on otherwise.

## Get Lookups

A `GET` to the node at `/druid/listen/v1/lookups` will return a json map of all the lookups currently active on the node, mapping each lookup name to its version and extractor factory.

```json
{
  "site_id_customer2": {
    "version": "v1",
    "lookupExtractorFactory": {
      "type": "map",
      "map": {
        "AHF77": "Home"
      }
    }
  }
}
```

## Get Lookup
@@ -216,70 +265,16 @@ A `GET` to the node at `/druid/listen/v1/lookups/some_lookup_name` will return the LookupExtractorFactory for the lookup identified by `some_lookup_name`. The return value will be the json representation of the factory.

```json
{
  "version": "v1",
  "lookupExtractorFactory": {
    "type": "map",
    "map": {
      "AHF77": "Home"
    }
  }
}
```

# Configuration
See the [coordinator configuration guide](../configuration/coordinator.html) for coordinator configuration.

@@ -294,10 +289,9 @@ To configure the behavior of the dynamic configuration manager, use the following properties on the coordinator:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.lookups.hostTimeout`|Timeout (in ms) PER HOST for processing a request|`2000` (2 seconds)|
|`druid.manager.lookups.allHostTimeout`|Timeout (in ms) to finish lookup management on all the nodes|`900000` (15 mins)|
|`druid.manager.lookups.period`|How long to pause between management cycles|`120000` (2 mins)|
|`druid.manager.lookups.threadPoolSize`|Number of service nodes that can be managed concurrently|`10`|
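
For example, a coordinator's `runtime.properties` could set these explicitly; the values below are simply the defaults from the table above:

```
druid.manager.lookups.hostTimeout=2000
druid.manager.lookups.allHostTimeout=900000
druid.manager.lookups.period=120000
druid.manager.lookups.threadPoolSize=10
```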

## Saving configuration across restarts
@@ -312,4 +306,11 @@ It is possible to save the configuration across restarts such that a node will not have to wait for coordinator action to re-populate its lookups.

Lookup implementations can provide some introspection capabilities by implementing `LookupIntrospectHandler`. Users send a request to `/druid/lookups/v1/introspect/{lookupId}` to introspect a given lookup.

For instance, you can list all the keys or values of a map-based lookup by issuing a `GET` request to `/druid/lookups/v1/introspect/{lookupId}/keys` or `/druid/lookups/v1/introspect/{lookupId}/values`.
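
For example, given the map-based `site_id_customer2` lookup from the earlier example, a `GET` to the `/keys` endpoint would list its keys; an illustrative response:

```json
["AHF77"]
```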

## Druid version 0.10.0 to 0.10.1 upgrade/downgrade
The overall cluster lookup configuration is persisted in the metadata store, and individual lookup nodes optionally persist a snapshot of their loaded lookups on disk.
If upgrading from Druid 0.10.0 to 0.10.1, migration of all persisted metadata is handled automatically.
If downgrading from 0.10.1 to 0.10.0, lookup updates made via the coordinator while 0.10.1 was running will be lost.

