Hadoop InputRowParser for Orc file (apache#3019)
* InputRowParser to decode OrcStruct from OrcNewInputFormat

* add unit test for orc hadoop indexing

* update docs and fix test code bug

* doc updated

* resolve maven dependency conflict

* remove unused imports

* fix returning array type from Object[] to correct primitive array type

* fix to support getDimension() of MapBasedRow : changing return type of orc list from array to list

* rebased and updated based on comments

* updated based on comments

* reflect review comments

* fix bug in typeStringFromParseSpec() and add unit test

* add license header
sirpkt authored and fjy committed Jul 26, 2016
1 parent 76fabcf commit 95a5809
Showing 11 changed files with 1,203 additions and 0 deletions.
91 changes: 91 additions & 0 deletions docs/content/development/extensions-contrib/orc.md
@@ -0,0 +1,91 @@
---
layout: doc_page
---

# Orc

To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
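For example, with the standard extension-loading mechanism this means adding it to the load list in `common.runtime.properties` (a sketch; merge with whatever extensions you already load):

```
druid.extensions.loadList=["druid-orc-extensions"]
```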

This extension enables Druid to ingest and understand the Apache Orc data format in offline (batch) indexing.

## Orc Hadoop Parser

This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"`.

Field | Type | Description | Required
----------|-------------|----------------------------------------------------------------------------------------|---------
type | String | This should say `orc` | yes
parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Any parse spec that extends ParseSpec is possible but only their TimestampSpec and DimensionsSpec are used. | yes
typeString| String | String representation of the Orc struct type info. If not specified, it is automatically constructed from the parseSpec, but all metric columns are dropped | no

For example, a string column `col1` and a string-array column `col2` are represented by the `typeString` `"struct<col1:string,col2:array<string>>"`.

Currently, the parser supports only Java primitive types and arrays of Java primitive types; among the compound [ORC types](https://orc.apache.org/docs/types.html), only `list` is supported (a list of lists is not).
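A few more illustrative `typeString` values (the column names here are hypothetical):

```
struct<ts:string,tags:array<string>>      supported: primitives and a list of primitives
struct<m:map<string,string>>              not supported: map is a compound type other than list
struct<nested:array<array<string>>>       not supported: list of list
```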

An example Hadoop indexing task:
```json
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
        "paths": "/data/path/in/HDFS/"
      },
      "metadataUpdateSpec": {
        "type": "postgresql",
        "connectURI": "jdbc:postgresql://localhost/druid",
        "user": "druid",
        "password": "asdf",
        "segmentTable": "druid_segments"
      },
      "segmentOutputPath": "tmp/segments"
    },
    "dataSchema": {
      "dataSource": "no_metrics",
      "parser": {
        "type": "orc",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "time",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "name"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        },
        "typeString": "struct<time:string,name:string>"
      },
      "metricsSpec": [{
        "type": "count",
        "name": "count"
      }],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "ALL",
        "intervals": ["2015-12-31/2016-01-02"]
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "workingPath": "tmp/working_path",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties": {},
      "leaveIntermediate": true
    }
  }
}
```

Almost all of the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` to make HDFS paths timezone-independent, for example by forcing the mapper and reducer JVMs to UTC.
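A minimal `jobProperties` sketch, mirroring the bundled example job:

```json
"jobProperties": {
  "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8",
  "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8"
}
```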
1 change: 1 addition & 0 deletions docs/content/development/extensions.md
@@ -48,6 +48,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
63 changes: 63 additions & 0 deletions extensions-contrib/orc-extensions/example/hadoop_orc_job.json
@@ -0,0 +1,63 @@
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
        "paths": "wikipedia.gz.orc"
      },
      "metadataUpdateSpec": {
        "type": "postgresql",
        "connectURI": "jdbc:postgresql://localhost/druid",
        "user": "druid",
        "password": "asdf",
        "segmentTable": "druid_segments"
      },
      "segmentOutputPath": "/tmp/segments"
    },
    "dataSchema": {
      "dataSource": "wikipedia",
      "parser": {
        "type": "orc",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "col1",
              "col2"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        },
        "typeString": "struct<timestamp:string,col1:string,col2:array<string>,val1:float>"
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "intervals": ["2015-01-01/2017-01-01"]
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "workingPath": "tmp/working_path",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties": {
        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
      },
      "leaveIntermediate": true
    }
  }
}
150 changes: 150 additions & 0 deletions extensions-contrib/orc-extensions/pom.xml
@@ -0,0 +1,150 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Druid - a distributed column store.
  ~ Copyright 2012 - 2015 Metamarkets Group Inc.
  ~
  ~ Licensed under the Apache License, Version 2.0 (the "License");
  ~ you may not use this file except in compliance with the License.
  ~ You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>io.druid.extensions.contrib</groupId>
    <artifactId>druid-orc-extensions</artifactId>
    <name>druid-orc-extensions</name>
    <description>druid-orc-extensions</description>

    <parent>
        <artifactId>druid</artifactId>
        <groupId>io.druid</groupId>
        <version>0.9.2-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <dependencies>
        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>druid-indexing-hadoop</artifactId>
            <version>${project.parent.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>commons-cli</groupId>
                    <artifactId>commons-cli</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-httpclient</groupId>
                    <artifactId>commons-httpclient</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-codec</groupId>
                    <artifactId>commons-codec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-io</groupId>
                    <artifactId>commons-io</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-lang</groupId>
                    <artifactId>commons-lang</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.httpcomponents</groupId>
                    <artifactId>httpclient</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.httpcomponents</groupId>
                    <artifactId>httpcore</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.jackson</groupId>
                    <artifactId>jackson-core-asl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.jackson</groupId>
                    <artifactId>jackson-mapper-asl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.ws.rs</groupId>
                    <artifactId>jsr311-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.mortbay.jetty</groupId>
                    <artifactId>jetty-util</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.activation</groupId>
                    <artifactId>activation</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.protobuf</groupId>
                    <artifactId>protobuf-java</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jersey</groupId>
                    <artifactId>jersey-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-orc</artifactId>
            <version>${hive.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.inject</groupId>
            <artifactId>guice</artifactId>
        </dependency>
    </dependencies>

</project>
46 changes: 46 additions & 0 deletions extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcExtensionsModule.java
@@ -0,0 +1,46 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.druid.data.input.orc;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;

import java.util.Arrays;
import java.util.List;

public class OrcExtensionsModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules() {
    // Map the "orc" parser type name to its implementation class.
    return Arrays.asList(
        new SimpleModule("OrcInputRowParserModule")
            .registerSubtypes(
                new NamedType(OrcHadoopInputRowParser.class, "orc")
            )
    );
  }

  @Override
  public void configure(Binder binder) {
    // No Guice bindings required; this extension only contributes Jackson modules.
  }
}
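With this module registered (Druid discovers `DruidModule` implementations via Java's `ServiceLoader`), a parser spec carrying `"type": "orc"` deserializes to `OrcHadoopInputRowParser`.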