Skip to content

Commit

Permalink
NIFI-1874 Added Character Set Detection to IdentifyMimeType
Browse files Browse the repository at this point in the history
NIFI-4550 New Processor not required based on improvements to IdentifyMimeType

- Added mime.charset FlowFile attribute when not null for text MIME types

This closes apache#8011

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
EndzeitBegins authored and exceptionfactory committed Nov 15, 2023
1 parent 5815d83 commit 6c333cd
Show file tree
Hide file tree
Showing 5 changed files with 556 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ The following binary components are provided under the Mozilla Public License v2

(MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:10.6 - http://www.saxonica.com/)

*****************
Mozilla Public License v1.1
*****************

The following binary components are provided under the Mozilla Public License v1.1. See project link for details.

(MPL 1.1) Java port of universalchardet (com.github.albfernandez:juniversalchardet - https://github.com/albfernandez/juniversalchardet)

*****************
Public Domain
*****************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-parser-text-module</artifactId>
<version>${tika.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,20 @@
*/
package org.apache.nifi.processors.standard;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.util.Collection;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand All @@ -53,50 +40,55 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.detect.EncodingDetector;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.HttpHeaders;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.mime.MediaType;
import org.apache.tika.mime.MimeType;
import org.apache.tika.mime.MimeTypeException;
import org.apache.tika.mime.MimeTypes;
import org.apache.tika.mime.MimeTypesFactory;
import org.apache.tika.mime.MimeTypeException;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


/**
* <p>
* Attempts to detect the MIME Type of a FlowFile by examining its contents. If the MIME Type is determined, it is added
* to an attribute with the name mime.type. In addition, mime.extension is set if a common file extension is known.
* </p>
*
* <p>
* MIME Type detection is performed by Apache Tika; more information about detection is available at http://tika.apache.org.
*
* <ul>
* <li>application/flowfile-v3</li>
* <li>application/flowfile-v1</li>
* </ul>
* </p>
*/
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"compression", "gzip", "bzip2", "zip", "MIME", "mime.type", "file", "identify"})
@CapabilityDescription("Attempts to identify the MIME Type used for a FlowFile. If the MIME Type can be identified, "
+ "an attribute with the name 'mime.type' is added with the value being the MIME Type. If the MIME Type cannot be determined, "
+ "the value will be set to 'application/octet-stream'. In addition, the attribute mime.extension will be set if a common file "
+ "extension for the MIME Type is known.")
+ "the value will be set to 'application/octet-stream'. In addition, the attribute 'mime.extension' will be set if a common file "
+ "extension for the MIME Type is known. If the MIME Type detected is of type text/*, attempts to identify the charset used " +
"and an attribute with the name 'mime.charset' is added with the value being the charset.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. "
+ "If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream"),
@WritesAttribute(attribute = "mime.extension", description = "This Processor sets the FlowFile's mime.extension attribute to the file "
+ "extension associated with the detected MIME Type. "
+ "If there is no correlated extension, the attribute's value will be empty")
@WritesAttribute(attribute = "mime.type", description = "This Processor sets the FlowFile's mime.type attribute to the detected MIME Type. "
+ "If unable to detect the MIME Type, the attribute's value will be set to application/octet-stream"),
@WritesAttribute(attribute = "mime.extension", description = "This Processor sets the FlowFile's mime.extension attribute to the file "
+ "extension associated with the detected MIME Type. "
+ "If there is no correlated extension, the attribute's value will be empty"),
@WritesAttribute(attribute = "mime.charset", description = "This Processor sets the FlowFile's mime.charset attribute to the detected charset. "
+ "If unable to detect the charset or the detected MIME type is not of type text/*, the attribute will not be set")
}
)
public class IdentifyMimeType extends AbstractProcessor {
Expand All @@ -105,13 +97,13 @@ public class IdentifyMimeType extends AbstractProcessor {
static final AllowableValue MERGE = new AllowableValue("Merge", "Merge", "Use config together with default NiFi MIME Types.");

public static final PropertyDescriptor USE_FILENAME_IN_DETECTION = new PropertyDescriptor.Builder()
.displayName("Use Filename In Detection")
.name("use-filename-in-detection")
.description("If true will pass the filename to Tika to aid in detection.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
.displayName("Use Filename In Detection")
.name("use-filename-in-detection")
.description("If true will pass the filename to Tika to aid in detection.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();

public static final PropertyDescriptor CONFIG_STRATEGY = new PropertyDescriptor.Builder()
.displayName("Config Strategy")
Expand Down Expand Up @@ -152,6 +144,7 @@ public class IdentifyMimeType extends AbstractProcessor {

private final TikaConfig config;
private Detector detector;
private EncodingDetector encodingDetector;
private MimeTypes mimeTypes;

public IdentifyMimeType() {
Expand Down Expand Up @@ -186,15 +179,17 @@ public void migrateProperties(PropertyConfiguration config) {
public void setup(final ProcessContext context) throws IOException {
String configStrategy = context.getProperty(CONFIG_STRATEGY).getValue();

if (configStrategy.equals(PRESET.getValue())){
if (configStrategy.equals(PRESET.getValue())) {
this.detector = config.getDetector();
this.mimeTypes = config.getMimeRepository();
} else {
setCustomMimeTypes(configStrategy, context);
}

this.encodingDetector = config.getEncodingDetector();
}

private void setCustomMimeTypes(String configStrategy, ProcessContext context) throws IOException {
private void setCustomMimeTypes(String configStrategy, ProcessContext context) {
String configBody = context.getProperty(MIME_CONFIG_BODY).getValue();
String configFile = context.getProperty(MIME_CONFIG_FILE).evaluateAttributeExpressions().getValue();

Expand Down Expand Up @@ -233,53 +228,64 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final ComponentLog logger = getLogger();
final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());

session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream stream) throws IOException {
try (final InputStream in = new BufferedInputStream(stream);
final TikaInputStream tikaStream = TikaInputStream.get(in)) {
Metadata metadata = new Metadata();

if (filename != null && context.getProperty(USE_FILENAME_IN_DETECTION).asBoolean()) {
metadata.add(TikaCoreProperties.RESOURCE_NAME_KEY, filename);
}
// Get mime type
MediaType mediatype = detector.detect(tikaStream, metadata);
mimeTypeRef.set(mediatype.toString());
}

final String mediaTypeString;
final String extension;
final Charset charset;

try (final InputStream flowFileStream = session.read(flowFile);
final TikaInputStream tikaStream = TikaInputStream.get(flowFileStream)) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());

Metadata metadata = new Metadata();
if (filename != null && context.getProperty(USE_FILENAME_IN_DETECTION).asBoolean()) {
metadata.add(TikaCoreProperties.RESOURCE_NAME_KEY, filename);
}
});

String mimeType = mimeTypeRef.get();
final MediaType mediaType = detector.detect(tikaStream, metadata);
mediaTypeString = mediaType.getBaseType().toString();
extension = lookupExtension(mediaTypeString, logger);
charset = identifyCharset(tikaStream, metadata, mediaType);
} catch (IOException e) {
throw new ProcessException("Failed to identify MIME type from content stream", e);
}

flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mediaTypeString);
flowFile = session.putAttribute(flowFile, "mime.extension", extension);
if (charset != null) {
flowFile = session.putAttribute(flowFile, "mime.charset", charset.name());
}
logger.info("Identified {} as having MIME Type {}", flowFile, mediaTypeString);

session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
}

private String lookupExtension(String mediaTypeString, ComponentLog logger) {
String extension = "";
try {
MimeType mimetype;
mimetype = mimeTypes.forName(mimeType);
extension = mimetype.getExtension();
MimeType mimeType = mimeTypes.forName(mediaTypeString);
extension = mimeType.getExtension();
} catch (MimeTypeException e) {
logger.warn("MIME type extension lookup failed", e);
}

// Workaround for bug in Tika - https://issues.apache.org/jira/browse/TIKA-1563
if (mimeType != null && mimeType.equals("application/gzip") && extension.equals(".tgz")) {
if (mediaTypeString.equals("application/gzip") && extension.equals(".tgz")) {
extension = ".gz";
}
return extension;
}

private Charset identifyCharset(TikaInputStream tikaStream, Metadata metadata, MediaType mediaType) throws IOException {
// only mime-types text/* have a charset parameter
if (mediaType.getType().equals("text")) {
metadata.add(HttpHeaders.CONTENT_TYPE, mediaType.toString());

if (mimeType == null) {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/octet-stream");
flowFile = session.putAttribute(flowFile, "mime.extension", "");
logger.info("Unable to identify MIME Type for {}; setting to application/octet-stream", new Object[]{flowFile});
return encodingDetector.detect(tikaStream, metadata);
} else {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType);
flowFile = session.putAttribute(flowFile, "mime.extension", extension);
logger.info("Identified {} as having MIME Type {}", new Object[]{flowFile, mimeType});
return null;
}

session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public void testFiles() throws IOException {
expectedExtensions.put("grid.gif", ".gif");
expectedExtensions.put("2.custom", ".txt");

final Map<String, String> expectedCharsets = getCommonExpectedCharsets();
expectedCharsets.put("bgBannerFoot.png", null);
expectedCharsets.put("blueBtnBg.jpg", null);
expectedCharsets.put("grid.gif", null);
expectedCharsets.put("2.custom", "ISO-8859-1");

final List<MockFlowFile> filesOut = runner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);
for (final MockFlowFile file : filesOut) {
final String filename = file.getAttribute(CoreAttributes.FILENAME.key());
Expand All @@ -97,8 +103,12 @@ public void testFiles() throws IOException {
final String extension = file.getAttribute("mime.extension");
final String expectedExtension = expectedExtensions.get(filename);

assertEquals(expected, mimeType, "Expected " + file + " to have MIME Type " + expected + ", but it was " + mimeType);
assertEquals(expectedExtension, extension, "Expected " + file + " to have extension " + expectedExtension + ", but it was " + extension);
final String charset = file.getAttribute("mime.charset");
final String expectedCharset = expectedCharsets.get(filename);

assertEquals(expected, mimeType, "Expected " + file + " to have MIME Type \"" + expected + "\", but it was \"" + mimeType + "\"");
assertEquals(expectedExtension, extension, "Expected " + file + " to have extension \"" + expectedExtension + "\", but it was \"" + extension + "\"");
assertEquals(expectedCharset, charset, "Expected " + file + " to have charset \"" + expectedCharset + "\", but it was \"" + charset + "\"");
}
}

Expand Down Expand Up @@ -164,6 +174,7 @@ public void testReplaceWithConfigBody() throws IOException {
expectedMimeTypes.put("flowfilev1.tar", "application/octet-stream");
expectedMimeTypes.put("fake.csv", "text/plain");
expectedMimeTypes.put("2.custom", "custom/abcd");
expectedMimeTypes.put("charset-utf-8.txt", "text/plain");

final Map<String, String> expectedExtensions = new HashMap<>();
expectedExtensions.put("1.7z", "");
Expand All @@ -187,6 +198,7 @@ public void testReplaceWithConfigBody() throws IOException {
expectedExtensions.put("flowfilev1.tar", "");
expectedExtensions.put("fake.csv", "");
expectedExtensions.put("2.custom", ".abcd");
expectedExtensions.put("charset-utf-8.txt", "");

final List<MockFlowFile> filesOut = runner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);
for (final MockFlowFile file : filesOut) {
Expand Down Expand Up @@ -250,6 +262,7 @@ public void testReplaceWithConfigFile() throws IOException {
expectedMimeTypes.put("flowfilev1.tar", "application/octet-stream");
expectedMimeTypes.put("fake.csv", "text/plain");
expectedMimeTypes.put("2.custom", "text/plain");
expectedMimeTypes.put("charset-utf-8.txt", "text/plain");

final Map<String, String> expectedExtensions = new HashMap<>();
expectedExtensions.put("1.7z", "");
Expand All @@ -273,6 +286,7 @@ public void testReplaceWithConfigFile() throws IOException {
expectedExtensions.put("flowfilev1.tar", "");
expectedExtensions.put("fake.csv", "");
expectedExtensions.put("2.custom", "");
expectedExtensions.put("charset-utf-8.txt", "");

final List<MockFlowFile> filesOut = runner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);
for (final MockFlowFile file : filesOut) {
Expand Down Expand Up @@ -428,6 +442,7 @@ private Map<String, String> getCommonExpectedMimeTypes() {
expectedMimeTypes.put("flowfilev3WithXhtml", StandardFlowFileMediaType.VERSION_3.getMediaType());
expectedMimeTypes.put("flowfilev1.tar", StandardFlowFileMediaType.VERSION_1.getMediaType());
expectedMimeTypes.put("fake.csv", "text/csv");
expectedMimeTypes.put("charset-utf-8.txt", "text/plain");

return expectedMimeTypes;
}
Expand All @@ -452,8 +467,31 @@ private Map<String, String> getCommonExpectedExtensions() {
expectedExtensions.put("flowfilev3WithXhtml", "");
expectedExtensions.put("flowfilev1.tar", "");
expectedExtensions.put("fake.csv", ".csv");
expectedExtensions.put("charset-utf-8.txt", ".txt");

return expectedExtensions;
}

private static Map<String, String> getCommonExpectedCharsets() {
final Map<String, String> expectedCharsets = new HashMap<>();
expectedCharsets.put("1.7z", null);
expectedCharsets.put("1.mdb", null);
expectedCharsets.put("1.txt", "ISO-8859-1");
expectedCharsets.put("1.csv", "ISO-8859-1");
expectedCharsets.put("1.txt.bz2", null);
expectedCharsets.put("1.txt.gz", null);
expectedCharsets.put("1.zip", null);
expectedCharsets.put("1.pdf", null);
expectedCharsets.put("1.tar", null);
expectedCharsets.put("1.tar.gz", null);
expectedCharsets.put("1.jar", null);
expectedCharsets.put("1.xml", null);
expectedCharsets.put("1.xhtml", null);
expectedCharsets.put("flowfilev3", null);
expectedCharsets.put("flowfilev3WithXhtml", null);
expectedCharsets.put("flowfilev1.tar", null);
expectedCharsets.put("fake.csv", "ISO-8859-1");
expectedCharsets.put("charset-utf-8.txt", "UTF-8");
return expectedCharsets;
}
}
Loading

0 comments on commit 6c333cd

Please sign in to comment.