Skip to content

Commit

Permalink
MINOR: Add unit tests for StreamsRebalanceListener (apache#9258)
Browse files Browse the repository at this point in the history
Reviewers: Walker Carlson <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
cadonna authored Sep 15, 2020
1 parent 634c917 commit f28713f
Showing 1 changed file with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -47,6 +50,13 @@ public class StreamsRebalanceListenerTest {
assignmentErrorCode
);

@Before
public void before() {
expect(streamThread.state()).andStubReturn(null);
expect(taskManager.activeTaskIds()).andStubReturn(null);
expect(taskManager.standbyTaskIds()).andStubReturn(null);
}

@Test
public void shouldThrowMissingSourceTopicException() {
replay(taskManager, streamThread);
Expand All @@ -61,14 +71,56 @@ public void shouldThrowMissingSourceTopicException() {
}

@Test
public void shouldHandleOnPartitionAssigned() {
public void shouldHandleAssignedPartitions() {
taskManager.handleRebalanceComplete();
expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(null);
expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);
replay(taskManager, streamThread);
assignmentErrorCode.set(AssignorError.NONE.code());

streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());

verify(taskManager, streamThread);
}

@Test
public void shouldHandleRevokedPartitions() {
final Collection<TopicPartition> partitions = Collections.singletonList(new TopicPartition("topic", 0));
expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
taskManager.handleRevocation(partitions);
replay(streamThread, taskManager);

streamsRebalanceListener.onPartitionsRevoked(partitions);

verify(taskManager, streamThread);
}

@Test
public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(null);
replay(streamThread, taskManager);

streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition("topic", 0)));

verify(taskManager, streamThread);
}

@Test
public void shouldNotHandleEmptySetOfRevokedPartitions() {
expect(streamThread.setState(State.PARTITIONS_REVOKED)).andReturn(State.RUNNING);
replay(streamThread, taskManager);

streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());

verify(taskManager, streamThread);
}

@Test
public void shouldHandleLostPartitions() {
taskManager.handleLostAll();
replay(streamThread, taskManager);

streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition("topic", 0)));

verify(taskManager, streamThread);
}
}

0 comments on commit f28713f

Please sign in to comment.