Skip to content

Commit

Permalink
HOTFIX: WindowedStreamPartitioner does not provide topic name to serializer
Browse files Browse the repository at this point in the history

Author: Matthias J. Sax <[email protected]>

Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]>

Closes apache#2777 from mjsax/hotfix-window-serdes-trunk
  • Loading branch information
mjsax authored and ijuma committed Apr 5, 2017
1 parent 49f80b2 commit aea1465
Show file tree
Hide file tree
Showing 38 changed files with 229 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,16 +360,16 @@ public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {

@SuppressWarnings("unchecked")
@Override
public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
Objects.requireNonNull(topic, "topic can't be null");
String name = topology.newName(SINK_NAME);
final String name = topology.newName(SINK_NAME);

Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();

if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
}

topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
*/
public class SessionKeySerde<K> implements Serde<Windowed<K>> {
private static final int TIMESTAMP_SIZE = 8;
private static final String SESSIONKEY = "sessionkey";

private final Serde<K> keySerde;

Expand Down Expand Up @@ -77,7 +76,7 @@ public byte[] serialize(final String topic, final Windowed<K> data) {
if (data == null) {
return null;
}
return toBinary(data, keySerializer).get();
return toBinary(data, keySerializer, topic).get();
}

@Override
Expand All @@ -102,7 +101,7 @@ public Windowed<K> deserialize(final String topic, final byte[] data) {
if (data == null || data.length == 0) {
return null;
}
return from(data, deserializer);
return from(data, deserializer, topic);
}


Expand Down Expand Up @@ -133,8 +132,8 @@ public static byte[] extractKeyBytes(final byte[] binaryKey) {
return bytes;
}

public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) {
final K key = extractKey(binaryKey, keyDeserializer);
public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer, final String topic) {
final K key = extractKey(binaryKey, keyDeserializer, topic);
final Window window = extractWindow(binaryKey);
return new Windowed<>(key, window);
}
Expand All @@ -147,12 +146,12 @@ public static Windowed<Bytes> fromBytes(Bytes bytesKey) {
return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end));
}

private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey));
private static <K> K extractKey(final byte[] binaryKey, final Deserializer<K> deserializer, final String topic) {
return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
}

public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) {
final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key());
public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
final byte[] bytes = serializer.serialize(topic, sessionKey.key());
ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
buf.put(bytes);
buf.putLong(sessionKey.window().end());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {

private final String topic;
private final WindowedSerializer<K> serializer;

public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) {
this.topic = topic;
this.serializer = serializer;
}

Expand All @@ -40,8 +42,8 @@ public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) {
final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);

// hash the keyBytes to choose a partition
return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ public final class StateSerdes<K, V> {
/**
* Create a new instance of {@link StateSerdes} for the given state name and key-/value-type classes.
*
* @param stateName the name of the state
* @param keyClass the class of the key type
* @param valueClass the class of the value type
* @param <K> the key type
* @param <V> the value type
* @return a new instance of {@link StateSerdes}
* @param topic the topic name
* @param keyClass the class of the key type
* @param valueClass the class of the value type
* @param <K> the key type
* @param <V> the value type
* @return a new instance of {@link StateSerdes}
*/
public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<K> keyClass, Class<V> valueClass) {
return new StateSerdes<>(stateName, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
public static <K, V> StateSerdes<K, V> withBuiltinTypes(
final String topic,
final Class<K> keyClass,
final Class<V> valueClass) {
return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
}

private final String stateName;
private final String topic;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;

Expand All @@ -53,22 +56,26 @@ public static <K, V> StateSerdes<K, V> withBuiltinTypes(String stateName, Class<
* is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
* need to provide the topic name any more.
*
* @param stateName the name of the state
* @param topic the topic name
* @param keySerde the serde for keys; cannot be null
* @param valueSerde the serde for values; cannot be null
* @throws IllegalArgumentException if key or value serde is null
*/
@SuppressWarnings("unchecked")
public StateSerdes(String stateName,
Serde<K> keySerde,
Serde<V> valueSerde) {
this.stateName = stateName;

if (keySerde == null)
public StateSerdes(final String topic,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
if (topic == null) {
throw new IllegalArgumentException("topic cannot be null");
}
if (keySerde == null) {
throw new IllegalArgumentException("key serde cannot be null");
if (valueSerde == null)
}
if (valueSerde == null) {
throw new IllegalArgumentException("value serde cannot be null");
}

this.topic = topic;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
Expand Down Expand Up @@ -128,12 +135,12 @@ public Serializer<V> valueSerializer() {
}

/**
* Return the name of the state.
* Return the topic.
*
* @return the name of the state
* @return the topic
*/
public String stateName() {
return stateName;
public String topic() {
return topic;
}

/**
Expand All @@ -143,7 +150,7 @@ public String stateName() {
* @return the key as typed object
*/
public K keyFrom(byte[] rawKey) {
return keySerde.deserializer().deserialize(stateName, rawKey);
return keySerde.deserializer().deserialize(topic, rawKey);
}

/**
Expand All @@ -153,7 +160,7 @@ public K keyFrom(byte[] rawKey) {
* @return the value as typed object
*/
public V valueFrom(byte[] rawValue) {
return valueSerde.deserializer().deserialize(stateName, rawValue);
return valueSerde.deserializer().deserialize(topic, rawValue);
}

/**
Expand All @@ -163,7 +170,7 @@ public V valueFrom(byte[] rawValue) {
* @return the serialized key
*/
public byte[] rawKey(K key) {
return keySerde.serializer().serialize(stateName, key);
return keySerde.serializer().serialize(topic, key);
}

/**
Expand All @@ -173,6 +180,6 @@ public byte[] rawKey(K key) {
* @return the serialized value
*/
public byte[] rawValue(V value) {
return valueSerde.serializer().serialize(stateName, value);
return valueSerde.serializer().serialize(topic, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -51,11 +52,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
this.valueSerde = valueSerde;
}

@Override
public String name() {
return underlying.name();
}

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
Expand All @@ -69,7 +65,7 @@ public void init(final ProcessorContext context, final StateStore root) {
@SuppressWarnings("unchecked")
private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
this.serdes = new StateSerdes<>(underlying.name(),
this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
Expand All @@ -43,6 +44,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
private StateSerdes<K, AGG> serdes;
private InternalProcessorContext context;
private CacheFlushListener<Windowed<K>, AGG> flushListener;
private String topic;

CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
final Serde<K> keySerde,
Expand All @@ -56,6 +58,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i

/**
 * Initializes this caching session store.
 *
 * <p>The changelog topic name is resolved first, because {@code initInternal} reads the
 * {@code topic} field when it sets up the key schema and the serdes. The wrapped bytes store
 * is initialized next, and the caching layer itself last.
 *
 * @param context the processor context; it is cast to {@link InternalProcessorContext}
 * @param root    the root state store; its name is used to build the changelog topic name
 */
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
    final String applicationId = context.applicationId();
    final String rootStoreName = root.name();
    this.topic = ProcessorStateManager.storeChangelogTopic(applicationId, rootStoreName);

    bytesStore.init(context, root);

    final InternalProcessorContext internalContext = (InternalProcessorContext) context;
    initInternal(internalContext);
}
Expand All @@ -64,13 +67,15 @@ public void init(final ProcessorContext context, final StateStore root) {
private void initInternal(final InternalProcessorContext context) {
this.context = context;

this.serdes = new StateSerdes<>(bytesStore.name(),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
keySchema.init(topic);
serdes = new StateSerdes<>(
topic,
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);


this.cacheName = context.taskId() + "-" + bytesStore.name();
this.cache = this.context.getCache();
cacheName = context.taskId() + "-" + bytesStore.name();
cache = context.getCache();
cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
Expand All @@ -85,7 +90,7 @@ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
validateStoreOpen();
final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key));
final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
keySchema.upperRange(binarySessionId, latestSessionStartTime));
Expand All @@ -106,7 +111,7 @@ public void remove(final Windowed<K> sessionKey) {
@Override
public void put(final Windowed<K> key, AGG value) {
validateStoreOpen();
final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer());
final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic);
final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
key.window().end(), context.partition(), context.topic());
cache.put(cacheName, binaryKey, entry);
Expand All @@ -122,12 +127,12 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
final RecordContext current = context.recordContext();
context.setRecordContext(entry.recordContext());
try {
final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
if (flushListener != null) {
final AGG newValue = serdes.valueFrom(entry.newValue());
final AGG oldValue = fetchPrevious(binaryKey);
if (!(newValue == null && oldValue == null)) {
flushListener.apply(key, newValue == null ? null : newValue, oldValue);
flushListener.apply(key, newValue, oldValue);
}
}
bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -62,17 +63,18 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
/**
 * Initializes the wrapped store, then this caching layer, then the key schema.
 *
 * <p>The key schema is handed the store's changelog topic name, built the same way
 * {@code initInternal} builds the topic for its {@code StateSerdes}, instead of the bare
 * application id. This keeps every serializer reachable from this store bound to one and the
 * same topic name.
 *
 * @param context the processor context supplying the application id
 * @param root    the root state store, forwarded to the wrapped store
 */
public void init(final ProcessorContext context, final StateStore root) {
    underlying.init(context, root);
    initInternal(context);
    // Use the changelog topic (not just the application id) so the key schema agrees with
    // the topic used for the StateSerdes created in initInternal.
    keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()));
}

@SuppressWarnings("unchecked")
private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
this.serdes = new StateSerdes<>(underlying.name(),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.name = context.taskId() + "-" + underlying.name();
this.cache = this.context.getCache();
name = context.taskId() + "-" + underlying.name();
cache = this.context.getCache();

cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
@Override
Expand Down Expand Up @@ -161,7 +163,7 @@ public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFro

return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
underlyingIterator,
new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
}

private V fetchPrevious(final Bytes key, final long timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

Expand All @@ -37,7 +38,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
@Override
public void init(final ProcessorContext context, final StateStore root) {
inner.init(context, root);
this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
this.changeLogger = new StoreChangeLogger<>(
inner.name(),
context,
WindowStoreUtils.getInnerStateSerde(
ProcessorStateManager.storeChangelogTopic(
context.applicationId(),
inner.name())));
}

@Override
Expand Down
Loading

0 comments on commit aea1465

Please sign in to comment.