From aea14651184476940c69238535de5143e61f4c31 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 5 Apr 2017 10:31:06 +0100 Subject: [PATCH] HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer Author: Matthias J. Sax Reviewers: Eno Thereska , Damian Guy , Ismael Juma Closes #2777 from mjsax/hotfix-window-serdes-trunk --- .../kstream/internals/KStreamImpl.java | 12 ++-- .../kstream/internals/SessionKeySerde.java | 17 +++--- .../internals/WindowedStreamPartitioner.java | 8 ++- .../kafka/streams/state/StateSerdes.java | 57 +++++++++++-------- .../state/internals/CachingKeyValueStore.java | 8 +-- .../state/internals/CachingSessionStore.java | 23 +++++--- .../state/internals/CachingWindowStore.java | 14 +++-- .../ChangeLoggingKeyValueBytesStore.java | 9 ++- .../internals/ChangeLoggingKeyValueStore.java | 7 ++- .../ChangeLoggingSegmentedBytesStore.java | 9 ++- .../InMemoryKeyValueLoggedStore.java | 10 ++-- .../internals/InMemoryKeyValueStore.java | 8 ++- .../state/internals/MemoryLRUCache.java | 8 ++- ...MergedSortedCacheSessionStoreIterator.java | 4 +- .../internals/RocksDBSegmentedBytesStore.java | 3 + .../state/internals/RocksDBSessionStore.java | 16 ++++-- .../RocksDBSessionStoreSupplier.java | 6 +- .../streams/state/internals/RocksDBStore.java | 8 ++- .../state/internals/RocksDBWindowStore.java | 7 ++- .../state/internals/SegmentedBytesStore.java | 8 +++ .../state/internals/SessionKeySchema.java | 10 +++- .../state/internals/WindowKeySchema.java | 7 ++- .../state/internals/WindowStoreUtils.java | 5 +- .../WrappedSessionStoreIterator.java | 4 +- .../internals/SessionKeySerdeTest.java | 16 +++--- .../WindowedStreamPartitionerTest.java | 2 +- .../state/KeyValueStoreTestDriver.java | 34 +++++------ .../internals/CachingSessionStoreTest.java | 6 +- .../internals/CachingWindowStoreTest.java | 2 +- .../CompositeReadOnlyKeyValueStoreTest.java | 2 +- ...dSortedCacheKeyValueStoreIteratorTest.java | 2 +- ...dCacheWrappedSessionStoreIteratorTest.java | 2 +- .../RocksDBSegmentedBytesStoreTest.java | 9 +-- .../internals/RocksDBSessionStoreTest.java | 5 +- .../state/internals/SessionKeySchemaTest.java | 1 + .../state/internals/StateStoreTestUtils.java | 16 +++++- .../internals/StoreChangeLoggerTest.java | 6 +- .../internals/WrappingStoreProviderTest.java | 4 +- 38 files changed, 229 insertions(+), 146 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 5eabc2c07dd5e..bbd4ac4c0867c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -360,16 +360,16 @@ public void to(Serde keySerde, Serde valSerde, String topic) { @SuppressWarnings("unchecked") @Override - public void to(Serde keySerde, Serde valSerde, StreamPartitioner partitioner, String topic) { + public void to(final Serde keySerde, final Serde valSerde, StreamPartitioner partitioner, final String topic) { Objects.requireNonNull(topic, "topic can't be null"); - String name = topology.newName(SINK_NAME); + final String name = topology.newName(SINK_NAME); - Serializer keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer valSerializer = valSerde == null ? null : valSerde.serializer(); + final Serializer keySerializer = keySerde == null ? null : keySerde.serializer(); + final Serializer valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { - WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer; - partitioner = (StreamPartitioner) new WindowedStreamPartitioner(windowedSerializer); + final WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer; + partitioner = (StreamPartitioner) new WindowedStreamPartitioner(topic, windowedSerializer); } topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 7a85c7716c389..3b57d95885fb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -33,7 +33,6 @@ */ public class SessionKeySerde implements Serde> { private static final int TIMESTAMP_SIZE = 8; - private static final String SESSIONKEY = "sessionkey"; private final Serde keySerde; @@ -77,7 +76,7 @@ public byte[] serialize(final String topic, final Windowed data) { if (data == null) { return null; } - return toBinary(data, keySerializer).get(); + return toBinary(data, keySerializer, topic).get(); } @Override @@ -102,7 +101,7 @@ public Windowed deserialize(final String topic, final byte[] data) { if (data == null || data.length == 0) { return null; } - return from(data, deserializer); + return from(data, deserializer, topic); } @@ -133,8 +132,8 @@ public static byte[] extractKeyBytes(final byte[] binaryKey) { return bytes; } - public static Windowed from(final byte[] binaryKey, final Deserializer keyDeserializer) { - final K key = extractKey(binaryKey, keyDeserializer); + public static Windowed from(final byte[] binaryKey, final Deserializer keyDeserializer, final String topic) { + final K key = extractKey(binaryKey, keyDeserializer, topic); final Window window = extractWindow(binaryKey); return new Windowed<>(key, window); } @@ -147,12 +146,12 @@ public static Windowed fromBytes(Bytes bytesKey) { return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end)); } - private static K extractKey(final byte[] binaryKey, Deserializer deserializer) { - return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey)); + private static K extractKey(final byte[] binaryKey, final Deserializer deserializer, final String topic) { + return deserializer.deserialize(topic, extractKeyBytes(binaryKey)); } - public static Bytes toBinary(final Windowed sessionKey, final Serializer serializer) { - final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key()); + public static Bytes toBinary(final Windowed sessionKey, final Serializer serializer, final String topic) { + final byte[] bytes = serializer.serialize(topic, sessionKey.key()); ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE); buf.put(bytes); buf.putLong(sessionKey.window().end()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index f4a5e812bc47c..fa1ceae39ec33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -24,9 +24,11 @@ public class WindowedStreamPartitioner implements StreamPartitioner, V> { + private final String topic; private final WindowedSerializer serializer; - public WindowedStreamPartitioner(WindowedSerializer serializer) { + WindowedStreamPartitioner(final String topic, final WindowedSerializer serializer) { + this.topic = topic; this.serializer = serializer; } @@ -40,8 +42,8 @@ public WindowedStreamPartitioner(WindowedSerializer serializer) { * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ - public Integer partition(Windowed windowedKey, V value, int numPartitions) { - byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + public Integer partition(final Windowed windowedKey, final V value, final int numPartitions) { + final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); // hash the keyBytes to choose a partition return toPositive(Utils.murmur2(keyBytes)) % numPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index 43663119c57fc..d43c61393dc0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -32,18 +32,21 @@ public final class StateSerdes { /** * Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes. * - * @param stateName the name of the state - * @param keyClass the class of the key type - * @param valueClass the class of the value type - * @param the key type - * @param the value type - * @return a new instance of {@link StateSerdes} + * @param topic the topic name + * @param keyClass the class of the key type + * @param valueClass the class of the value type + * @param the key type + * @param the value type + * @return a new instance of {@link StateSerdes} */ - public static StateSerdes withBuiltinTypes(String stateName, Class keyClass, Class valueClass) { - return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); + public static StateSerdes withBuiltinTypes( + final String topic, + final Class keyClass, + final Class valueClass) { + return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass)); } - private final String stateName; + private final String topic; private final Serde keySerde; private final Serde valueSerde; @@ -53,22 +56,26 @@ public static StateSerdes withBuiltinTypes(String stateName, Class< * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not * need to provide the topic name any more. * - * @param stateName the name of the state + * @param topic the topic name * @param keySerde the serde for keys; cannot be null * @param valueSerde the serde for values; cannot be null * @throws IllegalArgumentException if key or value serde is null */ @SuppressWarnings("unchecked") - public StateSerdes(String stateName, - Serde keySerde, - Serde valueSerde) { - this.stateName = stateName; - - if (keySerde == null) + public StateSerdes(final String topic, + final Serde keySerde, + final Serde valueSerde) { + if (topic == null) { + throw new IllegalArgumentException("topic cannot be null"); + } + if (keySerde == null) { throw new IllegalArgumentException("key serde cannot be null"); - if (valueSerde == null) + } + if (valueSerde == null) { throw new IllegalArgumentException("value serde cannot be null"); + } + this.topic = topic; this.keySerde = keySerde; this.valueSerde = valueSerde; } @@ -128,12 +135,12 @@ public Serializer valueSerializer() { } /** - * Return the name of the state. + * Return the topic. * - * @return the name of the state + * @return the topic */ - public String stateName() { - return stateName; + public String topic() { + return topic; } /** @@ -143,7 +150,7 @@ public String stateName() { * @return the key as typed object */ public K keyFrom(byte[] rawKey) { - return keySerde.deserializer().deserialize(stateName, rawKey); + return keySerde.deserializer().deserialize(topic, rawKey); } /** @@ -153,7 +160,7 @@ public K keyFrom(byte[] rawKey) { * @return the value as typed object */ public V valueFrom(byte[] rawValue) { - return valueSerde.deserializer().deserialize(stateName, rawValue); + return valueSerde.deserializer().deserialize(topic, rawValue); } /** @@ -163,7 +170,7 @@ public V valueFrom(byte[] rawValue) { * @return the serialized key */ public byte[] rawKey(K key) { - return keySerde.serializer().serialize(stateName, key); + return keySerde.serializer().serialize(topic, key); } /** @@ -173,6 +180,6 @@ public byte[] rawKey(K key) { * @return the serialized value */ public byte[] rawValue(V value) { - return valueSerde.serializer().serialize(stateName, value); + return valueSerde.serializer().serialize(topic, value); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index d9ef688c5b1ec..2a720be86a378 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -51,11 +52,6 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im this.valueSerde = valueSerde; } - @Override - public String name() { - return underlying.name(); - } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { @@ -69,7 +65,7 @@ public void init(final ProcessorContext context, final StateStore root) { @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? (Serde) context.valueSerde() : valueSerde); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index a4b46ff2ff97e..bebd11845e115 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -43,6 +44,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i private StateSerdes serdes; private InternalProcessorContext context; private CacheFlushListener, AGG> flushListener; + private String topic; CachingSessionStore(final SessionStore bytesStore, final Serde keySerde, @@ -56,6 +58,7 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); bytesStore.init(context, root); initInternal((InternalProcessorContext) context); } @@ -64,13 +67,15 @@ public void init(final ProcessorContext context, final StateStore root) { private void initInternal(final InternalProcessorContext context) { this.context = context; - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - aggSerde == null ? (Serde) context.valueSerde() : aggSerde); + keySchema.init(topic); + serdes = new StateSerdes<>( + topic, + keySerde == null ? (Serde) context.keySerde() : keySerde, + aggSerde == null ? (Serde) context.valueSerde() : aggSerde); - this.cacheName = context.taskId() + "-" + bytesStore.name(); - this.cache = this.context.getCache(); + cacheName = context.taskId() + "-" + bytesStore.name(); + cache = context.getCache(); cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List entries) { @@ -85,7 +90,7 @@ public KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key)); + final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, keySchema.lowerRange(binarySessionId, earliestSessionEndTime), keySchema.upperRange(binarySessionId, latestSessionStartTime)); @@ -106,7 +111,7 @@ public void remove(final Windowed sessionKey) { @Override public void put(final Windowed key, AGG value) { validateStoreOpen(); - final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer()); + final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), context.topic()); cache.put(cacheName, binaryKey, entry); @@ -122,12 +127,12 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { - final Windowed key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer()); + final Windowed key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey); if (!(newValue == null && oldValue == null)) { - flushListener.apply(key, newValue == null ? null : newValue, oldValue); + flushListener.apply(key, newValue, oldValue); } } bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 4003e546dca7d..f4925730ce6b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -62,17 +63,18 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl public void init(final ProcessorContext context, final StateStore root) { underlying.init(context, root); initInternal(context); + keySchema.init(context.applicationId()); } @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(underlying.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); - this.name = context.taskId() + "-" + underlying.name(); - this.cache = this.context.getCache(); + name = context.taskId() + "-" + underlying.name(); + cache = this.context.getCache(); cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -161,7 +163,7 @@ public synchronized WindowStoreIterator fetch(final K key, final long timeFro return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, - new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde())); + new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde())); } private V fetchPrevious(final Bytes key, final long timestamp) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index f5ad3acd96e40..8dc457a9949d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS @Override public void init(final ProcessorContext context, final StateStore root) { inner.init(context, root); - this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES); + this.changeLogger = new StoreChangeLogger<>( + inner.name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + inner.name()))); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java index c60278f2c0b8e..ea9f7aa871313 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -55,9 +56,9 @@ private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesSt public void init(final ProcessorContext context, final StateStore root) { innerBytes.init(context, root); - this.serdes = new StateSerdes<>(innerBytes.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), innerBytes.name()), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index 3082426801f5d..d23e1159b6f2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; /** @@ -65,6 +66,12 @@ public byte[] get(final Bytes key) { @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { bytesStore.init(context, root); - changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES); + changeLogger = new StoreChangeLogger<>( + name(), + context, + WindowStoreUtils.getInnerStateSerde( + ProcessorStateManager.storeChangelogTopic( + context.applicationId(), + bytesStore.name()))); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index bcc98191a97ac..638caadd922bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -33,7 +34,6 @@ public class InMemoryKeyValueLoggedStore extends WrappedStateStore.Abstrac private final Serde valueSerde; private StoreChangeLogger changeLogger; - private ProcessorContext context; InMemoryKeyValueLoggedStore(final KeyValueStore inner, Serde keySerde, Serde valueSerde) { super(inner); @@ -45,13 +45,13 @@ public class InMemoryKeyValueLoggedStore extends WrappedStateStore.Abstrac @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { - this.context = context; inner.init(context, root); // construct the serde - StateSerdes serdes = new StateSerdes<>(inner.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + StateSerdes serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index f63d2f1aefcd7..41c6de321f3a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -64,9 +65,10 @@ public String name() { @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); if (root != null) { // register the store diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index cf78165c4749d..e6bba54565f41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -103,9 +104,10 @@ public String name() { @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { // construct the serde - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); // register the store context.register(root, true, new StateRestoreCallback() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index 72b73ed9b1c68..3f9b620719ab9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -36,7 +36,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSorted MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator cacheIterator, final KeyValueIterator, byte[]> storeIterator, final StateSerdes serdes) { - super(cacheIterator, storeIterator, new StateSerdes<>(serdes.stateName(), + super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(), new SessionKeySerde<>(serdes.keySerde()), serdes.valueSerde())); @@ -51,7 +51,7 @@ public KeyValue, AGG> deserializeStorePair(KeyValue, @Override Windowed deserializeCacheKey(final Bytes cacheKey) { - return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer()); + return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index a9d9259b70334..252a55fae5e5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import java.util.List; @@ -89,6 +90,8 @@ public String name() { public void init(ProcessorContext context, StateStore root) { this.context = context; + keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name())); + segments.openExisting(context); // register and possibly restore the state from the logs diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index cdd0dd71406d0..502778139a6f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; @@ -35,6 +36,7 @@ class RocksDBSessionStore extends WrappedStateStore.AbstractStateStore i protected final SegmentedBytesStore bytesStore; protected StateSerdes serdes; + protected String topic; // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs private static class RocksDBSessionBytesStore extends RocksDBSessionStore { @@ -75,9 +77,13 @@ static RocksDBSessionStore bytesStore(final SegmentedBytesStore i @Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - aggSerde == null ? (Serde) context.valueSerde() : aggSerde); + final String storeName = bytesStore.name(); + topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + + serdes = new StateSerdes<>( + topic, + keySerde == null ? (Serde) context.keySerde() : keySerde, + aggSerde == null ? (Serde) context.valueSerde() : aggSerde); bytesStore.init(context, root); } @@ -95,11 +101,11 @@ public KeyValueIterator, AGG> fetch(final K key) { @Override public void remove(final Windowed key) { - bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer())); + bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic)); } @Override public void put(final Windowed sessionKey, final AGG aggregate) { - bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate)); + bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(topic, aggregate)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java index 6743a7e91abfd..4e618d9802cde 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java @@ -51,9 +51,9 @@ public String name() { public SessionStore get() { final SessionKeySchema keySchema = new SessionKeySchema(); final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name, - retentionPeriod, - NUM_SEGMENTS, - keySchema); + retentionPeriod, + NUM_SEGMENTS, + keySchema); if (cached && logged) { final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 932ddd29fa0b8..c879b91b0f44b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -143,9 +144,10 @@ public void openDB(ProcessorContext context) { } // we need to construct the serde while opening DB since // it is also triggered by windowed DB segments without initialization - this.serdes = new StateSerdes<>(name, - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + this.serdes = new StateSerdes<>( + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index b82e41605eaf4..5e8d0b204a4de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -77,9 +78,9 @@ static RocksDBWindowStore bytesStore(final SegmentedBytesStore in public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde - this.serdes = new StateSerdes<>(bytesStore.name(), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()), + keySerde == null ? (Serde) context.keySerde() : keySerde, + valueSerde == null ? (Serde) context.valueSerde() : valueSerde); bytesStore.init(context, root); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 622ed08fad44e..0c3bb53e7d51f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -66,6 +66,14 @@ public interface SegmentedBytesStore extends StateStore { byte[] get(Bytes key); interface KeySchema { + + /** + * Initialized the schema with a topic. + * + * @param topic a topic name + */ + void init(final String topic); + /** * Given a record-key and a time, construct a Segmented key that represents * the upper range of keys to search when performing range queries. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 7d6761c15e999..80785b2dcf620 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -27,17 +27,23 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema { + private String topic; + + @Override + public void init(final String topic) { + this.topic = topic; + } @Override public Bytes upperRange(final Bytes key, final long to) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override public Bytes lowerRange(final Bytes key, final long from) { final Windowed sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); - return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); + return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), topic); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index 76faf0eb3d6d8..b9a8665fc0755 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -24,7 +24,12 @@ import java.util.List; class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { - private final StateSerdes serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray()); + private StateSerdes serdes; + + @Override + public void init(final String topic) { + serdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()); + } @Override public Bytes upperRange(final Bytes key, final long to) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index b93e39a190d2f..faf289910fa49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -31,7 +31,10 @@ public class WindowStoreUtils { /** Inner byte array serde used for segments */ static final Serde INNER_KEY_SERDE = Serdes.Bytes(); static final Serde INNER_VALUE_SERDE = Serdes.ByteArray(); - static final StateSerdes INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE); + + static StateSerdes getInnerStateSerde(final String topic) { + return new StateSerdes<>(topic, INNER_KEY_SERDE, INNER_VALUE_SERDE); + } static Bytes toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes serdes) { byte[] serializedKey = serdes.rawKey(key); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java index 819f263ad4f36..6fd963644011f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java @@ -66,7 +66,7 @@ public void close() { @Override public Windowed peekNextKey() { final Bytes bytes = bytesIterator.peekNextKey(); - return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer()); + return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer(), serdes.topic()); } @Override @@ -77,7 +77,7 @@ public boolean hasNext() { @Override public KeyValue, V> next() { final KeyValue next = bytesIterator.next(); - return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value)); + return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer(), serdes.topic()), serdes.valueFrom(next.value)); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java index 65c4d9fa37b98..aca3352fe8307 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -62,39 +62,39 @@ public void shouldDeSerializeNullToNull() throws Exception { @Test public void shouldConvertToBinaryAndBack() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); - final Windowed result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); + final Windowed result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy"); assertEquals(windowedKey, result); } @Test public void shouldExtractEndTimeFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get())); } @Test public void shouldExtractStartTimeFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(startTime, SessionKeySerde.extractStart(serialized.get())); } @Test public void shouldExtractWindowFromBindary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertEquals(window, SessionKeySerde.extractWindow(serialized.get())); } @Test public void shouldExtractKeyBytesFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); } @Test public void shouldExtractKeyFromBinary() throws Exception { - final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer()); - assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer())); + final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy"); + assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy")); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 3af50d8c4de30..316494d4b48da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -69,7 +69,7 @@ public void testCopartitioning() { DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); WindowedSerializer windowedSerializer = new WindowedSerializer<>(intSerializer); - WindowedStreamPartitioner streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); + WindowedStreamPartitioner streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer); for (int k = 0; k < 10; k++) { Integer key = rand.nextInt(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 08adbf46f31c9..b758799699339 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -16,16 +16,6 @@ */ package org.apache.kafka.streams.state; -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.metrics.JmxReporter; @@ -54,6 +44,17 @@ import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + /** * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. @@ -170,11 +171,12 @@ public static KeyValueStoreTestDriver create(Class keyClass, Cla * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null * @return the test driver; never null */ - public static KeyValueStoreTestDriver create(Serializer keySerializer, - Deserializer keyDeserializer, - Serializer valueSerializer, - Deserializer valueDeserializer) { - StateSerdes serdes = new StateSerdes("unexpected", + public static KeyValueStoreTestDriver create(final Serializer keySerializer, + final Deserializer keyDeserializer, + final Serializer valueSerializer, + final Deserializer valueDeserializer) { + StateSerdes serdes = new StateSerdes( + "unexpected", Serdes.serdeFrom(keySerializer, keyDeserializer), Serdes.serdeFrom(valueSerializer, valueDeserializer)); return new KeyValueStoreTestDriver(serdes); @@ -234,7 +236,7 @@ public void send(final String topic, this.stateDir.mkdirs(); props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId"); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 59caaaf02fa16..7377ba28c402f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -58,7 +58,9 @@ public class CachingSessionStoreTest { @Before public void setUp() throws Exception { - underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema()); + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); + underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, schema); final RocksDBSessionStore sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray()); cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), @@ -116,7 +118,7 @@ public void shouldFlushItemsToStoreOnEviction() throws Exception { assertEquals(added.size() - 1, cache.size()); final KeyValueIterator iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0); final KeyValue next = iterator.next(); - assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer())); + assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy")); assertArrayEquals(serdes.rawValue(added.get(0).value), next.value); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 297a88e6659f2..054e685412eae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -160,7 +160,7 @@ public void shouldIterateAcrossWindows() throws Exception { @Test public void shouldIterateCacheAndStore() throws Exception { final Bytes key = Bytes.wrap("1" .getBytes()); - underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES), "a".getBytes()); + underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.getInnerStateSerde("app-id")), "a".getBytes()); cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE); final WindowStoreIterator fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE); assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index e19c4efc4f938..2e5b8725d14e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -61,7 +61,7 @@ public void before() { } private KeyValueStore newStoreInstance() { - return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class); + return StateStoreTestUtils.newKeyValueStore(storeName, "app-id", String.class, String.class); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java index 3f05428f59475..6e0059ffddacd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java @@ -32,7 +32,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { private final String namespace = "one"; - private final StateSerdes serdes = new StateSerdes<>(namespace, Serdes.ByteArray(), Serdes.ByteArray()); + private final StateSerdes serdes = new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray()); private KeyValueStore store; private ThreadCache cache; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 5f24fdea8b1c9..d3d8f4043b6cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -45,7 +45,7 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest { private final SessionWindow cacheWindow = new SessionWindow(10, 20); private final Iterator> cacheKvs = Collections.singleton(KeyValue.pair( SessionKeySerde.toBinary( - new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes()))) + new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer(), "dummy"), new LRUCacheEntry(cacheKey.getBytes()))) .iterator(); @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index ab13c241dd949..bd335d4f32f33 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -54,11 +54,12 @@ public class RocksDBSegmentedBytesStoreTest { @Before public void before() { - + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, numSegments, - new SessionKeySchema()); + schema); stateDir = TestUtils.tempDirectory(); final MockProcessorContext context = new MockProcessorContext(stateDir, @@ -154,7 +155,7 @@ private byte[] serializeValue(final long value) { } private Bytes serializeKey(final Windowed key) { - return SessionKeySerde.toBinary(key, Serdes.String().serializer()); + return SessionKeySerde.toBinary(key, Serdes.String().serializer(), "dummy"); } private List, Long>> toList(final KeyValueIterator iterator) { @@ -162,7 +163,7 @@ private List, Long>> toList(final KeyValueIterator next = iterator.next(); final KeyValue, Long> deserialized - = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value)); + = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value)); results.add(deserialized); } return results; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 9be7c10218c35..7f01aae9ef900 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -48,8 +48,11 @@ public class RocksDBSessionStoreTest { @Before public void before() { + final SessionKeySchema schema = new SessionKeySchema(); + schema.init("topic"); + final RocksDBSegmentedBytesStore bytesStore = - new RocksDBSegmentedBytesStore("session-store", 10000L, 3, new SessionKeySchema()); + new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema); sessionStore = new RocksDBSessionStore<>(bytesStore, Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java index 7c085ddffcdca..354cf01fc0e07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -40,6 +40,7 @@ public class SessionKeySchemaTest { @Before public void before() { + sessionKeySchema.init("topic"); final List> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index 39cc5b5f77fcf..d30372fd80b08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -28,7 +29,10 @@ @SuppressWarnings("unchecked") public class StateStoreTestUtils { - public static KeyValueStore newKeyValueStore(String name, Class keyType, Class valueType) { + public static KeyValueStore newKeyValueStore(final String name, + final String applicationId, + final Class keyType, + final Class valueType) { final InMemoryKeyValueStoreSupplier supplier = new InMemoryKeyValueStoreSupplier<>(name, null, null, @@ -37,8 +41,14 @@ public static KeyValueStore newKeyValueStore(String name, Class Collections.emptyMap()); final StateStore stateStore = supplier.get(); - stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType), - new NoOpRecordCollector()), stateStore); + stateStore.init( + new MockProcessorContext( + StateSerdes.withBuiltinTypes( + ProcessorStateManager.storeChangelogTopic(applicationId, name), + keyType, + valueType), + new NoOpRecordCollector()), + stateStore); return (KeyValueStore) stateStore; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 6edda74b7d41f..311eaf6ff7844 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -17,9 +17,6 @@ package org.apache.kafka.streams.state.internals; -import java.util.HashMap; -import java.util.Map; - import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; @@ -27,6 +24,9 @@ import org.apache.kafka.test.MockProcessorContext; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index 76a8747e63d8a..991867248ecdf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -42,9 +42,9 @@ public void before() { final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class)); + stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", "app-id", String.class, String.class)); stubProviderTwo.addStore("window", new NoOpWindowStore()); wrappingStoreProvider = new WrappingStoreProvider(