KAFKA-14936: Add restore logic (3/N) (apache#14027)
Added restore logic for the buffer in grace period joins.

Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>
wcarlson5 authored Aug 19, 2023
1 parent 82ae77f commit d0b7677
Showing 6 changed files with 459 additions and 5 deletions.
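
For context, the buffer whose restore path this commit adds backs the grace-period option on stream-table joins (KIP-923). The sketch below shows the user-facing side of that feature; it is not part of this commit, the topic names and serdes are illustrative only, and it assumes the Joined#withGracePeriod API available in Kafka Streams 3.6+.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class GracePeriodJoinExample {
    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical input topics: a stream of orders joined against a customers table.
        final KStream<String, String> orders = builder.stream("orders");
        final KTable<String, String> customers = builder.table("customers");

        // With a grace period configured, stream-side records are held in a time-ordered
        // buffer (the RocksDBTimeOrderedKeyValueBuffer touched by this commit) until the
        // grace period has elapsed, and only then joined against the table.
        final KStream<String, String> enriched = orders.join(
            customers,
            (order, customer) -> order + "/" + customer,
            Joined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
                .withGracePeriod(Duration.ofMinutes(5)));

        enriched.to("enriched-orders");
        return builder.build();
    }
}

Without a grace period, the join remains a straight table lookup and no buffer (and hence no changelog or restore path) is involved.
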
@@ -1271,7 +1271,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
final String bufferStoreName = name + "-Buffer";
final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();

buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name));
buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name, true));
}

final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
@@ -16,14 +16,19 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -36,7 +41,8 @@
import static java.util.Objects.requireNonNull;

public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<RocksDBTimeOrderedKeyValueBytesStore, Object, Object> implements TimeOrderedKeyValueBuffer<K, V, V> {

private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
private final long gracePeriod;
private long bufferSize;
private long minTimestamp;
@@ -45,17 +51,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends WrappedStateStore<Ro
private Serde<V> valueSerde;
private final String topic;
private int seqnum;
private final boolean loggingEnabled;
private int partition;
private String changelogTopic;
private InternalProcessorContext context;

public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
final Duration gracePeriod,
final String topic) {
final String topic,
final boolean loggingEnabled) {
super(store);
this.gracePeriod = gracePeriod.toMillis();
minTimestamp = Long.MAX_VALUE;
numRecords = 0;
bufferSize = 0;
seqnum = 0;
this.topic = topic;
this.loggingEnabled = loggingEnabled;
}

@SuppressWarnings("unchecked")
@@ -74,6 +86,11 @@ public void init(final ProcessorContext context, final StateStore root) {
@Override
public void init(final StateStoreContext context, final StateStore root) {
wrapped().init(context, wrapped());
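// Keep a handle on the processor context, the task's partition and, when logging is
// enabled, the changelog topic, so put() and evictWhile() can write changelog records.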
this.context = ProcessorContextUtils.asInternalProcessorContext(context);
partition = context.taskId().partition();
if (loggingEnabled) {
changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE);
}
}

@Override
@@ -103,6 +120,11 @@ public void evictWhile(final Supplier<Boolean> predicate, final Consumer<Evictio
callback.accept(new Eviction<>(key, value, bufferValue.context()));

wrapped().remove(keyValue.key);

if (loggingEnabled) {
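// Write a tombstone for the evicted key so it is not re-buffered when the changelog is restored.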
logTombstone(keyValue.key);
}

numRecords--;
bufferSize = bufferSize - computeRecordSize(keyValue.key, bufferValue);
}
@@ -137,6 +159,12 @@ public boolean put(final long time, final Record<K, V> record, final ProcessorRe
final byte[] valueBytes = valueSerde.serializer().serialize(topic, record.value());
final BufferValue buffered = new BufferValue(null, null, valueBytes, recordContext);
wrapped().put(serializedKey, buffered.serialize(0).array());

if (loggingEnabled) {
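// Mirror the newly buffered record into the changelog so the buffer can be rebuilt on restore.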
final BufferKey key = new BufferKey(0L, serializedKey);
logValue(serializedKey, key, buffered);
}

bufferSize += computeRecordSize(serializedKey, buffered);
numRecords++;
if (minTimestamp() > record.timestamp()) {
@@ -172,4 +200,37 @@ private static long computeRecordSize(final Bytes key, final BufferValue value)
private void maybeUpdateSeqnumForDups() {
seqnum = (seqnum + 1) & 0x7FFFFFFF;
}

private void logValue(final Bytes key, final BufferKey bufferKey, final BufferValue value) {
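// Changelog value layout: the serialized BufferValue followed by the 8-byte buffer time,
// sent to this task's partition of the buffer's changelog topic.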
final int sizeOfBufferTime = Long.BYTES;
final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
buffer.putLong(bufferKey.time());
final byte[] array = buffer.array();
((RecordCollector.Supplier) context).recordCollector().send(
changelogTopic,
key,
array,
null,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER,
null,
null);
}

private void logTombstone(final Bytes key) {
((RecordCollector.Supplier) context).recordCollector().send(
changelogTopic,
key,
null,
null,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER,
null,
null);
}

}
@@ -17,12 +17,16 @@
package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

/**
@@ -42,7 +46,30 @@ protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final by

@Override
Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<ConsumerRecord<byte[], byte[]>> records) {
throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore");
final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
for (final ConsumerRecord<byte[], byte[]> record : records) {
final long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long segmentId = segments.segmentId(timestamp);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment != null) {
// a null segment means it has already expired, so we skip those records
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
record,
consistencyEnabled,
position
);
try {
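// Accumulate one WriteBatch per live segment; changelog keys arrive in the non-prefixed
// window-key format and are converted to the store's time-first layout before batching.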
final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());

final byte[] baseKey = TimeFirstWindowKeySchema.fromNonPrefixWindowKey(record.key());
segment.addToBatch(new KeyValue<>(baseKey, record.value()), batch);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error restoring batch to store " + name(), e);
}
}
}
return writeBatchMap;
}

@Override
(Diffs for the remaining changed files are not shown.)
