Skip to content

Commit

Permalink
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control p…
Browse files Browse the repository at this point in the history
…rocessor initialization relative to Inputs/Outputs (jeagles)
  • Loading branch information
Jonathan Eagles committed Apr 14, 2016
1 parent a998677 commit 4675a65
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3202. Reduce the memory need for jobs with high number of segments
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs

Release 0.8.3: 2016-04-14

Expand Down Expand Up @@ -434,6 +435,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.

ALL CHANGES:
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
TEZ-3202. Reduce the memory need for jobs with high number of segments
TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants.
TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle
Expand Down
22 changes: 22 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,28 @@ public TezConfiguration(boolean loadDefaults) {
"max-event-backlog";
public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000;

/**
* Boolean value. Backwards compatibility setting for initializing IO processor before
* inputs and outputs.
* Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_TASK_INITIALIZE_PROCESSOR_FIRST = TEZ_TASK_PREFIX +
"initialize-processor-first";
public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT = false;

/**
* Boolean value. Backwards compatibility setting for initializing inputs and outputs
* serially instead of the parallel default.
* Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY = TEZ_TASK_PREFIX +
"initialize-processor-io-serially";
public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false;

/**
* Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output
* components need to make successive progress notifications. If the progress is not notified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final HadoopShim hadoopShim;
private final int maxEventBacklog;

private final boolean initializeProcessorFirst;
private final boolean initializeProcessorIOSerially;

public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
Expand Down Expand Up @@ -189,8 +192,15 @@ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
this.state.set(State.NEW);
this.appAttemptNumber = appAttemptNumber;
this.initializeProcessorFirst = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST,
TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT);
this.initializeProcessorIOSerially = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY,
TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT);
int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
numInitializers = (numInitializers == 0 ? 1 : numInitializers);
if (initializeProcessorIOSerially) {
numInitializers = 1;
}
this.initializerExecutor = Executors.newFixedThreadPool(
numInitializers,
new ThreadFactoryBuilder().setDaemon(true)
Expand Down Expand Up @@ -219,6 +229,10 @@ public void initialize() throws Exception {
this.processorContext = createProcessorContext();
this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);

if (initializeProcessorFirst || initializeProcessorIOSerially) {
// Initialize processor in the current thread.
initializeLogicalIOProcessor();
}
int numTasks = 0;

int inputIndex = 0;
Expand All @@ -235,9 +249,10 @@ public void initialize() throws Exception {
numTasks++;
}

// Initialize processor in the current thread.
initializeLogicalIOProcessor();

if (!(initializeProcessorFirst || initializeProcessorIOSerially)) {
// Initialize processor in the current thread.
initializeLogicalIOProcessor();
}
int completedTasks = 0;
while (completedTasks < numTasks) {
LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
Expand Down

0 comments on commit 4675a65

Please sign in to comment.