Skip to content

Commit

Permalink
KAFKA-9924: Add remaining property-based RocksDB metrics as described…
Browse files Browse the repository at this point in the history
… in KIP-607 (apache#9232)

This commit adds the remaining property-based RocksDB metrics as described in KIP-607, except for num-entries-active-mem-table, which was added in PR apache#9177.

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
cadonna authored Sep 2, 2020
1 parent 24b03a6 commit c04000c
Show file tree
Hide file tree
Showing 8 changed files with 1,225 additions and 75 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ public class RocksDBMetricsIntegrationTest {
// Metric names of the property-based RocksDB metrics verified by this integration test.
// The string values must match the metric names registered by the Streams RocksDB
// metrics recorder (see KIP-607); the test looks metrics up by these names.
// Pre-existing metrics:
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";
private static final String NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE = "num-entries-active-mem-table";
// Memtable metrics added by KIP-607:
private static final String NUMBER_OF_DELETES_ACTIVE_MEMTABLE = "num-deletes-active-mem-table";
private static final String NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES = "num-entries-imm-mem-tables";
private static final String NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES = "num-deletes-imm-mem-tables";
private static final String NUMBER_OF_IMMUTABLE_MEMTABLES = "num-immutable-mem-table";
private static final String CURRENT_SIZE_OF_ACTIVE_MEMTABLE = "cur-size-active-mem-table";
private static final String CURRENT_SIZE_OF_ALL_MEMTABLES = "cur-size-all-mem-tables";
private static final String SIZE_OF_ALL_MEMTABLES = "size-all-mem-tables";
// Flush and compaction metrics added by KIP-607:
private static final String MEMTABLE_FLUSH_PENDING = "mem-table-flush-pending";
private static final String NUMBER_OF_RUNNING_FLUSHES = "num-running-flushes";
private static final String COMPACTION_PENDING = "compaction-pending";
private static final String NUMBER_OF_RUNNING_COMPACTIONS = "num-running-compactions";
private static final String ESTIMATED_BYTES_OF_PENDING_COMPACTION = "estimate-pending-compaction-bytes";
// SST-file and version metrics added by KIP-607:
private static final String TOTAL_SST_FILES_SIZE = "total-sst-files-size";
private static final String LIVE_SST_FILES_SIZE = "live-sst-files-size";
private static final String NUMBER_OF_LIVE_VERSIONS = "num-live-versions";
// Block-cache metrics added by KIP-607:
private static final String CAPACITY_OF_BLOCK_CACHE = "block-cache-capacity";
private static final String USAGE_OF_BLOCK_CACHE = "block-cache-usage";
private static final String PINNED_USAGE_OF_BLOCK_CACHE = "block-cache-pinned-usage";
// Estimates and error counters added by KIP-607:
private static final String ESTIMATED_NUMBER_OF_KEYS = "estimate-num-keys";
private static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
private static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";

@Parameters(name = "{0}")
public static Collection<Object[]> data() {
Expand Down Expand Up @@ -164,6 +185,7 @@ private Properties streamsConfig() {
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return streamsConfiguration;
}

Expand Down Expand Up @@ -202,7 +224,7 @@ private void cleanUpStateRunVerifyAndClose(final StreamsBuilder builder,
kafkaStreams.close();
}

private void produceRecords() throws Exception {
private void produceRecords() {
final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
final Properties prop = TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
Expand Down Expand Up @@ -257,6 +279,27 @@ private void verifyThatRocksDBMetricsAreExposed(final KafkaStreams kafkaStreams,
checkMetricByName(listMetricStore, NUMBER_OF_OPEN_FILES, 1);
checkMetricByName(listMetricStore, NUMBER_OF_FILE_ERRORS, 1);
checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE, 1);
checkMetricByName(listMetricStore, NUMBER_OF_DELETES_ACTIVE_MEMTABLE, 1);
checkMetricByName(listMetricStore, NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES, 1);
checkMetricByName(listMetricStore, NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES, 1);
checkMetricByName(listMetricStore, NUMBER_OF_IMMUTABLE_MEMTABLES, 1);
checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ACTIVE_MEMTABLE, 1);
checkMetricByName(listMetricStore, CURRENT_SIZE_OF_ALL_MEMTABLES, 1);
checkMetricByName(listMetricStore, SIZE_OF_ALL_MEMTABLES, 1);
checkMetricByName(listMetricStore, MEMTABLE_FLUSH_PENDING, 1);
checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_FLUSHES, 1);
checkMetricByName(listMetricStore, COMPACTION_PENDING, 1);
checkMetricByName(listMetricStore, NUMBER_OF_RUNNING_COMPACTIONS, 1);
checkMetricByName(listMetricStore, ESTIMATED_BYTES_OF_PENDING_COMPACTION, 1);
checkMetricByName(listMetricStore, TOTAL_SST_FILES_SIZE, 1);
checkMetricByName(listMetricStore, LIVE_SST_FILES_SIZE, 1);
checkMetricByName(listMetricStore, NUMBER_OF_LIVE_VERSIONS, 1);
checkMetricByName(listMetricStore, CAPACITY_OF_BLOCK_CACHE, 1);
checkMetricByName(listMetricStore, USAGE_OF_BLOCK_CACHE, 1);
checkMetricByName(listMetricStore, PINNED_USAGE_OF_BLOCK_CACHE, 1);
checkMetricByName(listMetricStore, ESTIMATED_NUMBER_OF_KEYS, 1);
checkMetricByName(listMetricStore, ESTIMATED_MEMORY_OF_TABLE_READERS, 1);
checkMetricByName(listMetricStore, NUMBER_OF_BACKGROUND_ERRORS, 1);
}

private void checkMetricByName(final List<Metric> listMetric,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public class StreamsMetricsImplTest {
private final static String STORE_ID_TAG = "-state-id";
private final static String STORE_NAME1 = "store1";
private final static String STORE_NAME2 = "store2";
// Tags attached to store-level metrics: the owning thread's name, the task id,
// and the "<scope>-state-id" tag carrying the store name. Shared by the
// store-level metric tests in this class.
private final static Map<String, String> STORE_LEVEL_TAG_MAP = mkMap(
mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
mkEntry(TASK_ID_TAG, TASK_ID1),
mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
);
private final static String RECORD_CACHE_ID_TAG = "record-cache-id";
private final static String ENTITY_NAME = "test-entity";
private final static String OPERATION_NAME = "test-operation";
Expand All @@ -125,11 +130,6 @@ public class StreamsMetricsImplTest {
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
private final Map<String, String> clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
private final Map<String, String> storeLevelTagMap = mkMap(
mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
mkEntry(TASK_ID_TAG, TASK_ID1),
mkEntry(SCOPE_NAME + STORE_ID_TAG, STORE_NAME1)
);
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags);
private final MetricName metricName2 =
Expand Down Expand Up @@ -427,9 +427,9 @@ private Capture<String> setUpSensorKeyTests(final Metrics metrics) {
public void shouldAddNewStoreLevelMutableMetric() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(null);
metrics.addMetric(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER));
Expand All @@ -453,8 +453,8 @@ public void shouldAddNewStoreLevelMutableMetric() {
public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
final Metrics metrics = mock(Metrics.class);
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName);
expect(metrics.metric(metricName)).andReturn(mock(KafkaMetric.class));
replay(metrics);
Expand All @@ -478,12 +478,12 @@ public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = niceMock(Metrics.class);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final MetricName metricName1 =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap);
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricName metricName2 =
new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, storeLevelTagMap))
new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP);
expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.andReturn(metricName1);
expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, storeLevelTagMap))
expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP))
.andReturn(metricName2);
final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
resetToDefault(metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -628,23 +629,72 @@ public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRock
EasyMock.replay(context);

rocksDBStore.init(context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);

final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
"num-entries-active-mem-table",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
));

assertThat(numberOfEntriesActiveMemTable, notNullValue());
assertThat(numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.valueOf(0)));
assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));
}

final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);
@Test
public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
    final TaskId taskId = new TaskId(0, 0);

    // Property-based RocksDB metrics are recorded at INFO level, so INFO suffices here.
    final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
    final StreamsMetricsImpl streamsMetrics =
        new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);

    // Stub out the processor context so that initializing the store registers its
    // metrics against the Metrics registry created above.
    final Properties props = StreamsTestUtils.getStreamsConfig();
    context = EasyMock.niceMock(InternalMockProcessorContext.class);
    EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
    EasyMock.expect(context.taskId()).andStubReturn(taskId);
    EasyMock.expect(context.appConfigs()).andStubReturn(new StreamsConfig(props).originals());
    EasyMock.expect(context.stateDir()).andStubReturn(dir);
    EasyMock.replay(context);

    rocksDBStore.init(context, rocksDBStore);

    // Metric names of all property-based RocksDB metrics described in KIP-607.
    final List<String> propertyNames = Arrays.asList(
        "num-entries-active-mem-table",
        "num-deletes-active-mem-table",
        "num-entries-imm-mem-tables",
        "num-deletes-imm-mem-tables",
        "num-immutable-mem-table",
        "cur-size-active-mem-table",
        "cur-size-all-mem-tables",
        "size-all-mem-tables",
        "mem-table-flush-pending",
        "num-running-flushes",
        "compaction-pending",
        "num-running-compactions",
        "estimate-pending-compaction-bytes",
        "total-sst-files-size",
        "live-sst-files-size",
        "num-live-versions",
        "block-cache-capacity",
        "block-cache-usage",
        "block-cache-pinned-usage",
        "estimate-num-keys",
        "estimate-table-readers-mem",
        "background-errors"
    );
    for (final String propertyName : propertyNames) {
        final Metric metric = metrics.metric(new MetricName(
            propertyName,
            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
            "description is not verified",
            streamsMetrics.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)
        ));
        assertThat("Metric " + propertyName + " not found!", metric, notNullValue());
        // Reading the value triggers the underlying RocksDB property lookup, which is
        // how an invalid property name would surface.
        // NOTE(review): the exact failure mode for an invalid property is not visible in
        // this file — confirm against the RocksDB metrics recorder implementation.
        metric.metricValue();
    }
}

public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
Expand Down
Loading

0 comments on commit c04000c

Please sign in to comment.