Protobuf extension (apache#4039)
* move ProtoBufInputRowParser from processing module to protobuf extensions

* Ported PR apache#3509

* add DynamicMessage

* fix local test stuff that slipped in

* add license header

* removed redundant type name

* removed commented code

* fix code style

* rename ProtoBuf -> Protobuf

* pom.xml: shade protobuf classes, handle .desc resource file as binary file

* clean up error messages

* pick first message type from descriptor if not specified

* fix protoMessageType null check. add test case

* move protobuf-extension from contrib to core

* document: add new configuration keys, and descriptions

* update document. add examples

* move protobuf-extension from contrib to core (2nd try)

* touch

* include protobuf extensions in the distribution

* fix whitespace

* include protobuf example in the distribution

* example: create new pb obj everytime

* document: use properly quoted json

* fix whitespace

* bump parent version to 0.10.1-SNAPSHOT

* ignore Override check

* touch
knoguchi authored and fjy committed May 30, 2017
1 parent 7889891 commit 3400f60
Showing 26 changed files with 4,230 additions and 1,412 deletions.
4 changes: 3 additions & 1 deletion distribution/pom.xml
@@ -97,10 +97,12 @@
<argument>-c</argument>
<argument>io.druid.extensions:druid-lookups-cached-single</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-protobuf-extensions</argument>
<argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-kerberos</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-s3-extensions</argument>
7 changes: 7 additions & 0 deletions distribution/src/assembly/assembly.xml
@@ -114,6 +114,13 @@
</includes>
<outputDirectory>conf-quickstart/tranquility</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/quickstart/protobuf</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>quickstart/protobuf</outputDirectory>
</fileSet>

<fileSet>
<directory>../examples/conf</directory>
203 changes: 203 additions & 0 deletions docs/content/development/extensions-core/protobuf.md
@@ -0,0 +1,203 @@
---
layout: doc_page
---

# Protobuf

This extension enables Druid to ingest and understand the Protobuf data format. Make sure to [include](../../operations/including-extensions.html) `druid-protobuf-extensions` as an extension.
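
For example, with the default extension-loading mechanism this means adding the extension name to `druid.extensions.loadList` in `common.runtime.properties`. A minimal sketch (your list will normally contain other extensions as well):

```
druid.extensions.loadList=["druid-protobuf-extensions"]
```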

## Protobuf Parser


| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protobuf`. | no |
| descriptor | String | Protobuf descriptor file name on the classpath, or a URL pointing to the descriptor file. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both the short name and the fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The `format` must be `json`. See [JSON ParseSpec](../../ingestion/index.html) for more configuration options. Note that the `timeAndDims` parseSpec is no longer supported. | yes |
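
For reference, a minimal `parser` block wiring these fields together might look like the following sketch; the descriptor path and message type are placeholders, and a complete working example appears in the Kafka walkthrough below.

```json
{
  "type": "protobuf",
  "descriptor": "file:///tmp/metrics.desc",
  "protoMessageType": "Metrics",
  "parseSpec": {
    "format": "json",
    "timestampSpec": { "column": "timestamp", "format": "auto" },
    "dimensionsSpec": { "dimensions": ["unit", "server"] }
  }
}
```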

## Example: Load Protobuf messages from Kafka

This example demonstrates how to load Protobuf messages from Kafka. Please read the [Load from Kafka tutorial](../../tutorial/tutorial-kafka.html) first. This example will use the same "metrics" dataset.

Files used in this example are found at `./examples/quickstart/protobuf` in your Druid directory.

- We will use the [Kafka Indexing Service](./kafka-ingestion.html) instead of Tranquility.
- The Kafka broker is listening on `localhost:9092`.
- The Kafka topic is `metrics_pb` instead of `metrics`.
- The datasource name is `metrics-kafka-pb` instead of `metrics-kafka` to avoid confusion.

Here is an example metrics record in JSON.

```json
{
  "unit": "milliseconds",
  "http_method": "GET",
  "value": 44,
  "timestamp": "2017-04-06T02:36:22Z",
  "http_code": "200",
  "page": "/",
  "metricType": "request/latency",
  "server": "www1.example.com"
}
```

### Proto file

The proto file should look like this. Save it as `metrics.proto`.

```
syntax = "proto3";
message Metrics {
  string unit = 1;
  string http_method = 2;
  int32 value = 3;
  string timestamp = 4;
  string http_code = 5;
  string page = 6;
  string metricType = 7;
  string server = 8;
}
```

### Descriptor file

Use the `protoc` Protobuf compiler to generate the descriptor file. Save the resulting `metrics.desc` file either on the classpath or at a location reachable by URL. In this example the descriptor file is saved at `/tmp/metrics.desc`.

```
protoc -o /tmp/metrics.desc metrics.proto
```
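
Optionally, you can sanity-check the generated descriptor before pointing Druid at it. This is a small sketch using the standard `protobuf` Python package to list the message types contained in the descriptor set; the path matches the example above.

```python
#!/usr/bin/env python
from google.protobuf import descriptor_pb2

# Load the compiled FileDescriptorSet produced by `protoc -o`.
with open('/tmp/metrics.desc', 'rb') as f:
    file_set = descriptor_pb2.FileDescriptorSet()
    file_set.ParseFromString(f.read())

# Print every message type defined in the descriptor; expect "Metrics".
for proto_file in file_set.file:
    for message_type in proto_file.message_type:
        print(message_type.name)
```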

### Supervisor spec JSON

Below is the complete Supervisor spec JSON to be submitted to the Overlord; a sample submission command follows the spec.
Please make sure these keys are properly configured for successful ingestion.

- `descriptor` for the descriptor file URL.
- `protoMessageType` from the proto definition.
- The parseSpec `format` must be `json`.
- `topic` to subscribe to. The topic is `metrics_pb` instead of `metrics`.
- `bootstrap.servers` is the Kafka broker host.

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka2",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "unit",
            "http_method",
            "http_code",
            "page",
            "metricType",
            "server"
          ],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics_pb",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
```
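
To submit the spec, POST it to the Overlord's supervisor endpoint. A sketch assuming the Overlord runs locally on its default port (8090) and the spec is saved as `kafka-metrics-pb.json`:

```
curl -X POST -H 'Content-Type: application/json' \
  -d @kafka-metrics-pb.json \
  http://localhost:8090/druid/indexer/v1/supervisor
```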

## Kafka Producer

Here is a sample script that publishes the metrics to Kafka in Protobuf format.

1. Run `protoc` again, this time with the Python binding option. This command generates the `metrics_pb2.py` file.
```
protoc -o metrics.desc metrics.proto --python_out=.
```

2. Create the Kafka producer script and save it as `pb_publisher.py`.

This script requires the `protobuf` and `kafka-python` modules.

```python
#!/usr/bin/env python

import sys
import json

from kafka import KafkaProducer
from metrics_pb2 import Metrics

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'
metrics = Metrics()

# For each JSON record on stdin, copy its fields into the Protobuf message,
# serialize it, and publish the bytes to the Kafka topic.
for row in iter(sys.stdin):
    d = json.loads(row)
    for k, v in d.items():
        setattr(metrics, k, v)
    pb = metrics.SerializeToString()
    producer.send(topic, pb)
```

3. Run the producer.

```
./bin/generate-example-metrics | ./pb_publisher.py
```

4. Test with the Kafka console consumer.

```
kafka-console-consumer --zookeeper localhost --topic metrics_pb
```

It should print serialized Protobuf messages that look like this (a small consumer sketch for deserializing them follows below):
> millisecondsGETR"2017-04-06T03:23:56Z*2002/list:request/latencyBwww1.example.com
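
Because the console consumer prints raw Protobuf bytes, the output above is hard to read. As an optional check, a small consumer sketch using the same `kafka-python` module and the generated `metrics_pb2` bindings can deserialize and print the messages:

```python
#!/usr/bin/env python
from kafka import KafkaConsumer
from metrics_pb2 import Metrics

# Read from the beginning of the topic and decode each Protobuf message.
consumer = KafkaConsumer('metrics_pb',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')

for message in consumer:
    metrics = Metrics()
    metrics.ParseFromString(message.value)
    print(metrics)
```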
1 change: 1 addition & 0 deletions docs/content/development/extensions.md
@@ -32,6 +32,7 @@ Core extensions are maintained by Druid committers.
|druid-kerberos|Kerberos authentication for druid nodes.|[link](../development/extensions-core/druid-kerberos.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-lookups-cached-single|Per lookup caching module to support use cases where a lookup needs to be isolated from the global pool of lookups.|[link](../development/extensions-core/druid-lookups.html)|
|druid-protobuf-extensions|Support for data in the Protobuf data format.|[link](../development/extensions-core/protobuf.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
7 changes: 0 additions & 7 deletions docs/content/ingestion/index.md
@@ -109,13 +109,6 @@ If `type` is not included, the parser defaults to `string`. For additional data
| type | String | This should say `string` in general, or `hadoopyString` when used in a Hadoop indexing job. | no |
| parseSpec | JSON Object | Specifies the format, timestamp, and dimensions of the data. | yes |

### Protobuf Parser

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |

### ParseSpec

ParseSpecs serve two purposes:
71 changes: 71 additions & 0 deletions examples/quickstart/protobuf/kafka-metrics-pb.json
@@ -0,0 +1,71 @@
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka2",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "unit",
            "http_method",
            "http_code",
            "page",
            "metricType",
            "server"
          ],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics_pb",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
15 changes: 15 additions & 0 deletions examples/quickstart/protobuf/metrics.desc
@@ -0,0 +1,15 @@
(binary Protobuf descriptor file generated by `protoc -o metrics.desc metrics.proto`; contents are not human-readable and are omitted here)
11 changes: 11 additions & 0 deletions examples/quickstart/protobuf/metrics.proto
@@ -0,0 +1,11 @@
syntax = "proto3";
message Metrics {
  string unit = 1;
  string http_method = 2;
  int32 value = 3;
  string timestamp = 4;
  string http_code = 5;
  string page = 6;
  string metricType = 7;
  string server = 8;
}
