Skip to content

Commit

Permalink
[FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSample…
Browse files Browse the repository at this point in the history
…Coordinator

This closes apache#1732.
  • Loading branch information
uce committed Feb 29, 2016
1 parent 734ba01 commit 9580b8f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -170,6 +171,10 @@ public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
if (executionContext != null) {
pendingStats.add(vertex);

if (LOG.isDebugEnabled()) {
LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
}

Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
vertex.getTaskVertices(),
numSamples,
Expand Down Expand Up @@ -246,7 +251,7 @@ public void onComplete(Throwable failure, StackTraceSample success) throws Throw
OperatorBackPressureStats stats = createStatsFromSample(success);
operatorStatsCache.put(vertex, stats);
} else {
LOG.warn("Failed to gather stack trace sample.", failure);
LOG.debug("Failed to gather stack trace sample.", failure);
}
} catch (Throwable t) {
LOG.error("Error during stats completion.", t);
Expand Down Expand Up @@ -278,7 +283,7 @@ private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample)
if (sampledTasks.contains(taskId)) {
subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
} else {
throw new RuntimeException("Outdated sample. A task, which is part of the " +
LOG.debug("Outdated sample. A task, which is part of the " +
"sample has been reset.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ public void run() {
pending.getSampleId());

pending.discard(new RuntimeException("Time out"));
pendingSamples.remove(pending.getSampleId());
if (pendingSamples.remove(pending.getSampleId()) != null) {
rememberRecentSampleId(pending.getSampleId());
}
}
}
} catch (Throwable t) {
Expand Down Expand Up @@ -319,7 +321,9 @@ public void collectStackTraces(
sampleId, executionId);
}
} else {
throw new IllegalStateException("Unknown sample ID " + sampleId);
if (LOG.isDebugEnabled()) {
LOG.debug("Unknown sample ID " + sampleId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,13 @@ public void testTriggerStackTraceSampleTimeout() throws Exception {
Throwable cause = sampleFuture.failed().value().get().get();
assertTrue(cause.getCause().getMessage().contains("Time out"));

// Collect after the timeout
try {
ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
}
// Collect after the timeout (should be ignored)
ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
}

/** Tests that collecting an unknown sample fails. */
@Test(expected = IllegalStateException.class)
/** Tests that collecting an unknown sample is ignored. */
@Test
public void testCollectStackTraceForUnknownSample() throws Exception {
coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ object StackTraceSampleMessages {
sampleId: Int,
executionId: ExecutionAttemptID,
samples: java.util.List[Array[StackTraceElement]])
extends StackTraceSampleMessages
extends StackTraceSampleMessages {

override def toString: String =
s"ResponseStackTraceSampleSuccess($sampleId, $executionId, ${samples.size()} samples)"
}

/**
* Response after a failed stack trace sample (sent by the task managers to
Expand Down

0 comments on commit 9580b8f

Please sign in to comment.