Skip to content

Commit

Permalink
PARQUET-79: add a streaming Thrift API, to enable processing the meta…
Browse files Browse the repository at this point in the history
…data as we read it and skipping unnecessary fields.

This pull request provides an API to read thrift in a streaming fashion.
This enables ignoring fields that are not needed without loading them into memory.
It also allows processing the data as it arrives, instead of only once it is fully loaded in memory.

Author: julien <[email protected]>

Closes apache#8 from julienledem/streaming_metadata and squashes the following commits:

621769a [julien] cleanup refactoring
a58913d [julien] rename add to consume
e5c78fc [julien] #simplify
cb386ce [julien] RIP TypedConsumerProvider, @tsdeng did not like you
8dd801e [julien] Merge branch 'master' into streaming_metadata
958726f [julien] javadoc; fix apis
9be786a [julien] added simple readMetaData method
bee937a [julien] refactor, cleanup
6368bdc [julien] streaming thrift reader
71c85de [julien] first stab
  • Loading branch information
julienledem authored and Dmitriy Ryaboy committed Aug 30, 2014
1 parent 02abe09 commit 97964f3
Show file tree
Hide file tree
Showing 7 changed files with 711 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
142 changes: 142 additions & 0 deletions src/main/java/parquet/format/Util.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
package parquet.format;

import static parquet.format.FileMetaData._Fields.CREATED_BY;
import static parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA;
import static parquet.format.FileMetaData._Fields.NUM_ROWS;
import static parquet.format.FileMetaData._Fields.ROW_GROUPS;
import static parquet.format.FileMetaData._Fields.SCHEMA;
import static parquet.format.FileMetaData._Fields.VERSION;
import static parquet.format.event.Consumers.fieldConsumer;
import static parquet.format.event.Consumers.listElementsOf;
import static parquet.format.event.Consumers.listOf;
import static parquet.format.event.Consumers.struct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

import parquet.format.event.Consumers.Consumer;
import parquet.format.event.Consumers.DelegatingFieldConsumer;
import parquet.format.event.EventBasedThriftReader;
import parquet.format.event.TypedConsumer.I32Consumer;
import parquet.format.event.TypedConsumer.I64Consumer;
import parquet.format.event.TypedConsumer.StringConsumer;

/**
* Utility to read/write metadata
* We use the TCompactProtocol to serialize metadata
Expand All @@ -34,6 +53,129 @@ public static void writeFileMetaData(parquet.format.FileMetaData fileMetadata, O
public static FileMetaData readFileMetaData(InputStream from) throws IOException {
  // Hand a fresh, empty FileMetaData to read(...) and return whatever it gives back.
  final FileMetaData target = new FileMetaData();
  return read(from, target);
}
/**
 * Reads the file metadata from the stream, optionally leaving the row groups out.
 * When row groups are kept, the metadata is read through the regular read(...) path;
 * when they are skipped, the streaming reader is used with a
 * {@link DefaultFileMetaDataConsumer} that fills in the returned object.
 *
 * @param from the stream to read the metadata from
 * @param skipRowGroups whether row groups should be skipped
 * @return the resulting metadata
 * @throws IOException if reading the metadata fails
 */
public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException {
  final FileMetaData result = new FileMetaData();
  if (!skipRowGroups) {
    // nothing to skip: read everything in one go
    read(from, result);
    return result;
  }
  // streaming path: the consumer copies every field except the row groups into result
  readFileMetaData(from, new DefaultFileMetaDataConsumer(result), true);
  return result;
}

/**
 * Callbacks used to receive file metadata while it is being read in a streaming fashion.
 * The streaming readFileMetaData methods invoke one callback per metadata field they handle.
 *
 * @author Julien Le Dem
 *
 */
public abstract static class FileMetaDataConsumer {

  /** Receives the value read for the version field. */
  public abstract void setVersion(int version);

  /** Receives the schema as one complete list once all its elements have been read. */
  public abstract void setSchema(List<SchemaElement> schema);

  /** Receives the value read for the num_rows field. */
  public abstract void setNumRows(long numRows);

  /** Receives row groups one at a time; never called when row groups are skipped. */
  public abstract void addRowGroup(RowGroup rowGroup);

  /** Receives key/value metadata entries one at a time. */
  public abstract void addKeyValueMetaData(KeyValue kv);

  /** Receives the value read for the created_by field. */
  public abstract void setCreatedBy(String createdBy);
}

/**
 * Straightforward consumer that copies every value it receives
 * into the FileMetaData instance given at construction time.
 *
 * @author Julien Le Dem
 *
 */
public static final class DefaultFileMetaDataConsumer extends FileMetaDataConsumer {

  /** the object being populated */
  private final FileMetaData metadata;

  /**
   * @param md the metadata object that will receive the values
   */
  public DefaultFileMetaDataConsumer(FileMetaData md) {
    this.metadata = md;
  }

  @Override
  public void setVersion(int version) {
    metadata.setVersion(version);
  }

  @Override
  public void setSchema(List<SchemaElement> schema) {
    metadata.setSchema(schema);
  }

  @Override
  public void setNumRows(long numRows) {
    metadata.setNum_rows(numRows);
  }

  @Override
  public void addRowGroup(RowGroup rowGroup) {
    metadata.addToRow_groups(rowGroup);
  }

  @Override
  public void addKeyValueMetaData(KeyValue kv) {
    metadata.addToKey_value_metadata(kv);
  }

  @Override
  public void setCreatedBy(String createdBy) {
    metadata.setCreated_by(createdBy);
  }
}

/**
 * Reads the file metadata from the stream in a streaming fashion, row groups included.
 *
 * @param from the stream to read the metadata from
 * @param consumer the consumer receiving the metadata fields as they are read
 * @throws IOException if reading the metadata fails
 */
public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException {
  final boolean skipRowGroups = false;
  readFileMetaData(from, consumer, skipRowGroups);
}

/**
 * Reads the file metadata from the stream in a streaming fashion,
 * forwarding each handled field to the given consumer as it is read.
 * Fields that have no handler registered here are left to the default
 * behavior of the field consumer returned by fieldConsumer().
 *
 * @param from the stream to read the metadata from
 * @param consumer the consumer receiving the metadata fields as they are read
 * @param skipRowGroups when true, no handler is registered for the row groups
 *                      and consumer.addRowGroup is never called
 * @throws IOException if a thrift error occurs while reading; the TException is kept as the cause
 */
public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
  try {
    // One small adapter per field, each forwarding to the matching consumer callback.
    final I32Consumer versionSink = new I32Consumer() {
      @Override
      public void consume(int value) {
        consumer.setVersion(value);
      }
    };
    final Consumer<List<SchemaElement>> schemaSink = new Consumer<List<SchemaElement>>() {
      @Override
      public void consume(List<SchemaElement> schema) {
        consumer.setSchema(schema);
      }
    };
    final I64Consumer numRowsSink = new I64Consumer() {
      @Override
      public void consume(long value) {
        consumer.setNumRows(value);
      }
    };
    final Consumer<KeyValue> keyValueSink = new Consumer<KeyValue>() {
      @Override
      public void consume(KeyValue kv) {
        consumer.addKeyValueMetaData(kv);
      }
    };
    final StringConsumer createdBySink = new StringConsumer() {
      @Override
      public void consume(String value) {
        consumer.setCreatedBy(value);
      }
    };

    // The schema is delivered as a whole list, key/values element by element.
    DelegatingFieldConsumer fields = fieldConsumer()
        .onField(VERSION, versionSink)
        .onField(SCHEMA, listOf(SchemaElement.class, schemaSink))
        .onField(NUM_ROWS, numRowsSink)
        .onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, keyValueSink)))
        .onField(CREATED_BY, createdBySink);

    if (!skipRowGroups) {
      final Consumer<RowGroup> rowGroupSink = new Consumer<RowGroup>() {
        @Override
        public void consume(RowGroup rowGroup) {
          consumer.addRowGroup(rowGroup);
        }
      };
      // onField returns a new consumer, so the result has to be re-assigned
      fields = fields.onField(ROW_GROUPS, listElementsOf(struct(RowGroup.class, rowGroupSink)));
    }

    final EventBasedThriftReader reader = new EventBasedThriftReader(protocol(from));
    reader.readStruct(fields);
  } catch (TException e) {
    throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
  }
}

private static TProtocol protocol(OutputStream to) {
return protocol(new TIOStreamTransport(to));
Expand Down
181 changes: 181 additions & 0 deletions src/main/java/parquet/format/event/Consumers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package parquet.format.event;

import static java.util.Collections.unmodifiableMap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TList;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolUtil;

import parquet.format.event.Consumers.Consumer;
import parquet.format.event.TypedConsumer.BoolConsumer;
import parquet.format.event.TypedConsumer.ListConsumer;
import parquet.format.event.TypedConsumer.StructConsumer;

/**
 * Entry point for reading thrift in a streaming fashion.
 * Provides static factories for the consumers handed to an {@link EventBasedThriftReader}.
 *
 * @author Julien Le Dem
 *
 */
public class Consumers {

  /**
   * Callback receiving the objects produced while reading.
   * @author Julien Le Dem
   *
   * @param <T> the type of consumed objects
   */
  public static interface Consumer<T> {
    void consume(T t);
  }

  /**
   * Dispatches each field to the TypedConsumer registered for its thrift field id.
   * Fields without a registered TypedConsumer are handed to a default FieldConsumer
   * (a SkippingFieldConsumer for instances created through {@link Consumers#fieldConsumer()}).
   * Use {@link DelegatingFieldConsumer#onField(TFieldIdEnum, TypedConsumer)} to consume specific thrift fields.
   * @see Consumers#fieldConsumer()
   * @author Julien Le Dem
   *
   */
  public static class DelegatingFieldConsumer implements FieldConsumer {

    /** used for every field id absent from handlersById */
    private final FieldConsumer fallback;
    /** thrift field id to its handler; wrapped as unmodifiable */
    private final Map<Short, TypedConsumer> handlersById;

    private DelegatingFieldConsumer() {
      this(new SkippingFieldConsumer(), Collections.<Short, TypedConsumer>emptyMap());
    }

    private DelegatingFieldConsumer(FieldConsumer fallback, Map<Short, TypedConsumer> handlersById) {
      this.fallback = fallback;
      this.handlersById = unmodifiableMap(handlersById);
    }

    /**
     * Registers a handler for a field.
     * This instance is left untouched: the registration lives in the returned copy.
     * @param e the field to handle
     * @param typedConsumer the consumer that will read that field
     * @return a new DelegatingFieldConsumer with the additional registration
     */
    public DelegatingFieldConsumer onField(TFieldIdEnum e, TypedConsumer typedConsumer) {
      final Map<Short, TypedConsumer> extended = new HashMap<Short, TypedConsumer>(handlersById);
      final short fieldId = e.getThriftFieldId();
      extended.put(fieldId, typedConsumer);
      return new DelegatingFieldConsumer(fallback, extended);
    }

    @Override
    public void consumeField(
        TProtocol protocol, EventBasedThriftReader reader,
        short id, byte type) throws TException {
      final TypedConsumer handler = handlersById.get(id);
      if (handler == null) {
        // nobody registered for this field: let the fallback deal with it
        fallback.consumeField(protocol, reader, id, type);
        return;
      }
      handler.read(protocol, reader, type);
    }
  }

  /**
   * Call onField on the resulting DelegatingFieldConsumer to handle individual fields.
   * @return a new DelegatingFieldConsumer
   */
  public static DelegatingFieldConsumer fieldConsumer() {
    return new DelegatingFieldConsumer();
  }

  /**
   * To consume a list of structs as a whole:
   * the elements are gathered and handed over once the list has been read.
   * @param c the type of the list content
   * @param consumer the consumer that will receive the list
   * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
   */
  public static <T extends TBase<T,? extends TFieldIdEnum>> ListConsumer listOf(Class<T> c, final Consumer<List<T>> consumer) {
    // gathers the elements of the list currently being read
    class ElementCollector implements Consumer<T> {
      List<T> elements;
      @Override
      public void consume(T element) {
        elements.add(element);
      }
    }
    final ElementCollector collector = new ElementCollector();
    return new DelegatingListElementsConsumer(struct(c, collector)) {
      @Override
      public void consumeList(TProtocol protocol,
          EventBasedThriftReader reader, TList tList) throws TException {
        // start from a fresh list every time a list is read
        collector.elements = new ArrayList<T>();
        super.consumeList(protocol, reader, tList);
        consumer.consume(collector.elements);
      }
    };
  }

  /**
   * To consume list elements one by one.
   * @param consumer the consumer that will read the elements
   * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
   */
  public static ListConsumer listElementsOf(TypedConsumer consumer) {
    return new DelegatingListElementsConsumer(consumer);
  }

  /**
   * To consume a struct as an instance of its thrift class.
   * @param c the thrift class of the struct
   * @param consumer the consumer that will receive the instance once it has been read
   * @return a StructConsumer reading the struct into a new instance of c
   */
  public static <T extends TBase<T,? extends TFieldIdEnum>> StructConsumer struct(final Class<T> c, final Consumer<T> consumer) {
    return new TBaseStructConsumer<T>(c, consumer);
  }
}

/**
 * FieldConsumer that discards whatever field it is given
 * by asking TProtocolUtil to skip a value of the field's type.
 */
class SkippingFieldConsumer implements FieldConsumer {

  @Override
  public void consumeField(
      TProtocol protocol, EventBasedThriftReader reader,
      short id, byte type) throws TException {
    // neither the reader nor the field id is needed to skip the value
    TProtocolUtil.skip(protocol, type);
  }
}

/**
 * ListConsumer that hands every element of the list to a single TypedConsumer.
 */
class DelegatingListElementsConsumer extends ListConsumer {

  // assigned once in the constructor and never changed afterwards
  private final TypedConsumer elementConsumer;

  /**
   * @param consumer the consumer that will read each element of the list
   */
  protected DelegatingListElementsConsumer(TypedConsumer consumer) {
    this.elementConsumer = consumer;
  }

  /**
   * Reads one element by delegating to the element consumer.
   */
  @Override
  public void consumeElement(TProtocol protocol, EventBasedThriftReader reader, byte elemType) throws TException {
    elementConsumer.read(protocol, reader, elemType);
  }
}
/**
 * StructConsumer that reads each struct into a new instance of a thrift class
 * and passes that instance to a Consumer.
 *
 * @param <T> the thrift class the structs are read into
 */
class TBaseStructConsumer<T extends TBase<T, ? extends TFieldIdEnum>> extends StructConsumer {

  private final Class<T> c;
  // assigned once in the constructor and never changed afterwards
  private final Consumer<T> consumer;

  /**
   * @param c the class to instantiate for every struct; needs an accessible no-arg constructor
   * @param consumer the consumer receiving each instance once it has been read
   */
  public TBaseStructConsumer(Class<T> c, Consumer<T> consumer) {
    this.c = c;
    this.consumer = consumer;
  }

  /**
   * Creates a new object, lets it read itself from the protocol, then passes it to the consumer.
   */
  @Override
  public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException {
    T o = newObject();
    o.read(protocol);
    consumer.consume(o);
  }

  /**
   * Instantiates the target class through its no-arg constructor.
   *
   * @return a new instance of the target class
   * @throws RuntimeException if the class can not be instantiated;
   *         the message is the class name and the cause is the reflective failure
   */
  protected T newObject() {
    try {
      // Class.newInstance() would rethrow checked exceptions of the constructor undeclared;
      // going through the Constructor wraps them in InvocationTargetException instead.
      return c.getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(c.getName(), e);
    } catch (java.lang.reflect.InvocationTargetException e) {
      throw new RuntimeException(c.getName(), e);
    } catch (InstantiationException e) {
      throw new RuntimeException(c.getName(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(c.getName(), e);
    }
  }

}
Loading

0 comments on commit 97964f3

Please sign in to comment.