Skip to content

Commit

Permalink
[FLINK-18662][metrics] Expose persisted and processed bytes metrics to the REST and Web UI
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Oct 13, 2020
1 parent adad56b commit fbd9321
Show file tree
Hide file tree
Showing 31 changed files with 566 additions and 70 deletions.
58 changes: 58 additions & 0 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1562,6 +1568,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1593,6 +1605,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1639,6 +1657,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1694,6 +1718,14 @@
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"persisted_data" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"processed_data" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"state_size" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics",
Expand Down Expand Up @@ -1868,6 +1900,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1899,6 +1937,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1983,6 +2027,12 @@
"num_subtasks" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"state_size" : {
"type" : "integer"
},
Expand Down Expand Up @@ -2020,6 +2070,14 @@
"duration" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"persisted" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"processed" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
}
}
},
Expand Down
58 changes: 58 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,14 @@
"alignment_buffered" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"processed_data" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"persisted_data" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
}
}
},
Expand Down Expand Up @@ -975,6 +983,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1006,6 +1020,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1056,6 +1076,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1128,6 +1154,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1248,6 +1280,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1279,6 +1317,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1333,6 +1377,12 @@
"alignment_buffered" : {
"type" : "integer"
},
"processed_data" : {
"type" : "integer"
},
"persisted_data" : {
"type" : "integer"
},
"num_subtasks" : {
"type" : "integer"
},
Expand Down Expand Up @@ -1384,6 +1434,14 @@
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"processed" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"persisted" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
},
"duration" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:MinMaxAvgStatistics"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<th><strong>Latest Acknowledgment</strong></th>
<th><strong>End to End Duration</strong></th>
<th><strong>Checkpointed Data Size</strong></th>
<th><strong>Buffered During Alignment</strong></th>
<th><strong>Processed (persisted) in-flight data</strong></th>
</tr>
</thead>
<tbody *ngIf="checkPointDetail">
Expand All @@ -60,7 +60,7 @@
<td *ngIf="checkPointDetail['tasks'][vertex.id]['end_to_end_duration'] >= 0">{{ checkPointDetail['tasks'][vertex.id]['end_to_end_duration'] | humanizeDuration}}</td>
<td *ngIf="checkPointDetail['tasks'][vertex.id]['end_to_end_duration'] <0">n/a</td>
<td>{{ checkPointDetail['tasks'][vertex.id]['state_size'] | humanizeBytes }}</td>
<td>{{ checkPointDetail['tasks'][vertex.id]['alignment_buffered'] | humanizeBytes }}</td>
<td>{{ checkPointDetail['tasks'][vertex.id]['processed_data'] | humanizeBytes }} ({{ checkPointDetail['tasks'][vertex.id]['persisted_data'] | humanizeBytes }})</td>
</tr>
<tr [nzExpand]="vertex.expand">
<td colspan="7" *ngIf="vertex.expand" class="collapse-td">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
<th><strong>Latest Acknowledgement</strong></th>
<th><strong>End to End Duration</strong></th>
<th><strong>Checkpointed Data Size</strong></th>
<th><strong>Buffered During Alignment</strong></th>
<th><strong>Processed (persisted) in-flight data</strong></th>
</tr>
</thead>
<tbody>
Expand All @@ -151,7 +151,7 @@
<td *ngIf="checkpoint['end_to_end_duration'] >= 0">{{ checkpoint['end_to_end_duration'] | humanizeDuration}}</td>
<td *ngIf="checkpoint['end_to_end_duration'] <0">n/a</td>
<td>{{ checkpoint['state_size'] | humanizeBytes }}</td>
<td>{{ checkpoint['alignment_buffered'] | humanizeBytes }}</td>
<td>{{ checkpoint['processed_data'] | humanizeBytes }} ({{ checkpoint['persisted_data'] | humanizeBytes }})</td>
</tr>
<tr [nzExpand]="checkpoint.expand">
<td colspan="11" *ngIf="checkpoint.expand" class="collapse-td">
Expand All @@ -176,7 +176,7 @@
<th></th>
<th><strong>End to End Duration</strong></th>
<th><strong>Checkpointed Data Size</strong></th>
<th><strong>Buffered During Alignment</strong></th>
<th><strong>Processed (persisted) in-flight data</strong></th>
</tr>
</thead>
<tbody>
Expand All @@ -185,19 +185,19 @@
<td><strong>Minimum</strong></td>
<td>{{ checkPointStats['summary']['end_to_end_duration']['min'] | humanizeDuration}}</td>
<td>{{ checkPointStats['summary']['state_size']['min'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['alignment_buffered']['min'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['processed_data']['min'] | humanizeBytes }} ({{ checkPointStats['summary']['persisted_data']['min'] | humanizeBytes }})</td>
</tr>
<tr>
<td><strong>Average</strong></td>
<td>{{ checkPointStats['summary']['end_to_end_duration']['avg'] | humanizeDuration}}</td>
<td>{{ checkPointStats['summary']['state_size']['avg'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['alignment_buffered']['avg'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['processed_data']['avg'] | humanizeBytes }} ({{ checkPointStats['summary']['persisted_data']['avg'] | humanizeBytes }})</td>
</tr>
<tr>
<td><strong>Maximum</strong></td>
<td>{{ checkPointStats['summary']['end_to_end_duration']['max'] | humanizeDuration}}</td>
<td>{{ checkPointStats['summary']['state_size']['max'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['alignment_buffered']['max'] | humanizeBytes }}</td>
<td>{{ checkPointStats['summary']['processed_data']['max'] | humanizeBytes }} ({{ checkPointStats['summary']['persisted_data']['max'] | humanizeBytes }})</td>
</tr>
</ng-container>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<th><strong>Checkpointed Data Size</strong></th>
<th><strong>Sync Duration</strong></th>
<th><strong>Async Duration</strong></th>
<th><strong>Alignment Buffered</strong></th>
<th><strong>Processed (persisted) Data</strong></th>
<th><strong>Alignment Duration</strong></th>
<th><strong>Start Delay</strong></th>
</tr>
Expand All @@ -45,7 +45,7 @@
<td>{{ subTaskCheckPoint['summary']['state_size']['min'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['sync']['min'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['async']['min'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['buffered']['min'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['processed']['min'] | humanizeBytes }} ({{ subTaskCheckPoint['summary']['alignment']['persisted']['min'] | humanizeBytes }})</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['duration']['min'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['start_delay']['min'] | humanizeDuration}}</td>
</tr>
Expand All @@ -55,7 +55,7 @@
<td>{{ subTaskCheckPoint['summary']['state_size']['avg'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['sync']['avg'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['async']['avg'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['buffered']['avg'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['processed']['avg'] | humanizeBytes }} ({{ subTaskCheckPoint['summary']['alignment']['persisted']['avg'] | humanizeBytes }})</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['duration']['avg'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['start_delay']['avg'] | humanizeDuration}}</td>
</tr>
Expand All @@ -65,7 +65,7 @@
<td>{{ subTaskCheckPoint['summary']['state_size']['max'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['sync']['max'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['checkpoint_duration']['async']['max'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['buffered']['max'] | humanizeBytes }}</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['processed']['max'] | humanizeBytes }} ({{ subTaskCheckPoint['summary']['alignment']['persisted']['max'] | humanizeBytes }})</td>
<td>{{ subTaskCheckPoint['summary']['alignment']['duration']['max'] | humanizeDuration}}</td>
<td>{{ subTaskCheckPoint['summary']['start_delay']['max'] | humanizeDuration}}</td>
</tr>
Expand All @@ -88,7 +88,7 @@
<th nzSortKey="state_size" nzShowSort><strong>Checkpointed Data Size</strong></th>
<th nzSortKey="checkpoint.sync" nzShowSort><strong>Sync Duration</strong></th>
<th nzSortKey="checkpoint.async" nzShowSort><strong>Async Duration</strong></th>
<th nzSortKey="alignment.buffered" nzShowSort><strong>Alignment Buffered</strong></th>
<th nzSortKey="alignment.processed" nzShowSort><strong>Processed (persisted) Data</strong></th>
<th nzSortKey="alignment.duration" nzShowSort><strong>Alignment Duration</strong></th>
<th nzSortKey="start_delay" nzShowSort><strong>Start Delay</strong></th>
</tr>
Expand All @@ -102,7 +102,7 @@
<td>{{ subTask['state_size'] | humanizeBytes }}</td>
<td>{{ subTask['checkpoint']['sync'] | humanizeDuration}}</td>
<td>{{ subTask['checkpoint']['async'] | humanizeDuration}}</td>
<td>{{ subTask['alignment']['buffered'] | humanizeBytes}}</td>
<td>{{ subTask['alignment']['processed'] | humanizeBytes }} ({{ subTask['alignment']['persisted'] | humanizeBytes }})</td>
<td>{{ subTask['alignment']['duration'] | humanizeDuration}}</td>
<td>{{ subTask['start_delay'] | humanizeDuration}}</td>
</ng-container>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ public abstract class AbstractCheckpointStats implements Serializable {
*/
public abstract long getStateSize();

/**
* Returns the total number of bytes processed during the checkpoint.
*
* @return the total number of processed bytes during the checkpoint.
*/
public abstract long getProcessedData();

/**
* Returns the total number of bytes persisted during the checkpoint.
*
* @return the total number of persisted bytes during the checkpoint.
*/
public abstract long getPersistedData();

/**
* Returns the latest acknowledged subtask stats or <code>null</code> if
* none was acknowledged yet.
Expand Down
Loading

0 comments on commit fbd9321

Please sign in to comment.