NIFI-1797 - Added compression codec property to CreateHadoopSequenceFile processor

This closes: apache#1387

Signed-off-by: Andre F de Miranda <[email protected]>
pvillard31 authored and trixpan committed Mar 2, 2017
1 parent a32e150 commit dcdfd3d
Showing 4 changed files with 176 additions and 13 deletions.
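
The Compression codec property is inherited from AbstractHadoopProcessor and now sits alongside the processor's existing Compression type property. As a minimal configuration sketch mirroring the tests added below (illustrative only; the real tests use a Kerberos-aware subclass of the processor):

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
    import org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class CodecPropertyExample {
        TestRunner configureRunner() {
            // Ask for a block-compressed sequence file written with the BZip2 codec.
            TestRunner runner = TestRunners.newTestRunner(CreateHadoopSequenceFile.class);
            runner.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC,
                    AbstractHadoopProcessor.CompressionType.BZIP.name());
            runner.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE,
                    SequenceFile.CompressionType.BLOCK.name());
            return runner;
        }
    }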
CreateHadoopSequenceFile.java (package org.apache.nifi.processors.hadoop)
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.processors.hadoop;

+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
+ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -31,12 +33,14 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
+ import org.apache.nifi.util.StopWatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+ import java.util.concurrent.TimeUnit;

/**
* <p>
@@ -88,7 +92,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
}
// Optional Properties.
static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("compression type")
.name("Compression type")
.description("Type of compression to use when creating Sequence File")
.allowableValues(SequenceFile.CompressionType.values())
.build();
@@ -105,6 +109,7 @@ public Set<Relationship> getRelationships() {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> someProps = new ArrayList<>(properties);
someProps.add(COMPRESSION_TYPE);
+ someProps.add(COMPRESSION_CODEC);
return someProps;
}

@@ -149,13 +154,28 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
default:
sequenceFileWriter = new SequenceFileWriterImpl();
}
- String value = context.getProperty(COMPRESSION_TYPE).getValue();
- SequenceFile.CompressionType compressionType = value == null
+
+ final Configuration configuration = getConfiguration();
+ if (configuration == null) {
+ getLogger().error("HDFS not configured properly");
+ session.transfer(flowFile, RELATIONSHIP_FAILURE);
+ context.yield();
+ return;
+ }
+
+ final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+ final String value = context.getProperty(COMPRESSION_TYPE).getValue();
+ final SequenceFile.CompressionType compressionType = value == null
? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);

final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);

try {
- flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
+ StopWatch stopWatch = new StopWatch(true);
+ flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
+ session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
} catch (ProcessException e) {
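The codec is resolved by getCompressionCodec(context, configuration), a helper inherited from AbstractHadoopProcessor that is not part of this diff. A sketch of what such a helper can look like, assuming the Compression codec property stores a CompressionType enum name whose toString() yields the codec class name (hypothetical reconstruction, not the committed implementation):

    // Fragment of an AbstractHadoopProcessor subclass (hypothetical):
    // returns null when the Compression codec property is unset.
    protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
        CompressionCodec codec = null;
        if (context.getProperty(COMPRESSION_CODEC).isSet()) {
            final String codecClassName = CompressionType.valueOf(
                    context.getProperty(COMPRESSION_CODEC).getValue()).toString();
            codec = new CompressionCodecFactory(configuration).getCodecByClassName(codecClassName);
        }
        return codec;
    }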
SequenceFileWriterImpl.java (package org.apache.nifi.processors.hadoop)
@@ -24,19 +24,19 @@
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.DefaultCodec;
+ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
- import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -48,7 +48,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {

@Override
public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
- final Configuration configuration, final CompressionType compressionType) {
+ final Configuration configuration, final CompressionType compressionType, final CompressionCodec compressionCodec) {

if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Cannot write " + flowFile
@@ -97,7 +97,7 @@ public void process(InputStream in, OutputStream out) throws IOException {
SequenceFile.Writer.stream(fsDataOutputStream),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(InputStreamWritable.class),
- SequenceFile.Writer.compression(compressionType, new DefaultCodec()))) {
+ SequenceFile.Writer.compression(compressionType, compressionCodec))) {

processInputStream(in, flowFile, writer);

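With the codec injected above, the previously hard-coded DefaultCodec disappears from the writer construction. For context, the same Hadoop writer-options API can be exercised standalone; a sketch under assumed paths and types (not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;

    public class WriterSketch {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            final BZip2Codec codec = new BZip2Codec();
            codec.setConf(conf); // codecs built outside ReflectionUtils need a Configuration
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(new Path("/tmp/example.sf")), // hypothetical output path
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(Text.class),
                    SequenceFile.Writer.compression(CompressionType.BLOCK, codec))) {
                writer.append(new Text("key"), new Text("value"));
            }
        }
    }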
SequenceFileWriter.java (package org.apache.nifi.processors.hadoop.util)
@@ -18,6 +18,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

@@ -31,7 +32,8 @@ public interface SequenceFileWriter {
* @param session session
* @param configuration configuration
* @param compressionType compression type
+ * @param compressionCodec compression codec
* @return the written to SequenceFile flow file
*/
- FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType);
+ FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType, CompressionCodec compressionCodec);
}
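
A hedged usage sketch of the extended signature as the processor now calls it (variable names assumed; the codec may be null when the Compression codec property is unset):

    final SequenceFileWriter writer = new SequenceFileWriterImpl();
    flowFile = writer.writeSequenceFile(flowFile, session, configuration,
            SequenceFile.CompressionType.BLOCK, codec);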
TestCreateHadoopSequenceFile.java (package org.apache.nifi.processors.hadoop)
@@ -17,7 +17,10 @@
package org.apache.nifi.processors.hadoop;

import org.apache.hadoop.io.BytesWritable;
+ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.io.compress.BZip2Codec;
+ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -30,8 +33,6 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
@@ -49,7 +50,6 @@
public class TestCreateHadoopSequenceFile {

private TestRunner controller;
- private static Logger LOGGER;

private final File testdata = new File("src/test/resources/testdata");
private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"),
@@ -61,7 +61,6 @@ public class TestCreateHadoopSequenceFile {

@BeforeClass
public static void setUpClass() {
- LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class);
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug");
}
@@ -204,6 +203,147 @@ public void testMergedFlowfilePackagedData() throws IOException {
// fos.close();
}

+ @Test
+ public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type
+ // Then, the length of the Value type(1 byte), then the Value type
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(BZip2Codec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
+ @Test
+ public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type
+ // Then, the length of the Value type(1 byte), then the Value type
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
+ @Test
+ public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type
+ // Then, the length of the Value type(1 byte), then the Value type
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile {

private KerberosProperties testKerbersProperties;
@@ -217,4 +357,5 @@ protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerbersProperties;
}
}

}
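
For reference, the header arithmetic shared by the three new tests resolves to fixed offsets for these key and value types: Text.class.getCanonicalName() ("org.apache.hadoop.io.Text") is 25 characters and BytesWritable.class.getCanonicalName() ("org.apache.hadoop.io.BytesWritable") is 34. A worked restatement of the index computations above:

    // "SEQ"(3) + version(1) + key-name length(1) + key name(25) + value-name length(1)
    final int valueTypeStart = 3 + 1 + 1 + 25 + 1;            // = 31 (length byte sits at index 30)
    final int compressionIndex = valueTypeStart + 34;         // = 65; value 1 means compressed
    final int blockCompressionIndex = compressionIndex + 1;   // = 66; value 1 means block-compressed
    // codec class-name length byte at 67; the codec class name starts at index 68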
