[FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts

This commit refactors the local partition assignment logic into a strict
contract-defining method, making the expected partition-to-subtask
assignment explicit instead of relying solely on the hashCode of Kafka
partitions.
tzulitai committed Jul 28, 2017
1 parent b1f37ef commit 888fabe
Showing 4 changed files with 98 additions and 21 deletions.
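In essence, call sites that previously used the hashCode-based AbstractPartitionDiscoverer.shouldAssignToThisSubtask check now compare the assigner's result against the current subtask index, as the diff below shows. A minimal sketch of the new pattern (the helper name isAssignedToThisSubtask and the class AssignmentCheckSketch are illustrative, not from this commit):

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;

final class AssignmentCheckSketch {

    // Illustrative helper mirroring the call pattern introduced by this commit:
    // a partition belongs to this subtask iff the assigner maps it to this subtask's index.
    static boolean isAssignedToThisSubtask(
            KafkaTopicPartition partition, int indexOfThisSubtask, int numParallelSubtasks) {
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)
                == indexOfThisSubtask;
    }
}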
@@ -43,6 +43,7 @@
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -407,11 +408,9 @@ public void open(Configuration configuration) throws Exception {
if (!restoredFromOldState) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
if (AbstractPartitionDiscoverer.shouldAssignToThisSubtask(
restoredStateEntry.getKey(),
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks())) {

if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()) {
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
} else {
@@ -198,7 +198,7 @@ public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());

return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}

return false;
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka.internals;

/**
* Utility for assigning Kafka partitions to consumer subtasks.
*/
public class KafkaTopicPartitionAssigner {

/**
* Returns the index of the target subtask that a specific Kafka partition should be
* assigned to.
*
* <p>The resulting distribution of partitions of a single topic has the following contract:
* <ul>
* <li>1. Uniformly distributed across subtasks</li>
* <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
* subtask indices) by using the partition id as the offset from a starting index
* (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
* determined using the topic name).</li>
* </ul>
*
* <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
* contract to locally filter out partitions that they should not subscribe to, guaranteeing
* that all partitions of a single topic will always be assigned to some subtask in a
* uniformly distributed manner.
*
* @param partition the Kafka partition
* @param numParallelSubtasks total number of parallel subtasks
*
* @return index of the target subtask that the Kafka partition should be assigned to.
*/
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

// here, the assumption is that the ids of Kafka partitions always ascend
// starting from 0, and can therefore be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

}
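For reference, a minimal usage sketch of the new assigner (not part of this commit; the class name AssignmentSketch, the topic name, and the parallelism are made up for illustration), showing the strictly round-robin distribution from the topic-dependent start index:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;

public class AssignmentSketch {
    public static void main(String[] args) {
        int numParallelSubtasks = 2;
        for (int partitionId = 0; partitionId < 4; partitionId++) {
            KafkaTopicPartition partition = new KafkaTopicPartition("example-topic", partitionId);
            // partition ids 0,1,2,3 map to subtasks startIndex, startIndex+1, startIndex, startIndex+1 (mod 2),
            // i.e. strictly round-robin starting from the index derived from the topic name's hash
            System.out.println("partition " + partitionId + " -> subtask "
                    + KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks));
        }
    }
}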
@@ -72,6 +72,11 @@ public void testPartitionsEqualConsumersFixedPartitions() throws Exception {
new KafkaTopicPartition(TEST_TOPIC, 2),
new KafkaTopicPartition(TEST_TOPIC, 3));

int numSubtasks = mockGetAllPartitionsForTopicsReturn.size();

// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numSubtasks);

for (int subtaskIndex = 0; subtaskIndex < mockGetAllPartitionsForTopicsReturn.size(); subtaskIndex++) {
TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
topicsDescriptor,
@@ -85,7 +90,7 @@ public void testPartitionsEqualConsumersFixedPartitions() throws Exception {
assertEquals(1, initialDiscovery.size());
assertTrue(contains(mockGetAllPartitionsForTopicsReturn, initialDiscovery.get(0).getPartition()));
assertEquals(
getExpectedSubtaskIndex(initialDiscovery.get(0), mockGetAllPartitionsForTopicsReturn.size()),
getExpectedSubtaskIndex(initialDiscovery.get(0), startIndex, numSubtasks),
subtaskIndex);

// subsequent discoveries should not find anything
@@ -114,6 +119,9 @@ public void testMultiplePartitionsPerConsumersFixedPartitions() {
final int minPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers;
final int maxPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers + 1;

// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);

for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
topicsDescriptor,
@@ -130,7 +138,7 @@ public void testMultiplePartitionsPerConsumersFixedPartitions() {
for (KafkaTopicPartition p : initialDiscovery) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
}

// subsequent discoveries should not find anything
@@ -163,6 +171,9 @@ public void testPartitionsFewerThanConsumersFixedPartitions() {

final int numConsumers = 2 * mockGetAllPartitionsForTopicsReturn.size() + 3;

// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);

for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
topicsDescriptor,
@@ -178,7 +189,7 @@ public void testPartitionsFewerThanConsumersFixedPartitions() {
for (KafkaTopicPartition p : initialDiscovery) {
// check that the element was actually contained
assertTrue(allPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
}

// subsequent discoveries should not find anything
@@ -222,6 +233,9 @@ public void testGrowingPartitions() {
final int minNewPartitionsPerConsumer = allPartitions.size() / numConsumers;
final int maxNewPartitionsPerConsumer = allPartitions.size() / numConsumers + 1;

// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
int startIndex = KafkaTopicPartitionAssigner.assign(allPartitions.get(0), numConsumers);

TestPartitionDiscoverer partitionDiscovererSubtask0 = new TestPartitionDiscoverer(
topicsDescriptor,
0,
@@ -260,19 +274,19 @@ public void testGrowingPartitions() {
for (KafkaTopicPartition p : initialDiscoverySubtask0) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
}

for (KafkaTopicPartition p : initialDiscoverySubtask1) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
}

for (KafkaTopicPartition p : initialDiscoverySubtask2) {
// check that the element was actually contained
assertTrue(allInitialPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
}

// all partitions must have been assigned
@@ -299,32 +313,32 @@ public void testGrowingPartitions() {

for (KafkaTopicPartition p : initialDiscoverySubtask0) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
}

for (KafkaTopicPartition p : initialDiscoverySubtask1) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
}

for (KafkaTopicPartition p : initialDiscoverySubtask2) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
}

for (KafkaTopicPartition p : secondDiscoverySubtask0) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
}

for (KafkaTopicPartition p : secondDiscoverySubtask1) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
}

for (KafkaTopicPartition p : secondDiscoverySubtask2) {
assertTrue(allNewPartitions.remove(p));
assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
}

// all partitions must have been assigned
@@ -370,7 +384,7 @@ public void testDeterministicAssignmentWithDifferentFetchedPartitionOrdering() t
subtaskIndex,
numSubtasks,
createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
partitionDiscovererOutOfOrder.open();

List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
@@ -487,7 +501,15 @@ private List<List<KafkaTopicPartition>> deepClone(List<List<KafkaTopicPartition>
return clone;
}

private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int numTasks) {
return Math.abs(partition.hashCode() % numTasks);
/**
* Utility method that determines the expected subtask index a partition should be assigned to,
* depending on the start index and using the partition id as the offset from that start index
* in clockwise direction.
*
* <p>The expectation is based on the distribution contract of
* {@link KafkaTopicPartitionAssigner#assign(KafkaTopicPartition, int)}.
*/
private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int startIndex, int numSubtasks) {
return (startIndex + partition.getPartition()) % numSubtasks;
}
}
