Skip to content

Commit

Permalink
TEZ-2896. Fix thread names used during Input/Output initialization. (…
Browse files Browse the repository at this point in the history
…sseth)
  • Loading branch information
sidseth committed Oct 15, 2015
1 parent b421e4f commit 25f0247
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-2896. Fix thread names used during Input/Output initialization.
TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
TEZ-2887. Tez build failure due to missing dependency in pom files.
TEZ-1692. Reduce code duplication between TezMapredSplitsGrouper and TezMapreduceSplitsGrouper.
Expand Down Expand Up @@ -214,6 +215,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES
TEZ-2896. Fix thread names used during Input/Output initialization.
TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
TEZ-2885. Remove counter logs from AMWebController.
TEZ-2887. Tez build failure due to missing dependency in pom files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
this.initializerExecutor = Executors.newFixedThreadPool(
numInitializers,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Initializer %d").build());
.setNameFormat("I/O Setup %d").build());
this.initializerCompletionService = new ExecutorCompletionService<Void>(
this.initializerExecutor);
this.groupInputSpecs = taskSpec.getGroupInputs();
Expand Down Expand Up @@ -410,15 +410,14 @@ public InitializeInputCallable(InputSpec inputSpec, int inputIndex) {
protected Void callInternal() throws Exception {
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(oldThreadName + "{" + inputSpec.getSourceVertexName() + "}");
Thread.currentThread().setName(oldThreadName + " Initialize: {" + inputSpec.getSourceVertexName() + "}");
return _callInternal();
} finally {
Thread.currentThread().setName(oldThreadName);
}
}

protected Void _callInternal() throws Exception {

if (LOG.isDebugEnabled()) {
LOG.debug("Initializing Input using InputSpec: " + inputSpec);
}
Expand Down Expand Up @@ -464,7 +463,6 @@ protected Void callInternal() throws Exception {
}

protected Void _callInternal() throws Exception {
Thread.currentThread().setName("InitializerStart {" + srcVertexName + "}");
if (LOG.isDebugEnabled()) {
LOG.debug("Starting Input with src edge: " + srcVertexName);
}
Expand All @@ -489,15 +487,14 @@ public InitializeOutputCallable(OutputSpec outputSpec, int outputIndex) {
protected Void callInternal() throws Exception {
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(oldThreadName + "{" + outputSpec.getDestinationVertexName() + "}");
Thread.currentThread().setName(oldThreadName + " Initialize: {" + outputSpec.getDestinationVertexName() + "}");
return _callInternal();
} finally {
Thread.currentThread().setName(oldThreadName);
}
}

protected Void _callInternal() throws Exception {
Thread.currentThread().setName("Initializer {" + outputSpec.getDestinationVertexName() + "}");
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing Output using OutputSpec: " + outputSpec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifie

LOG.info(srcNameTrimmed + ": "
+ "RequestedMemory=" + memReq
+ ", AssignedMemorty=" + this.memoryLimit
+ ", AssignedMemory=" + this.memoryLimit
+ ", maxSingleShuffleLimit=" + this.maxSingleShuffleLimit
);

Expand Down

0 comments on commit 25f0247

Please sign in to comment.