Skip to content

Commit

Permalink
TIKA-1330 clean up logging in tika-batch and tika-app integration of …
Browse files Browse the repository at this point in the history
…tika-batch, take 2

git-svn-id: https://svn.apache.org/repos/asf/tika/trunk@1670751 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
tballison committed Apr 1, 2015
1 parent 4f35d50 commit f5cb417
Showing 1 changed file with 121 additions and 124 deletions.
245 changes: 121 additions & 124 deletions tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,61 +20,61 @@
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;


/**
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;


/**
* This is a base class for file consumers. The
* goal of this class is to abstract out the multithreading
* and recordkeeping components.
* <p/>
*/
public abstract class FileResourceConsumer implements Callable<IFileProcessorFutureResult> {

private enum STATE {
NOT_YET_STARTED,
ACTIVELY_CONSUMING,
SWALLOWED_POISON,
*/
public abstract class FileResourceConsumer implements Callable<IFileProcessorFutureResult> {

// States recorded in currentState for this consumer.
// setEndedState only overwrites the current state while it is NOT_YET_STARTED,
// ACTIVELY_CONSUMING or ASKED_TO_SHUTDOWN; any other state is left untouched.
// NOTE(review): meanings not annotated below are only implied by the constant
// names — confirm against the code that sets them.
private enum STATE {
NOT_YET_STARTED,
ACTIVELY_CONSUMING,
SWALLOWED_POISON,
THREAD_INTERRUPTED,
EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
ASKED_TO_SHUTDOWN,
// set by checkForTimedOutMillis when the current file's elapsed time exceeds the stale threshold
TIMED_OUT,
CONSUMER_EXCEPTION,
CONSUMER_ERROR,
COMPLETED
}

// Values passed as the "type" argument of getXMLifiedLogMsg, i.e. the entity
// name of the structured log message (TIMED_OUT, OOM, PARSE_ERR and PARSE_EX
// are used that way in this class).
// NOTE(review): IO_IS / IO_OS are presumably used the same way by subclasses — confirm.
// NOTE(review): these are non-final public statics, so any caller can reassign
// them — confirm whether they can be made final.
public static String TIMED_OUT = "timed_out";
public static String OOM = "oom";
public static String IO_IS = "io_on_inputstream";
public static String IO_OS = "io_on_outputstream";
public static String PARSE_ERR = "parse_err";
public static String PARSE_EX = "parse_ex";

// Attribute key under which checkForTimedOutMillis reports a file's elapsed milliseconds.
public static String ELAPSED_MILLIS = "elapsedMS";

// Starts at -1. NOTE(review): presumably incremented once per consumer to hand out ids — confirm.
private static AtomicInteger numConsumers = new AtomicInteger(-1);
// SLF4J logger shared by this class and, being protected, by its subclasses.
protected static Logger logger = LoggerFactory.getLogger(FileResourceConsumer.class);

// NOTE(review): presumably the longest this consumer waits for the next file
// resource before giving up (cf. STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS) — confirm.
private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes

COMPLETED
}

// Values passed as the "type" argument of getXMLifiedLogMsg, i.e. the entity
// name of the structured log message (TIMED_OUT, OOM, PARSE_ERR and PARSE_EX
// are used that way in this class).
// NOTE(review): IO_IS / IO_OS are presumably used the same way by subclasses — confirm.
// NOTE(review): these are non-final public statics, so any caller can reassign
// them — confirm whether they can be made final.
public static String TIMED_OUT = "timed_out";
public static String OOM = "oom";
public static String IO_IS = "io_on_inputstream";
public static String IO_OS = "io_on_outputstream";
public static String PARSE_ERR = "parse_err";
public static String PARSE_EX = "parse_ex";

// Attribute key under which checkForTimedOutMillis reports a file's elapsed milliseconds.
public static String ELAPSED_MILLIS = "elapsedMS";

// Starts at -1. NOTE(review): presumably incremented once per consumer to hand out ids — confirm.
private static AtomicInteger numConsumers = new AtomicInteger(-1);
// SLF4J logger shared by this class and, being protected, by its subclasses.
protected static Logger logger = LoggerFactory.getLogger(FileResourceConsumer.class);

// NOTE(review): presumably the longest this consumer waits for the next file
// resource before giving up (cf. STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS) — confirm.
private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes

// NOTE(review): presumably the queue this consumer takes FileResources from — confirm.
private final ArrayBlockingQueue<FileResource> fileQueue;

// Factory used by getXMLifiedLogMsg to create the XMLStreamWriter for each log message.
private final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
Expand Down Expand Up @@ -257,35 +257,35 @@ public FileStarted checkForTimedOutMillis(long staleThresholdMillis) {
FileStarted tmp = currentFile;
if (tmp == null) {
return null;
}
if (tmp.getElapsedMillis() > staleThresholdMillis) {
setEndedState(STATE.TIMED_OUT);
logger.error("{}", getXMLifiedLogMsg(
TIMED_OUT,
tmp.getResourceId(),
ELAPSED_MILLIS, Long.toString(tmp.getElapsedMillis())));
return tmp;
}
}
return null;
}

/**
 * Convenience overload for structured log messages that carry no throwable;
 * delegates to the four-argument variant with a null throwable.
 *
 * @param type entity name for the message
 * @param resourceId resourceId string
 * @param attrs (array of key0, value0, key1, value1, etc.)
 * @return whatever the four-argument variant returns
 */
protected String getXMLifiedLogMsg(String type, String resourceId, String... attrs) {
    final Throwable noThrowable = null;
    return getXMLifiedLogMsg(type, resourceId, noThrowable, attrs);
}

/**
* Use this for structured output that captures resourceId and other attributes.
*
* @param type entity name for exception
* @param resourceId resourceId string
* @param t throwable can be null
* @param attrs (array of key0, value0, key1, value1, etc.)
*/
protected String getXMLifiedLogMsg(String type, String resourceId, Throwable t, String... attrs) {

StringWriter writer = new StringWriter();
try {
}
if (tmp.getElapsedMillis() > staleThresholdMillis) {
setEndedState(STATE.TIMED_OUT);
logger.error("{}", getXMLifiedLogMsg(
TIMED_OUT,
tmp.getResourceId(),
ELAPSED_MILLIS, Long.toString(tmp.getElapsedMillis())));
return tmp;
}
}
return null;
}

/**
 * Convenience overload for structured log messages that carry no throwable;
 * delegates to the four-argument variant with a null throwable.
 *
 * @param type entity name for the message
 * @param resourceId resourceId string
 * @param attrs (array of key0, value0, key1, value1, etc.)
 * @return whatever the four-argument variant returns
 */
protected String getXMLifiedLogMsg(String type, String resourceId, String... attrs) {
    final Throwable noThrowable = null;
    return getXMLifiedLogMsg(type, resourceId, noThrowable, attrs);
}

/**
* Use this for structured output that captures resourceId and other attributes.
*
* @param type entity name for exception
* @param resourceId resourceId string
* @param t throwable can be null
* @param attrs (array of key0, value0, key1, value1, etc.)
*/
protected String getXMLifiedLogMsg(String type, String resourceId, Throwable t, String... attrs) {

StringWriter writer = new StringWriter();
try {
XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(writer);
xml.writeStartDocument();
xml.writeStartElement(type);
Expand All @@ -306,13 +306,13 @@ protected String getXMLifiedLogMsg(String type, String resourceId, Throwable t,
xml.writeEndDocument();
xml.flush();
xml.close();
} catch (XMLStreamException e) {
logger.error("error writing xml stream for: " + resourceId, t);
}
return writer.toString();
}

private FileResource getNextFileResource() throws InterruptedException {
} catch (XMLStreamException e) {
logger.error("error writing xml stream for: " + resourceId, t);
}
return writer.toString();
}

private FileResource getNextFileResource() throws InterruptedException {
FileResource fileResource = null;
long start = new Date().getTime();
while (fileResource == null) {
Expand Down Expand Up @@ -382,49 +382,46 @@ private void setEndedState(STATE cause) {
synchronized(lock) {
if (currentState == STATE.NOT_YET_STARTED ||
currentState == STATE.ACTIVELY_CONSUMING ||
currentState == STATE.ASKED_TO_SHUTDOWN) {
currentState = cause;
}
}
}

/**
 * Shared parse-and-log helper so that all implementing classes log
 * equivalently. Use, override or avoid as desired.
 * <p>
 * Errors are logged at error level and rethrown. Any other throwable is
 * logged at warn level, counted through {@code incrementHandledExceptions()}
 * and then swallowed. The stream is handed to {@code close(is)} in all cases.
 *
 * @param resourceId resourceId
 * @param parser parser to use
 * @param is inputStream (will be closed by this method!)
 * @param handler handler for the content
 * @param m metadata
 * @param parseContext parse context
 * @throws Throwable only Errors actually propagate; they are logged first
 */
protected void parse(final String resourceId, final Parser parser, InputStream is,
        final ContentHandler handler,
        final Metadata m, final ParseContext parseContext) throws Throwable {

    try {
        parser.parse(is, handler, m, parseContext);
    } catch (OutOfMemoryError oom) {
        logger.error(getXMLifiedLogMsg(OOM, resourceId, oom));
        throw oom;
    } catch (Error err) {
        logger.error(getXMLifiedLogMsg(PARSE_ERR, resourceId, err));
        throw err;
    } catch (Throwable handled) {
        //warn, but do not rethrow
        logger.warn(getXMLifiedLogMsg(PARSE_EX, resourceId, handled));
        incrementHandledExceptions();
    } finally {
        close(is);
    }
}

currentState == STATE.ASKED_TO_SHUTDOWN) {
currentState = cause;
}
}
}

/**
 * Shared parse-and-log helper so that all implementing classes log
 * equivalently. Use, override or avoid as desired.
 * <p>
 * Whatever the parser throws is logged as a structured message and then
 * rethrown: Errors at error level, anything else at warn level (and counted
 * through {@code incrementHandledExceptions()}). The stream is handed to
 * {@code close(is)} in all cases.
 *
 * @param resourceId resourceId
 * @param parser parser to use
 * @param is inputStream (will be closed by this method!)
 * @param handler handler for the content
 * @param m metadata
 * @param parseContext parse context
 * @throws Throwable whatever the parser threw (if anything), after it has been logged
 */
protected void parse(final String resourceId, final Parser parser, InputStream is,
        final ContentHandler handler,
        final Metadata m, final ParseContext parseContext) throws Throwable {

    try {
        parser.parse(is, handler, m, parseContext);
    } catch (OutOfMemoryError oom) {
        logger.error(getXMLifiedLogMsg(OOM, resourceId, oom));
        throw oom;
    } catch (Error err) {
        logger.error(getXMLifiedLogMsg(PARSE_ERR, resourceId, err));
        throw err;
    } catch (Throwable other) {
        logger.warn(getXMLifiedLogMsg(PARSE_EX, resourceId, other));
        incrementHandledExceptions();
        throw other;
    } finally {
        close(is);
    }
}

}

0 comments on commit f5cb417

Please sign in to comment.