Skip to content

Commit

Permalink
KAFKA-2894: WorkerSinkTask should rewind offsets on rebalance
Browse files Browse the repository at this point in the history
Author: Konstantine Karantasis <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes apache#1771 from kkonstantine/KAFKA-2894-rewind-offsets-on-rebalance
  • Loading branch information
kkonstantine authored and ewencp committed Aug 23, 2016
1 parent 313aad4 commit cec2769
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,16 @@ private void rewind() {
if (offsets.isEmpty()) {
return;
}
for (TopicPartition tp: offsets.keySet()) {
Long offset = offsets.get(tp);
for (Map.Entry<TopicPartition, Long> entry: offsets.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
if (offset != null) {
log.trace("Rewind {} to offset {}.", tp, offset);
consumer.seek(tp, offset);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
currentOffsets.put(tp, new OffsetAndMetadata(offset));
} else {
log.warn("Cannot rewind {} to null offset.", tp);
}
}
context.clearOffsets();
Expand Down Expand Up @@ -462,6 +465,8 @@ else if (!context.pausedPartitions().isEmpty())
if (rebalanceException == null || rebalanceException instanceof WakeupException) {
try {
openPartitions(partitions);
// Rewind should be applied only if openPartitions succeeds.
rewind();
} catch (RuntimeException e) {
// The consumer swallows exceptions raised in the rebalance listener, so we need to store
// exceptions and rethrow when poll() returns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void testPollsInBackground() throws Exception {
expectPollInitialAssignment();

Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
expectStopTask(10L);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -182,7 +182,7 @@ public void testCommit() throws Exception {
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
expectOffsetFlush(1L, null, null, 0, true);
expectStopTask(2);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -221,7 +221,7 @@ public void testCommitTaskFlushFailure() throws Exception {
PowerMock.expectLastCall();
consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
PowerMock.expectLastCall();
expectStopTask(2);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -259,7 +259,7 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception {
PowerMock.expectLastCall();
consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
PowerMock.expectLastCall();
expectStopTask(2);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -291,7 +291,7 @@ public void testCommitConsumerFailure() throws Exception {
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
expectOffsetFlush(1L, null, new Exception(), 0, true);
expectStopTask(2);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -323,7 +323,7 @@ public void testCommitTimeout() throws Exception {
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
expectStopTask(4);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -402,7 +402,7 @@ public Object answer() throws Throwable {
consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
PowerMock.expectLastCall();

expectStopTask(0);
expectStopTask();

PowerMock.replayAll();

Expand Down Expand Up @@ -446,7 +446,7 @@ public Object answer() throws Throwable {
}
});

expectStopTask(3);
expectStopTask();
PowerMock.replayAll();

workerTask.initialize(TASK_CONFIG);
Expand All @@ -460,6 +460,33 @@ public Object answer() throws Throwable {
PowerMock.verifyAll();
}

@Test
public void testRewindOnRebalanceDuringPoll() throws Exception {
expectInitializeTask();
expectPollInitialAssignment();

expectRebalanceDuringPoll().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
assertEquals(0, offsets.size());
return null;
}
});

expectStopTask();
PowerMock.replayAll();

workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
workerTask.iteration();
workerTask.stop();
workerTask.close();

PowerMock.verifyAll();
}

private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);

Expand Down Expand Up @@ -493,7 +520,7 @@ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
EasyMock.expectLastCall();
}

private void expectStopTask(final long expectedMessages) throws Exception {
private void expectStopTask() throws Exception {
sinkTask.stop();
PowerMock.expectLastCall();

Expand Down Expand Up @@ -561,6 +588,51 @@ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
return EasyMock.expectLastCall();
}

@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectRebalanceDuringPoll() throws Exception {
final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3);

final long startOffset = 40L;
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, startOffset);

EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
// "Sleep" so time will progress
time.sleep(1L);

sinkTaskContext.getValue().offset(offsets);
rebalanceListener.getValue().onPartitionsAssigned(partitions);

ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
}
});

EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);

sinkTask.open(partitions);
EasyMock.expectLastCall();

consumer.seek(TOPIC_PARTITION, startOffset);
EasyMock.expectLastCall();

EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
sinkTask.put(EasyMock.anyObject(Collection.class));
return EasyMock.expectLastCall();
}

private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
final RuntimeException flushError,
final Exception consumerCommitError,
Expand Down

0 comments on commit cec2769

Please sign in to comment.