Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/druid-io/druid
Browse files Browse the repository at this point in the history
  • Loading branch information
fjy committed Jun 29, 2015
2 parents 0d8a526 + 2f522f8 commit e42d5ac
Show file tree
Hide file tree
Showing 36 changed files with 843 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark;


import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
public class ConciseComplementBenchmark
{

// Number of rows to read, the test will read random rows
@Param({"1000", "10000", "100000", "1000000", "1000000"})
int emptyRows;

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void uncompressed(Blackhole blackhole)
{
final ImmutableConciseSet set = ImmutableConciseSet.complement(null, emptyRows);
blackhole.consume(set);
assert (emptyRows == set.size());
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The indexing service also uses its own set of paths. These configs can be includ
If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`.
For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2` then `druid.zk.paths.announcementsPath` will default to `/druid1/announcements` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`.

The following path is used service discovery and are **not** affected by `druid.zk.paths.base` and **must** be specified separately.
The following path is used for service discovery. It is **not** affected by `druid.zk.paths.base` and **must** be specified separately.

|Property|Description|Default|
|--------|-----------|-------|
Expand Down
4 changes: 2 additions & 2 deletions docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ The overlord can dynamically change worker behavior.
The JSON object can be submitted to the overlord via a POST request at:

```
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker
```

Optional Header Parameters for auditing the config change can also be specified.
Expand Down Expand Up @@ -153,7 +153,7 @@ Issuing a GET request at the same URL will return the current worker config spec
To view the audit history of worker config issue a GET request to the URL -

```
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
```

default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in overlord runtime.properties.
Expand Down
4 changes: 2 additions & 2 deletions docs/content/design/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ This separation allows each node to only care about what it is best at. By separ

The following diagram shows how queries and data flow through this architecture, and which nodes (and external dependencies, discussed below) are involved:

<img src="../img/druid-dataflow-3.png" width="800"/>
<img src="../../img/druid-dataflow-3.png" width="800"/>

All nodes can be run in some highly available fashion, either as symmetric peers in a share-nothing cluster or as hot-swap failover nodes.

Expand All @@ -51,7 +51,7 @@ Aside from these nodes, there are 3 external dependencies to the system:

The following diagram illustrates the cluster's management layer, showing how certain nodes and dependencies help manage the cluster by tracking and exchanging metadata:

<img src="../img/druid-manage-1.png" width="800"/>
<img src="../../img/druid-manage-1.png" width="800"/>


### Segments and Data Storage
Expand Down
2 changes: 1 addition & 1 deletion docs/content/design/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Overlords and middle managers may run on the same node or across multiple nodes
Indexing Service Overview
-------------------------

![Indexing Service](../img/indexing_service.png "Indexing Service")
![Indexing Service](../../img/indexing_service.png "Indexing Service")

<!--
Preamble
Expand Down
2 changes: 1 addition & 1 deletion docs/content/design/realtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Segment Propagation

The segment propagation diagram for real-time data ingestion can be seen below:

![Segment Propagation](../img/segmentPropagation.png "Segment Propagation")
![Segment Propagation](../../img/segmentPropagation.png "Segment Propagation")

You can read about the various components shown in this diagram under the Architecture section (see the menu on the right). Note that some of the names are now outdated.

Expand Down
130 changes: 127 additions & 3 deletions docs/content/design/segments.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,125 @@ layout: doc_page
Segments
========

Druid segments contain data for a time interval, stored as separate columns. Dimensions (string columns) have inverted indexes associated with them for each dimension value. Metric columns are LZ4 compressed.

Druid stores its index in *segment files*, which are partitioned by
time. In a basic setup, one segment file is created for each time
interval, where the time inteval is configurable in the
`segmentGranularity` parameter of the `granularitySpec`, which is
documented [here](../ingestion/batch-ingestion.html). For druid to
operate well under heavy query load, it is important for the segment
file size to be within the recommended range of 300mb-700mb. If your
segment files are larger than this range, then consider either
changing the the granularity of the time interval or partitioning your
data and tweaking the `targetPartitionSize` in your `partitioningSpec`
(a good starting point for this parameter is 5 million rows). See the
sharding section below and the 'Partitioning specification' section of
the [Batch ingestion](../ingestion/batch-ingestion.html) documentation
for more information.

### A segment file's core data structures
Here we describe the internal structure of segment files, which is
essentially *columnar*: the data for each column is laid out in
separate data structures. By storing each column separately, Druid can
decrease query latency by scanning only those columns actually needed
for a query. There are three basic column types: the timestamp
column, dimension columns, and metric columns, as illustrated in the
image below:

![Druid column types](../../img/druid-column-types.png "Druid Column Types")

The timestamp and metric columns are simple: behind the scenes each of
these is an array of integer or floating point values compressed with
LZ4. Once a query knows which rows it needs to select, it simply
decompresses these, pulls out the relevant rows, and applies the
desired aggregation operator. As with all columns, if a query doesn’t
require a column, then that column’s data is just skipped over.

Dimensions columns are different because they support filter and
group-by operations, so each dimension requires the following
three data structures:

1. A dictionary that maps values (which are always treated as strings) to integer IDs,
2. For each distinct value in the column, a bitmap that indicates which rows contain that value, and
3. A list of the column’s values, encoded using the dictionary in 1.

Why these three data structures? The dictionary simply maps string
values to integer ids so that the values in 2 and 3 can be
represented compactly. The bitmaps in 2 -- also known as *inverted
indexes* allow for quick filtering operations (specifically, bitmaps
are convenient for quickly applying AND and OR operators). Finally,
the list of values in 3 are needed for *group by* and *TopN*
queries. In other words, queries that solely aggregate metrics based
on filters do not need to touch the list of dimension values stored in
3.

To get a concrete sense of these data structures, consider the ‘page’
column from the example data above. The three data structures that
represent this dimension are illustrated in the diagram below.

```
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}
2: Column data
[0,
0,
1,
1]
3: Bitmaps - one for each unique value of the column
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,0,1,1]
```

Note that the bitmap is different from the first two data structures:
whereas the first two grow linearly in the size of the data (in the
worst case), the size of the bitmap section is the product of data
size * column cardinality. Compression will help us here though
because we know that each row will have only non-zero entry in a only
a single bitmap. This means that high cardinality columns will have
extremely sparse, and therefore highly compressible, bitmaps. Druid
exploits this using compression algorithms that are specially suited
for bitmaps, such as roaring bitmap compression.

### Multi-value columns

If a data source makes use of multi-value columns, then the data
structures within the segment files look a bit different. Let's
imagine that in the example above, the second row were tagged with
both the 'Ke$ha' *and* 'Justin Bieber' topics. In this case, the three
data structures would now look as follows:

```
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}
2: Column data
[0,
[0,1], <--Row value of multi-value column can have array of values
1,
1]
3: Bitmaps - one for each unique value
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,1,1,1]
^
|
|
Multi-value column has multiple non-zero entries
```

Note the changes to the second row in the column data and the Ke$ha
bitmap. If a row has more than one value for a column, its entry in
the 'column data' is an array of values. Additionally, a row with *n*
values in a column columns will have *n* non-zero valued entries in
that column's bitmaps.

Naming Convention
-----------------
Expand All @@ -17,7 +135,7 @@ datasource_intervalStart_intervalEnd_version_partitionNum
Segment Components
------------------

A segment is comprised of several files, listed below.
Behind the scenes, a segment is comprised of several files, listed below.

* `version.bin`

Expand Down Expand Up @@ -52,4 +170,10 @@ Sharding Data to Create Segments

### Sharding Data by Dimension

If the cumulative total number of rows for the different values of a given column exceed some configurable threshold, multiple segments representing the same time interval for the same datasource may be created. These segments will contain some partition number as part of their identifier. Sharding by dimension reduces some of the the costs associated with operations over high cardinality dimensions.
If the cumulative total number of rows for the different values of a
given column exceed some configurable threshold, multiple segments
representing the same time interval for the same datasource may be
created. These segments will contain some partition number as part of
their identifier. Sharding by dimension reduces some of the the costs
associated with operations over high cardinality dimensions. For more
information on sharding, see the ingestion documentat
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ layout: doc_page
# Integrating Druid With Other Technologies
This page discusses how we can integrate druid with other technologies. Event streams can be stored in a distributed queue like Kafka, then it can be streamed to a distributed realtime computation system like Twitter Storm / Samza and then it can be feed into Druid via Tranquility plugin. With Tranquility, Middlemanager & Peons will act as a realtime node and they handle realtime queries, segment handoff and realtime indexing.

<img src="../img/druid-production.png" width="800"/>
<img src="../../img/druid-production.png" width="800"/>
51 changes: 51 additions & 0 deletions docs/content/ingestion/firehose.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,58 @@ See [Examples](../tutorials/examples.html). This firehose creates a stream of ra
#### RabbitMqFirehose

This firehose ingests events from a define rabbit-mq queue.
<br>
**Note:** Add **amqp-client-3.2.1.jar** to lib directory of druid to use this firehose.
<br>
A sample spec for rabbitmq firehose:

```json
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"port": "5672",
"username": "test-dude",
"password": "test-word",
"virtualHost": "test-vhost",
"uri": "amqp://mqserver:1234/vhost",
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false",

"maxRetries": "10",
"retryIntervalSeconds": "1",
"maxDurationSeconds": "300"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
}
```
|property|description|Default|required?|
|--------|-----------|---------|
|host|The hostname of the RabbitMQ broker to connect to|localhost|no|
|port|The port number to connect to on the RabbitMQ broker|5672|no|
|username|The username to use to connect to RabbitMQ|guest|no|
|password|The password to use to connect to RabbitMQ|guest|no|
|virtualHost|The virtual host to connect to|/|no|
|uri|The URI string to use to connect to RabbitMQ| |no|
|exchange|The exchange to connect to| |yes|
|queue|The queue to connect to or create| |yes|
|routingKey|The routing key to use to bind the queue to the exchange| |yes|
|durable|Whether the queue should be durable|false|no|
|exclusive|Whether the queue should be exclusive|false|no|
|autoDelete|Whether the queue should auto-delete on disconnect|false|no|
|maxRetries|The max number of reconnection retry attempts| |yes|
|retryIntervalSeconds|The reconnection interval| |yes|
|maxDurationSeconds|The max duration of trying to reconnect| |yes|
#### LocalFirehose

This Firehose can be used to read the data from files on local disk.
Expand Down
1 change: 0 additions & 1 deletion docs/content/misc/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"maxRowsInMemory": 500000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
Expand Down
1 change: 0 additions & 1 deletion docs/content/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Available Metrics
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|

### Historical
Expand Down
6 changes: 3 additions & 3 deletions docs/content/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ All JavaScript functions must return numerical values.
```json
{
"type": "javascript",
"name": "sum(log(x)/y) + 10",
"name": "sum(log(x)*y) + 10",
"fieldNames": ["x", "y"],
"fnAggregate" : "function(current, a, b) { return current + (Math.log(a) * b); }",
"fnCombine" : "function(partialA, partialB) { return partialA + partialB; }",
Expand Down Expand Up @@ -137,11 +137,11 @@ SELECT COUNT(DISTINCT(value)) FROM (

#### Cardinality by row

When setting `byRow` to `true` it computes the cardinality by row, i.e. the cardinality of distinct dimension combinations
When setting `byRow` to `true` it computes the cardinality by row, i.e. the cardinality of distinct dimension combinations.
This is equivalent to something akin to

```sql
SELECT COUNT(*) FROM ( SELECT DIM1, DIM2, DIM3 FROM <datasource> GROUP BY DIM1, DIM2, DIM3
SELECT COUNT(*) FROM ( SELECT DIM1, DIM2, DIM3 FROM <datasource> GROUP BY DIM1, DIM2, DIM3 )
```

**Example**
Expand Down
2 changes: 1 addition & 1 deletion docs/content/querying/timeseriesquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ There are 7 main parts to a timeseries query:
|postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
|context|See [Context](../querying/query-context.html)|no|

To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) the result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:
To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:

```json
[
Expand Down
Binary file added docs/img/druid-column-types.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public void configure(Binder binder)
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);

final Configuration conf = new Configuration();

// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
conf.setClassLoader(getClass().getClassLoader());

if (props != null) {
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public Boolean call() throws Exception
public static Path prependFSIfNullScheme(FileSystem fs, Path path)
{
if (path.toUri().getScheme() == null) {
path = new Path(fs.getUri().toString(), String.format("./%s", path));
path = fs.makeQualified(path);
}
return path;
}
Expand Down
Loading

0 comments on commit e42d5ac

Please sign in to comment.