KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix (apache#10917)

The fix to avoid spurious left/outer stream-stream join results showed
very low throughput with RocksDB, due to excessive creation of iterators.
Instead of trying to emit a left/outer stream-stream join result for every
input record, this PR adds tracking of the lower timestamp bound of
left/outer join candidates, and only tries to emit them (and create an
iterator) if they are potentially old enough.
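For illustration, here is a minimal sketch of the tracking idea described above. The class and method names below are simplified stand-ins, not the actual Kafka Streams implementation: the processor records the smallest timestamp among buffered non-joined candidates and only scans the outer-join store when that oldest candidate could already have fallen out of its join window.

// Illustrative sketch only; simplified stand-in for the TimeTracker/emit logic in this PR.
final class OuterJoinEmitTracker {
    long streamTime = Long.MIN_VALUE;       // highest record timestamp seen so far
    long minCandidateTime = Long.MAX_VALUE; // oldest buffered non-joined candidate

    void onRecord(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    void onBufferedCandidate(final long recordTimestamp) {
        minCandidateTime = Math.min(minCandidateTime, recordTimestamp);
    }

    // Creating a RocksDB iterator is only worthwhile if the oldest candidate's
    // join window (plus grace period) has already closed relative to stream time.
    boolean mayHaveExpiredCandidates(final long joinAfterMs, final long graceMs) {
        return minCandidateTime < streamTime - joinAfterMs - graceMs;
    }
}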

Reviewers: Luke Chen <[email protected]>, Guozhang Wang <[email protected]>, Sergio Peña <[email protected]>
mjsax authored Jul 1, 2021
1 parent 4fd71a7 commit a095e1f
Showing 5 changed files with 94 additions and 32 deletions.
15 changes: 15 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -937,6 +937,9 @@ public static class InternalConfig {
// Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";

+ // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
+ public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
@@ -948,6 +951,18 @@ public static boolean getBoolean(final Map<String, Object> configs, final String
return defaultValue;
}
}

+ public static long getLong(final Map<String, Object> configs, final String key, final long defaultValue) {
+ final Object value = configs.getOrDefault(key, defaultValue);
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else if (value instanceof String) {
+ return Long.parseLong((String) value);
+ } else {
+ log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a numeric value.");
+ return defaultValue;
+ }
+ }
}

/**
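For context, these internal flags are ordinary entries in the application's config map. Below is a hedged example of how the new emit-interval setting would be read via the getLong helper added above; the constant and getLong come from this diff, while the surrounding class, map contents, and value "100" are made up for illustration.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

public class EmitIntervalConfigExample {
    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        // Internal/private API: subject to change without notice.
        configs.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, "100");

        // Falls back to 1000 ms when the key is absent or not numeric.
        final long emitIntervalMs = StreamsConfig.InternalConfig.getLong(
            configs,
            StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
            1000L);

        System.out.println("emit interval = " + emitIntervalMs + " ms"); // prints 100
    }
}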
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -57,15 +57,26 @@ class KStreamImplJoin {
private final boolean leftOuter;
private final boolean rightOuter;

- static class MaxObservedStreamTime {
- private long maxObservedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ static class TimeTracker {
+ private long emitIntervalMs = 50L;
+ long streamTime = ConsumerRecord.NO_TIMESTAMP;
+ long minTime = Long.MAX_VALUE;
+ long nextTimeToEmit;

+ public void setEmitInterval(final long emitIntervalMs) {
+ this.emitIntervalMs = emitIntervalMs;
+ }

+ public void advanceStreamTime(final long recordTimestamp) {
+ streamTime = Math.max(recordTimestamp, streamTime);
+ }

- public void advance(final long streamTime) {
- maxObservedStreamTime = Math.max(streamTime, maxObservedStreamTime);
+ public void updatedMinTime(final long recordTimestamp) {
+ minTime = Math.min(recordTimestamp, minTime);
}

- public long get() {
- return maxObservedStreamTime;
+ public void advanceNextTimeToEmit() {
+ nextTimeToEmit += emitIntervalMs;
}
}

@@ -148,7 +159,7 @@ public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
}

// Time shared between joins to keep track of the maximum stream time
- final MaxObservedStreamTime maxObservedStreamTime = new MaxObservedStreamTime();
+ final TimeTracker sharedTimeTracker = new TimeTracker();

final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
@@ -158,7 +169,7 @@ public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
joiner,
leftOuter,
outerJoinWindowStore.map(StoreBuilder::name),
- maxObservedStreamTime
+ sharedTimeTracker
);

final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(
@@ -168,7 +179,7 @@ public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
outerJoinWindowStore.map(StoreBuilder::name),
- maxObservedStreamTime
+ sharedTimeTracker
);

final PassThrough<K1, R> joinMerge = new PassThrough<>();
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -21,6 +21,7 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
+ import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -33,6 +34,7 @@

import java.util.Optional;

+ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

@@ -51,15 +53,15 @@ class KStreamKStreamJoin<K, R, V1, V2> implements org.apache.kafka.streams.proce
private final Optional<String> outerJoinWindowName;
private final boolean isLeftSide;

- private final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime;
+ private final TimeTracker sharedTimeTracker;

KStreamKStreamJoin(final boolean isLeftSide,
final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
- final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime) {
+ final TimeTracker sharedTimeTracker) {
this.isLeftSide = isLeftSide;
this.otherWindowName = otherWindowName;
if (isLeftSide) {
@@ -74,7 +76,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements org.apache.kafka.streams.proce
this.joiner = joiner;
this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
- this.maxObservedStreamTime = maxObservedStreamTime;
+ this.sharedTimeTracker = sharedTimeTracker;
}

@Override
@@ -96,11 +98,20 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte

if (enableSpuriousResultFix
&& StreamsConfig.InternalConfig.getBoolean(
- context().appConfigs(),
+ context.appConfigs(),
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
true
)) {
outerJoinWindowStore = outerJoinWindowName.map(context::getStateStore);
+ sharedTimeTracker.nextTimeToEmit = context.currentSystemTimeMs();

+ sharedTimeTracker.setEmitInterval(
+ StreamsConfig.InternalConfig.getLong(
+ context.appConfigs(),
+ EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
+ 1000L
+ )
+ );
}
}

@@ -127,10 +138,10 @@ key, value, context().topic(), context().partition(), context().offset()
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

- maxObservedStreamTime.advance(inputRecordTimestamp);
+ sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

// Emit all non-joined records whose window has closed
- if (inputRecordTimestamp == maxObservedStreamTime.get()) {
+ if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinWindowStore.ifPresent(this::emitNonJoinedOuterRecords);
}

@@ -169,9 +180,10 @@ key, value, context().topic(), context().partition(), context().offset()
//
// This condition below allows us to process the out-of-order records without the need
// to hold it in the temporary outer store
- if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+ if (!outerJoinWindowStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
context().forward(key, joiner.apply(key, value, null));
} else {
+ sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
outerJoinWindowStore.ifPresent(store -> store.put(
KeyAndJoinSide.make(isLeftSide, key),
LeftOrRightValue.make(isLeftSide, value),
@@ -183,15 +195,34 @@ key, value, context().topic(), context().partition(), context().offset()

@SuppressWarnings("unchecked")
private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+ // calling `store.all()` creates an iterator, which is an expensive operation on RocksDB;
+ // to reduce runtime cost, we try to avoid paying that cost

+ // only try to emit left/outer join results if there _might_ be any result records
+ if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+ return;
+ }
+ // throttle the emit frequency to a (configurable) interval;
+ // we use processing time to decouple from data properties,
+ // as throttling is a non-functional performance optimization
+ if (context.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
+ return;
+ }
+ sharedTimeTracker.advanceNextTimeToEmit();

+ // reset to MAX_VALUE in case the store is empty
+ sharedTimeTracker.minTime = Long.MAX_VALUE;

try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
while (it.hasNext()) {
final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();

final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
final LeftOrRightValue value = record.value;
+ sharedTimeTracker.minTime = windowedKey.window().start();

// Skip next records if window has not closed
- if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= maxObservedStreamTime.get()) {
+ if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
break;
}

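To summarize the change to emitNonJoinedOuterRecords above: the expensive store scan is now guarded by two checks, one data-driven and one based on processing time. A condensed, self-contained sketch of that decision follows; the class and method names are hypothetical, while the parameter names mirror the fields used in the diff.

public final class EmitGateSketch {
    // Returns true only if scanning the outer-join store could emit something
    // (gate 1) and the configured emit interval has elapsed (gate 2).
    public static boolean shouldScanStore(final long minTime,
                                          final long streamTime,
                                          final long joinAfterMs,
                                          final long joinGraceMs,
                                          final long systemTimeMs,
                                          final long nextTimeToEmit) {
        // Gate 1: data-driven -- the oldest buffered candidate must be past its window plus grace.
        if (minTime >= streamTime - joinAfterMs - joinGraceMs) {
            return false;
        }
        // Gate 2: processing-time throttle -- scan at most once per emit interval.
        return systemTimeMs >= nextTimeToEmit;
    }
}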
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -257,20 +257,22 @@ public void testLeftJoinDuplicates() {
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(0, "A0-0", 0L);
inputTopic2.pipeInput(1, "a0", 111L);
+ // bump stream-time to trigger left-join results
+ inputTopic2.pipeInput(2, "dummy", 500L);

processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 0L),
new KeyValueTimestamp<>(0, "A0-0+null", 0L)
);

// verifies joined duplicates are emitted
inputTopic1.pipeInput(2, "A2", 200L);
inputTopic1.pipeInput(2, "A2-0", 200L);
inputTopic2.pipeInput(2, "a2", 201L);
inputTopic1.pipeInput(2, "A2", 1000L);
inputTopic1.pipeInput(2, "A2-0", 1000L);
inputTopic2.pipeInput(2, "a2", 1001L);

processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(2, "A2+a2", 201L),
new KeyValueTimestamp<>(2, "A2-0+a2", 201L)
new KeyValueTimestamp<>(2, "A2+a2", 1001L),
new KeyValueTimestamp<>(2, "A2-0+a2", 1001L)
);

// this record should expire non-joined records, but because A2 and A2-0 are joined and
@@ -662,7 +664,7 @@ public void testOrdering() {
// push two items to the primary stream; the other window is empty; this should not produce any item yet
// w1 = {}
// w2 = {}
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// --> w2 = {}
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(1, "A1", 100L);
@@ -671,10 +673,10 @@
// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
// the joined records
// by the time they were produced before
- // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // --> w2 = { 0:a0 (ts: 100) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
+ // --> w2 = { 1:a1 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 0L),
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -182,6 +182,8 @@ public void testOuterJoinDuplicates() {
inputTopic2.pipeInput(1, "a1", 0L);
inputTopic2.pipeInput(1, "a1-0", 0L);
inputTopic2.pipeInput(1, "a0", 111L);
+ // bump stream-time to trigger outer-join results
+ inputTopic2.pipeInput(3, "dummy", 211);

processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "null+a1", 0L),
@@ -205,10 +207,11 @@

// this record should expire non-joined records; only null+a0 will be emitted because
// it did not have a join
inputTopic2.pipeInput(3, "a3", 315L);
inputTopic2.pipeInput(3, "dummy", 1500L);

processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "null+a0", 111L)
new KeyValueTimestamp<>(1, "null+a0", 111L),
new KeyValueTimestamp<>(3, "null+dummy", 211)
);
}
}
@@ -462,7 +465,7 @@ public void testOrdering() {
// push two items to the primary stream; the other window is empty; this should not produce any item yet
// w1 = {}
// w2 = {}
- // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+ // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// --> w2 = {}
inputTopic1.pipeInput(0, "A0", 0L);
inputTopic1.pipeInput(1, "A1", 100L);
@@ -471,10 +474,10 @@
// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
// the joined records
// by the time they were produced before
- // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+ // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
// w2 = { }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
- // --> w2 = { 0:a0 (ts: 100) }
+ // --> w2 = { 0:a0 (ts: 110) }
inputTopic2.pipeInput(1, "a1", 110L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 0L),
@@ -541,7 +544,7 @@ public void testGracePeriod() {
// w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
// --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
inputTopic2.pipeInput(0, "dummy", 112L);
inputTopic2.pipeInput(0, "dummy", 211);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "null+a1", 0L),
new KeyValueTimestamp<>(0, "A0+null", 0L)
