Skip to content

Commit

Permalink
Some code refactor for better performance of Avro-Extension (apache…
Browse files Browse the repository at this point in the history
…#4092)

* 1. Collections.singletonList instand of Arrays.asList; 2. close FSDataInputStream/ByteBufferInputStream for releasing resource; 3. convert com.google.common.base.Function into java.util.function.Function; 4. others code refactor

* Put each param on its own line for  code style

* Revert GenericRecordAsMap back about `Function`
  • Loading branch information
asdf2014 authored and gianm committed Apr 25, 2017
1 parent d51097c commit de815da
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ public InputRow parse(ByteBuffer input)
return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false, false);
}

protected static InputRow parseGenericRecord(GenericRecord record, ParseSpec parseSpec, List<String> dimensions,
boolean fromPigAvroStorage, boolean binaryAsString)
protected static InputRow parseGenericRecord(
GenericRecord record,
ParseSpec parseSpec,
List<String> dimensions,
boolean fromPigAvroStorage,
boolean binaryAsString
)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage, binaryAsString);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.schemarepo.json.GsonJsonUtil;
import org.schemarepo.json.JsonUtil;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AvroExtensionsModule implements DruidModule
Expand All @@ -46,7 +46,7 @@ public AvroExtensionsModule() {}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.asList(
return Collections.singletonList(
new SimpleModule("AvroInputRowParserModule")
.registerSubtypes(
new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.druid.data.input.avro;

import io.druid.java.util.common.logger.Logger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
Expand All @@ -31,8 +32,6 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import io.druid.java.util.common.logger.Logger;

import java.io.IOException;

public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericRecord>
Expand All @@ -55,13 +54,10 @@ public RecordReader<NullWritable, GenericRecord> createRecordReader(
String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
if (StringUtils.isNotBlank(schemaFilePath)) {
log.info("Using file: %s as reader schema.", schemaFilePath);
FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath));
try {
try (FSDataInputStream inputStream =
FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath))) {
readerSchema = new Schema.Parser().parse(inputStream);
}
finally {
inputStream.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder

private final Schema schemaObj;
private final Map<String, Object> schema;
private final DatumReader<GenericRecord> reader;

@JsonCreator
public InlineSchemaAvroBytesDecoder(
Expand All @@ -61,14 +62,16 @@ public InlineSchemaAvroBytesDecoder(
String schemaStr = mapper.writeValueAsString(schema);

LOGGER.info("Schema string [%s]", schemaStr);
schemaObj = new Schema.Parser().parse(schemaStr);
this.schemaObj = new Schema.Parser().parse(schemaStr);
this.reader = new GenericDatumReader<>(this.schemaObj);
}

//For UT only
@VisibleForTesting
InlineSchemaAvroBytesDecoder(Schema schemaObj)
{
this.schemaObj = schemaObj;
this.reader = new GenericDatumReader<>(schemaObj);
this.schema = null;
}

Expand All @@ -81,9 +84,7 @@ public Map<String, Object> getSchema()
@Override
public GenericRecord parse(ByteBuffer bytes)
{
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
try {
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public InlineSchemasAvroBytesDecoder(

Map<String, Object> schema = e.getValue();
String schemaStr = mapper.writeValueAsString(schema);
;

LOGGER.info("Schema string [%s] = [%s]", id, schemaStr);
schemaObjs.put(id, new Schema.Parser().parse(schemaStr));
Expand Down Expand Up @@ -116,10 +115,8 @@ public GenericRecord parse(ByteBuffer bytes)
throw new ParseException("Failed to find schema for id [%s]", schemaId);
}

try {
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));

DatumReader<GenericRecord> reader = new GenericDatumReader<>(schemaObj);
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public SchemaRegistryBasedAvroBytesDecoder(
}

//For UT only
@VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
@VisibleForTesting
SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
{
this.registry = registry;
}
Expand All @@ -63,7 +64,8 @@ public GenericRecord parse(ByteBuffer bytes)
Schema schema = registry.getByID(id);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
} catch (Exception e) {
}
catch (Exception e) {
throw new ParseException(e, "Fail to decode avro message!");
}
}
Expand All @@ -81,7 +83,6 @@ public boolean equals(Object o)
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;

return registry != null ? registry.equals(that.registry) : that.registry == null;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public SchemaRepoBasedAvroBytesDecoder(
{
this.subjectAndIdConverter = subjectAndIdConverter;
this.schemaRepository = schemaRepository;
typedRepository = new TypedSchemaRepository<ID, Schema, SUBJECT>(
this.typedRepository = new TypedSchemaRepository<>(
schemaRepository,
subjectAndIdConverter.getIdConverter(),
new AvroSchemaConverter(false),
Expand All @@ -77,9 +77,8 @@ public GenericRecord parse(ByteBuffer bytes)
{
Pair<SUBJECT, ID> subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes);
Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
try {
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import io.druid.java.util.common.Pair;

import org.schemarepo.api.converter.Converter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
Expand Down Expand Up @@ -51,7 +49,7 @@ public Avro1124SubjectAndIdConverter(@JsonProperty("topic") String topic)
@Override
public Pair<String, Integer> getSubjectAndId(ByteBuffer payload)
{
return new Pair<String, Integer>(topic, payload.getInt());
return new Pair<>(topic, payload.getInt());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import io.druid.java.util.common.Pair;

import org.schemarepo.api.converter.Converter;

import java.nio.ByteBuffer;
Expand Down

0 comments on commit de815da

Please sign in to comment.