Skip to content

Commit

Permalink
[FLINK-20718][metrics] Add busyTimeMsPerSecond metric
Browse files Browse the repository at this point in the history
It's defined as 1000 ms minus the sum of idleTimeMsPerSecond and backPressuredTimeMsPerSecond (capped at 1000 ms).
  • Loading branch information
pnowojski committed Jan 6, 2021
1 parent 9c68f02 commit a137963
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>busyTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
Expand Down
5 changes: 5 additions & 0 deletions docs/ops/metrics.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>busyTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ public static String currentInputWatermarkName(int index) {
}

public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
public static final String TASK_BUSY_TIME = "busyTimeMs" + SUFFIX_RATE;
public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
Expand Down Expand Up @@ -46,8 +47,11 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numRecordsOutRate;
private final Meter numBuffersOutRate;
private final Meter idleTimePerSecond;
private final Gauge busyTimePerSecond;
private final Meter backPressuredTimePerSecond;

private volatile boolean busyTimeEnabled;

public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);

Expand All @@ -71,6 +75,7 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter()));
this.backPressuredTimePerSecond =
meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter()));
this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
}

public IOMetrics createSnapshot() {
Expand Down Expand Up @@ -109,6 +114,15 @@ public Meter getBackPressuredTimePerSecond() {
return backPressuredTimePerSecond;
}

/**
 * Switches the busy-time gauge on or off.
 *
 * <p>While disabled, the busy-time gauge reports {@link Double#NaN} instead of a computed
 * value.
 *
 * @param enabled {@code true} to report a computed busy time, {@code false} to report NaN
 */
public void setEnableBusyTime(boolean enabled) {
    this.busyTimeEnabled = enabled;
}

/**
 * Computes the value reported by the busy-time gauge.
 *
 * <p>The busy time is what remains of a 1000 ms window after subtracting the idle and
 * back-pressured rates. Their sum is capped at 1000 so the result never goes negative.
 *
 * @return the busy time in milliseconds per second, or {@link Double#NaN} if busy-time
 *     reporting is disabled
 */
private double getBusyTimePerSecond() {
    // Sum of the time the task was NOT busy: idle plus back pressured.
    final double nonBusyTimeMs =
            idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate();
    if (!busyTimeEnabled) {
        return Double.NaN;
    }
    return 1000.0 - Math.min(nonBusyTimeMs, 1000.0);
}

// ============================================================================================
// Metric Reuse
// ============================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ private SourceStreamTask(Environment env, Object lock) throws Exception {
StreamTaskActionExecutor.synchronizedExecutor(lock));
this.lock = Preconditions.checkNotNull(lock);
this.sourceThread = new LegacySourceFunctionThread();

getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ protected StreamTask(
new ExecutorThreadFactory("channel-state-unspilling"));

injectChannelStateWriterIntoChannels();

environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
}

private void injectChannelStateWriterIntoChannels() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testOpenClose() throws Exception {
}

@Test(timeout = 60_000)
public void testStartDelayMetric() throws Exception {
public void testMetrics() throws Exception {
long sleepTime = 42;
StreamTaskMailboxTestHarnessBuilder<String> builder =
new StreamTaskMailboxTestHarnessBuilder<>(
Expand Down Expand Up @@ -145,6 +145,8 @@ public void testStartDelayMetric() throws Exception {
(Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
assertThat(
checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000));
Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME);
assertTrue(Double.isNaN(busyTimeGauge.getValue()));
}

/**
Expand Down

0 comments on commit a137963

Please sign in to comment.