ORC reader/writer added
Ashish Kumar committed May 9, 2017
1 parent 562085f commit 6fd65b2
Showing 8 changed files with 635 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pom.xml
@@ -310,6 +310,11 @@
<artifactId>azure-storage</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.3.3</version>
</dependency>
</dependencies>

<build>
8 changes: 7 additions & 1 deletion src/main/config/secor.common.properties
@@ -51,7 +51,7 @@ aws.proxy.http.port=
# secor.upload.manager.class.
#
# http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
-aws.region=
+aws.region=SG
aws.endpoint=

# Toggle the AWS S3 client between virtual host style access and path style
@@ -398,3 +398,9 @@ parquet.enable.dictionary=true
# Enable or disable validation for Parquet writers. Validates records written against the schema. Defaults to false in
# Parquet 1.9.
parquet.validation=false

# The ORC schema can be configured per Kafka topic; a common schema shared by all topics is also
# possible. This property is mandatory if DefaultORCSchemaProvider is used. The schema that applies
# to all topics is defined like this:
secor.orc.message.schema.*=struct<a:int\,b:int\,c:struct<d:int\,e:string>\,f:array<string>\,g:int>
# Individual topic schema:
secor.orc.message.schema.topic1=struct<a:int\,b:int\,c:struct<d:int\,e:string>\,f:array<string>\,g:int>
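
To have topics written as ORC, the reader/writer factory also has to point at the class added in this commit; a minimal sketch, assuming the existing secor.file.reader.writer.factory property is what selects the factory:

secor.file.reader.writer.factory=com.pinterest.secor.io.impl.JsonORCFileReaderWriterFactory

The commas inside the schema values above are escaped with a backslash, presumably so the properties loader does not split the value into a list on commas.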
15 changes: 15 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -569,4 +569,19 @@ public String getThriftProtocolClass() {
public String getMetricsCollectorClass() {
return mProperties.getString("secor.monitoring.metrics.collector.class");
}

public Map<String, String> getPropertyMapForPrefix(String prefix) {
Iterator<String> keys = mProperties.getKeys(prefix);
Map<String, String> map = new HashMap<String, String>();
while (keys.hasNext()) {
String key = keys.next();
String value = mProperties.getString(key);
map.put(key.substring(prefix.length() + 1), value);
}
return map;
}

public Map<String, String> getORCMessageSchema() {
return getPropertyMapForPrefix("secor.orc.message.schema");
}
}
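
For illustration, a short sketch of how the new accessor might be consumed; the schemaFor helper is hypothetical, and the keys of the returned map are topic names (or "*" for the common schema) because getPropertyMapForPrefix strips the prefix:

import java.util.Map;

import com.pinterest.secor.common.SecorConfig;

public class OrcSchemaLookupSketch {

    // Hypothetical helper: prefer the per-topic schema, fall back to the wildcard entry.
    static String schemaFor(SecorConfig config, String topic) {
        Map<String, String> schemas = config.getORCMessageSchema();
        return schemas.containsKey(topic) ? schemas.get(topic) : schemas.get("*");
    }
}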
181 changes: 181 additions & 0 deletions src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java
@@ -0,0 +1,181 @@
package com.pinterest.secor.io.impl;

import java.io.IOException;
import java.io.StringWriter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.impl.ZlibCodec;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileReaderWriterFactory;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.util.orc.JsonFieldFiller;
import com.pinterest.secor.util.orc.VectorColumnFiller;
import com.pinterest.secor.util.orc.VectorColumnFiller.JsonConverter;
import com.pinterest.secor.util.orc.schema.DefaultORCSchemaProvider;
import com.pinterest.secor.util.orc.schema.ORCScehmaProvider;

/**
* ORC reader/writer implementation
*
* @author Ashish ([email protected])
*
*/
public class JsonORCFileReaderWriterFactory implements FileReaderWriterFactory {

private ORCScehmaProvider schemaProvider;

public JsonORCFileReaderWriterFactory(SecorConfig config) {
schemaProvider = new DefaultORCSchemaProvider(config);
}

@Override
public FileReader BuildFileReader(LogFilePath logFilePath,
CompressionCodec codec) throws Exception {
return new JsonORCFileReader(logFilePath, codec);
}

@Override
public FileWriter BuildFileWriter(LogFilePath logFilePath,
CompressionCodec codec) throws Exception {
return new JsonORCFileWriter(logFilePath, codec);
}

protected class JsonORCFileReader implements FileReader {

private int rowIndex = 0;
private long offset;
private RecordReader rows;
private VectorizedRowBatch batch;
private TypeDescription schema;

@SuppressWarnings("deprecation")
public JsonORCFileReader(LogFilePath logFilePath, CompressionCodec codec)
throws IOException {
schema = schemaProvider.getSchema(logFilePath.getTopic());
Path path = new Path(logFilePath.getLogFilePath());
Reader reader = OrcFile.createReader(path,
OrcFile.readerOptions(new Configuration(true)));
offset = logFilePath.getOffset();
rows = reader.rows();
batch = reader.getSchema().createRowBatch();
rows.nextBatch(batch);
}

@Override
public KeyValue next() throws IOException {
boolean endOfBatch = false;
StringWriter sw = new StringWriter();

if (rowIndex > batch.size - 1) {
endOfBatch = !rows.nextBatch(batch);
rowIndex = 0;
}

if (endOfBatch) {
rows.close();
return null;
}

try {
JsonFieldFiller.processRow(new JSONWriter(sw), batch, schema,
rowIndex);
} catch (JSONException e) {
e.printStackTrace();
}
rowIndex++;
return new KeyValue(offset++, sw.toString().getBytes("UTF-8"));
}

@Override
public void close() throws IOException {
rows.close();
}
}

protected class JsonORCFileWriter implements FileWriter {

private Gson gson = new Gson();
private Writer writer;
private JsonConverter[] converters;
private VectorizedRowBatch batch;
private int rowIndex;
private TypeDescription schema;

public JsonORCFileWriter(LogFilePath logFilePath, CompressionCodec codec)
throws IOException {
Configuration conf = new Configuration();
Path path = new Path(logFilePath.getLogFilePath());
schema = schemaProvider.getSchema(logFilePath.getTopic());
List<TypeDescription> fieldTypes = schema.getChildren();
converters = new JsonConverter[fieldTypes.size()];
for (int c = 0; c < converters.length; ++c) {
converters[c] = VectorColumnFiller.createConverter(fieldTypes
.get(c));
}

writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf)
.compress(resolveCompression(codec)).setSchema(schema));
batch = schema.createRowBatch();
}

@Override
public long getLength() throws IOException {
return writer.getRawDataSize();
}

@Override
public void write(KeyValue keyValue) throws IOException {
rowIndex = batch.size++;
VectorColumnFiller.fillRow(rowIndex, converters, schema, batch,
gson.fromJson(new String(keyValue.getValue()),
JsonObject.class));
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}

@Override
public void close() throws IOException {
writer.addRowBatch(batch);
writer.close();
}
}

/**
* Maps the Hadoop compression codec configured for the topic to the
* corresponding ORC compression kind.
*
* @param codec the Hadoop codec, or null if none is configured
* @return the matching CompressionKind, or NONE when no mapping exists
*/
private CompressionKind resolveCompression(CompressionCodec codec) {
if (codec instanceof Lz4Codec)
return CompressionKind.LZ4;
else if (codec instanceof SnappyCodec)
return CompressionKind.SNAPPY;
else if (codec instanceof ZlibCodec)
return CompressionKind.ZLIB;
else
return CompressionKind.NONE;
}
}
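
For context, a minimal standalone sketch of the ORC batch write pattern that JsonORCFileWriter relies on, using only the orc-core API added to pom.xml above; the path, schema, and values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriteSketch {

    public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("struct<a:int,g:int>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/sketch.orc"),
                OrcFile.writerOptions(new Configuration())
                        .compress(CompressionKind.ZLIB)
                        .setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        for (int i = 0; i < 10; i++) {
            int row = batch.size++;
            ((LongColumnVector) batch.cols[0]).vector[row] = i;
            ((LongColumnVector) batch.cols[1]).vector[row] = i * 2;
            // Flush a full batch, mirroring JsonORCFileWriter.write().
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        // Flush the remainder, mirroring JsonORCFileWriter.close().
        if (batch.size != 0) {
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}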
126 changes: 126 additions & 0 deletions src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java
@@ -0,0 +1,126 @@
package com.pinterest.secor.util.orc;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.orc.TypeDescription;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONWriter;

/**
* Serializes a single row of an ORC VectorizedRowBatch to JSON using a
* JSONWriter.
*
* @author Ashish ([email protected])
*
*/
public class JsonFieldFiller {

public static void processRow(JSONWriter writer, VectorizedRowBatch batch,
TypeDescription schema, int row) throws JSONException {
if (schema.getCategory() == TypeDescription.Category.STRUCT) {
List<TypeDescription> fieldTypes = schema.getChildren();
List<String> fieldNames = schema.getFieldNames();
writer.object();
for (int c = 0; c < batch.cols.length; ++c) {
writer.key(fieldNames.get(c));
setValue(writer, batch.cols[c], fieldTypes.get(c), row);
}
writer.endObject();
} else {
setValue(writer, batch.cols[0], schema, row);
}
}

static void setValue(JSONWriter writer, ColumnVector vector,
TypeDescription schema, int row) throws JSONException {
if (vector.isRepeating) {
row = 0;
}
if (vector.noNulls || !vector.isNull[row]) {
switch (schema.getCategory()) {
case BOOLEAN:
writer.value(((LongColumnVector) vector).vector[row] != 0);
break;
case BYTE:
case SHORT:
case INT:
case LONG:
writer.value(((LongColumnVector) vector).vector[row]);
break;
case FLOAT:
case DOUBLE:
writer.value(((DoubleColumnVector) vector).vector[row]);
break;
case STRING:
case CHAR:
case VARCHAR:
writer.value(((BytesColumnVector) vector).toString(row));
break;
case DECIMAL:
writer.value(((DecimalColumnVector) vector).vector[row]
.toString());
break;
case DATE:
writer.value(new DateWritable(
(int) ((LongColumnVector) vector).vector[row])
.toString());
break;
case TIMESTAMP:
writer.value(((TimestampColumnVector) vector)
.asScratchTimestamp(row).toString());
break;
case LIST:
setList(writer, (ListColumnVector) vector, schema, row);
break;
case STRUCT:
setStruct(writer, (StructColumnVector) vector, schema, row);
break;
case UNION:
// printUnion(writer, (UnionColumnVector) vector, schema, row);
break;
case BINARY:
// printBinary(writer, (BytesColumnVector) vector, row);
break;
case MAP:
// printMap(writer, (MapColumnVector) vector, schema, row);
break;
default:
throw new IllegalArgumentException("Unknown type "
+ schema.toString());
}
} else {
writer.value(null);
}
}

private static void setList(JSONWriter writer, ListColumnVector vector,
TypeDescription schema, int row) throws JSONException {
writer.array();
int offset = (int) vector.offsets[row];
TypeDescription childType = schema.getChildren().get(0);
for (int i = 0; i < vector.lengths[row]; ++i) {
setValue(writer, vector.child, childType, offset + i);
}
writer.endArray();
}

private static void setStruct(JSONWriter writer, StructColumnVector batch,
TypeDescription schema, int row) throws JSONException {
writer.object();
List<String> fieldNames = schema.getFieldNames();
List<TypeDescription> fieldTypes = schema.getChildren();
for (int i = 0; i < fieldTypes.size(); ++i) {
writer.key(fieldNames.get(i));
setValue(writer, batch.fields[i], fieldTypes.get(i), row);
}
writer.endObject();
}
}
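
For illustration, a small sketch that fills one row of a struct<a:int,b:string> batch by hand and renders it with JsonFieldFiller; the values are illustrative, and the expected output is shown as a comment:

import java.io.StringWriter;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.codehaus.jettison.json.JSONWriter;

import com.pinterest.secor.util.orc.JsonFieldFiller;

public class JsonFieldFillerSketch {

    public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
        VectorizedRowBatch batch = schema.createRowBatch();
        ((LongColumnVector) batch.cols[0]).vector[0] = 42;
        byte[] value = "hello".getBytes("UTF-8");
        ((BytesColumnVector) batch.cols[1]).setRef(0, value, 0, value.length);
        batch.size = 1;

        StringWriter sw = new StringWriter();
        JsonFieldFiller.processRow(new JSONWriter(sw), batch, schema, 0);
        // Expected (illustrative): {"a":42,"b":"hello"}
        System.out.println(sw.toString());
    }
}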