Skip to content

Commit

Permalink
TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve docum…
Browse files Browse the repository at this point in the history
…entation and remove the unused AsyncClientConfig.java
  • Loading branch information
tballison committed Jul 28, 2021
1 parent f181fc4 commit ead2e75
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 82 deletions.
13 changes: 3 additions & 10 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.utils.ProcessUtils;

Expand Down Expand Up @@ -101,8 +100,8 @@ public PipesResult process(FetchEmitTuple t) throws IOException {
if (! ping()) {
restart();
}
if (pipesConfig.getMaxFilesProcessed() > 0 &&
filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
LOG.info("restarting server after hitting max files: " + filesProcessed);
restart();
}
Expand Down Expand Up @@ -356,13 +355,7 @@ private String[] getCommandline() {
commandLine.add(
ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));

//turn off emit batching
String maxForEmitBatchBytes = "0";
if (pipesConfig instanceof AsyncConfig) {
maxForEmitBatchBytes =
Long.toString(((AsyncConfig)pipesConfig).getMaxForEmitBatchBytes());
}
commandLine.add(maxForEmitBatchBytes);
commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
LOG.debug("commandline: " + commandLine);
Expand Down
75 changes: 66 additions & 9 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,46 @@

public class PipesConfigBase extends ConfigBase {

private long timeoutMillis = 30000;
private long startupTimeoutMillis = 240000;
private long shutdownClientAfterMillis = 300000;
private int numClients = 10;
/**
* default size to send back to the PipesClient for batch
* emitting. If an extract is larger than this, it will be emitted
* directly from the forked PipesServer.
*/
public static final long DEFAULT_MAX_FOR_EMIT_BATCH = 100000;

public static final long DEFAULT_TIMEOUT_MILLIS = 60000;

public static final long DEFAULT_STARTUP_TIMEOUT_MILLIS = 240000;

public static final long DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS = 300000;

public static final int DEFAULT_NUM_CLIENTS = 4;

public static final int DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS = 10000;

//if an extract is larger than this, the forked PipesServer should
//emit the extract directly and not send the contents back to the PipesClient
private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;

private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
private int numClients = DEFAULT_NUM_CLIENTS;

private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;

private List<String> forkedJvmArgs = new ArrayList<>();
private int maxFilesProcessed = 10000;
private Path tikaConfig;
private String javaPath = "java";

public long getTimeoutMillis() {
return timeoutMillis;
}

/**
* How long to wait in milliseconds before timing out the forked process.
* @param timeoutMillis
*/
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
Expand All @@ -46,6 +73,12 @@ public long getShutdownClientAfterMillis() {
return shutdownClientAfterMillis;
}

/**
* If the client has been inactive after this many milliseconds,
* shut it down.
*
* @param shutdownClientAfterMillis
*/
public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
this.shutdownClientAfterMillis = shutdownClientAfterMillis;
}
Expand All @@ -66,12 +99,17 @@ public void setForkedJvmArgs(List<String> jvmArgs) {
this.forkedJvmArgs = jvmArgs;
}

public int getMaxFilesProcessed() {
return maxFilesProcessed;
/**
* Restart the forked PipesServer after it has processed this many files to avoid
* slow-building memory leaks.
* @return
*/
public int getMaxFilesProcessedPerProcess() {
return maxFilesProcessedPerProcess;
}

public void setMaxFilesProcessed(int maxFilesProcessed) {
this.maxFilesProcessed = maxFilesProcessed;
public void setMaxFilesProcessedPerProcess(int maxFilesProcessedPerProcess) {
this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
}

public Path getTikaConfig() {
Expand All @@ -97,4 +135,23 @@ public void setJavaPath(String javaPath) {
public long getStartupTimeoutMillis() {
return startupTimeoutMillis;
}

/**
* What is the maximum bytes size per extract that
* will be allowed to be shipped back to the emit queue in the forking process.
* If an extract is too big, skip the emit queue and forward it directly from the
* forked PipesServer.
* If set to <code>0</code>, this will never send an extract back for batch emitting,
* but will always emit the extract directly from the forked PipeServer.
* If set to <code>-1</code>, this will always send the extract back for batch emitting.
*
* @return the threshold extract size at which to emit directly from the forked PipeServer
*/
public long getMaxForEmitBatchBytes() {
return maxForEmitBatchBytes;
}

public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
}
}
24 changes: 12 additions & 12 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public static STATUS lookup(int val) {
private final Path tikaConfigPath;
private final DataInputStream input;
private final DataOutputStream output;
private final long maxExtractSizeToReturn;
//if an extract is larger than this value, emit it directly;
//if it is smaller than this value, write it back to the
//PipesClient so that it can cache the extracts and then batch emit.
private final long maxForEmitBatchBytes;
private final long serverParseTimeoutMillis;
private final long serverWaitTimeoutMillis;
private Parser autoDetectParser;
Expand All @@ -128,17 +131,15 @@ public static STATUS lookup(int val) {
private volatile boolean parsing;
private volatile long since;

//logging is fussy...the logging frameworks grab stderr and stdout
//before we can redirect. slf4j complains on stderr, log4j2 unconfigured writes to stdout
//We can add logging later but it has to be done carefully...

public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
long maxExtractSizeToReturn,
long maxForEmitBatchBytes,
long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
throws IOException, TikaException, SAXException {
this.tikaConfigPath = tikaConfigPath;
this.input = new DataInputStream(in);
this.output = new DataOutputStream(out);
this.maxExtractSizeToReturn = maxExtractSizeToReturn;
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
this.serverParseTimeoutMillis = serverParseTimeoutMillis;
this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
this.parsing = false;
Expand Down Expand Up @@ -334,14 +335,14 @@ private void actuallyParse(FetchEmitTuple t) {
t.setEmitKey(emitKey);
}
EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
emit(t.getId(), emitData, stack);
} else {
write(emitData, stack);
//ignore the stack, it is stored in the emit data
write(emitData);
}
} else {
write(STATUS.PARSE_EXCEPTION_NO_EMIT,
stack.getBytes(StandardCharsets.UTF_8));
write(STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
}

}
Expand Down Expand Up @@ -509,8 +510,7 @@ private void initializeParser() throws TikaException, IOException, SAXException
}


private void write(EmitData emitData, String stack) {
//TODO -- what do we do with the stack?
private void write(EmitData emitData) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class AsyncConfig extends PipesConfigBase {
private long emitWithinMillis = 10000;
private long emitMaxEstimatedBytes = 100000;

private long maxForEmitBatchBytes = 0;
private int queueSize = 10000;
private int numEmitters = 1;

Expand Down Expand Up @@ -73,22 +72,6 @@ public void setEmitMaxEstimatedBytes(long emitMaxEstimatedBytes) {
}


/**
* What is the maximum bytes size per extract that
* will be allowed in the emit queue. If an extract is too
* big, skip the emit queue and forward it directly from the processor. If
* set to <code>0</code>, this will never send an extract back for batch emitting,
* but will emit the extract directly from the processor.
* @return
*/
public long getMaxForEmitBatchBytes() {
return maxForEmitBatchBytes;
}

public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
}

public void setNumEmitters(int numEmitters) {
this.numEmitters = numEmitters;
}
Expand Down

0 comments on commit ead2e75

Please sign in to comment.