Skip to content

Commit

Permalink
KAFKA-13075: Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest (
Browse files Browse the repository at this point in the history
apache#11034)

Consolidate the RocksDBStoreTest and RocksDBKeyValueStoreTest files into a single test class for the RocksDBStore.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
tang7526 authored Jul 13, 2021
1 parent ac18bdc commit f301768
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 112 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -32,12 +33,16 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
Expand Down Expand Up @@ -84,11 +89,12 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;

@SuppressWarnings("unchecked")
public class RocksDBStoreTest {
public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
private static boolean enableBloomFilters = false;
final static String DB_NAME = "db-name";
final static String METRICS_SCOPE = "metrics-scope";
Expand Down Expand Up @@ -122,6 +128,18 @@ public void tearDown() {
rocksDBStore.close();
}

@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"),
(Serde<K>) context.keySerde(),
(Serde<V>) context.valueSerde());

final KeyValueStore<K, V> store = storeBuilder.build();
store.init(context, store);
return store;
}

RocksDBStore getRocksDBStore() {
return new RocksDBStore(DB_NAME, METRICS_SCOPE);
}
Expand Down Expand Up @@ -849,6 +867,70 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
}
}

@Test
public void shouldPerformRangeQueriesWithCachingDisabled() {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
final KeyValueIterator<Integer, String> range = store.range(1, 2);
assertEquals("hi", range.next().value);
assertEquals("goodbye", range.next().value);
assertFalse(range.hasNext());
}

@Test
public void shouldPerformAllQueriesWithCachingDisabled() {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
final KeyValueIterator<Integer, String> range = store.all();
assertEquals("hi", range.next().value);
assertEquals("goodbye", range.next().value);
assertFalse(range.hasNext());
}

@Test
public void shouldCloseOpenRangeIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() {
context.setTime(1L);
store.put(1, "hi");
store.put(2, "goodbye");
final KeyValueIterator<Integer, String> iteratorOne = store.range(1, 5);
final KeyValueIterator<Integer, String> iteratorTwo = store.range(1, 4);

assertTrue(iteratorOne.hasNext());
assertTrue(iteratorTwo.hasNext());

store.close();

try {
iteratorOne.hasNext();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}

try {
iteratorOne.next();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}

try {
iteratorTwo.hasNext();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}

try {
iteratorTwo.next();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}
}

public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {

static boolean bloomFiltersSet;
Expand Down

0 comments on commit f301768

Please sign in to comment.