KAFKA-4060: Remove zk client dependency in kafka streams
This is a new PR for KAFKA-4060.

Author: Hojjat Jafarpour <[email protected]>

Reviewers: Damian Guy, Matthias J. Sax, Ismael Juma, Guozhang Wang

Closes apache#1884 from hjafarpour/KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-new
Hojjat Jafarpour authored and guozhangwang committed Jan 11, 2017
1 parent a95170f commit 4b71c0b
Showing 24 changed files with 672 additions and 411 deletions.
15 changes: 0 additions & 15 deletions build.gradle
@@ -699,21 +699,6 @@ project(':streams') {
     compile libs.slf4jApi
     compile libs.rocksDBJni
 
-    // the following 3 dependencies should be removed after KIP-4 (zkclient, zookeeper, jacksonDatabind)
-    compile (libs.zkclient) {
-      exclude module: 'zookeeper'
-    }
-
-    compile (libs.zookeeper) {
-      exclude module: 'jline'
-      exclude module: 'netty'
-      // users should be able to choose the logging implementation (and slf4j bridge)
-      exclude module: 'slf4j-log4j12'
-      exclude module: 'log4j'
-    }
-
-    compile libs.jacksonDatabind
-
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
1 change: 0 additions & 1 deletion streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -83,7 +83,6 @@ public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
1 change: 0 additions & 1 deletion streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -59,7 +59,6 @@ public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
1 change: 0 additions & 1 deletion streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -49,7 +49,6 @@ public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
1 change: 0 additions & 1 deletion streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -106,7 +106,6 @@ public static void main(String[] args) throws Exception {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
301 changes: 186 additions & 115 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Large diffs are not rendered by default.
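For context, the user-visible effect of the StreamsConfig change is the same one shown in the example diffs above: a Streams application now needs only bootstrap.servers and no ZooKeeper connection. Below is a minimal sketch of a post-change application, assuming the 0.10.x-era KStreamBuilder API; the class name, application id, and topic names are illustrative, not taken from the commit.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class NoZkStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-no-zk-example");  // illustrative id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Note what is absent: no StreamsConfig.ZOOKEEPER_CONNECT_CONFIG entry. After this
        // commit, internal topics are managed through the brokers rather than through a
        // direct ZooKeeper connection.

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");  // illustrative topic names

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

The internal repartition and changelog topics that previously required the ZooKeeper connection are now created through the brokers, which is what the InternalTopicManager rewrite below implements.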

streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,242 +17,116 @@

 package org.apache.kafka.streams.processor.internals;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;

 public class InternalTopicManager {
 
     private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
 
-    // TODO: the following ZK dependency should be removed after KIP-4
-    private static final String ZK_TOPIC_PATH = "/brokers/topics";
-    private static final String ZK_BROKER_PATH = "/brokers/ids";
-    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-    private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics";
     // TODO: the following LogConfig dependency should be removed after KIP-4
     public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
     public static final String RETENTION_MS = "retention.ms";
     public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+    private static final int MAX_TOPIC_READY_TRY = 5;
 
-    final ZkClient zkClient;
-    private final int replicationFactor;
     private final long windowChangeLogAdditionalRetention;

-    private class ZKStringSerializer implements ZkSerializer {
-
-        /**
-         * @throws AssertionError if the byte String encoding type is not supported
-         */
-        @Override
-        public byte[] serialize(Object data) {
-            try {
-                return ((String) data).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-
-        /**
-         * @throws AssertionError if the byte String encoding type is not supported
-         */
-        @Override
-        public Object deserialize(byte[] bytes) {
-            try {
-                if (bytes == null)
-                    return null;
-                else
-                    return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    public InternalTopicManager() {
-        this.zkClient = null;
-        this.replicationFactor = 0;
-        this.windowChangeLogAdditionalRetention = WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
-    }
+    private final int replicationFactor;
+    private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(String zkConnect, final int replicationFactor, long windowChangeLogAdditionalRetention) {
-        this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+        this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
     }

-    public void makeReady(InternalTopicConfig topic, int numPartitions) {
-        boolean topicNotReady = true;
-
-        while (topicNotReady) {
-            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic.name());
-
-            if (topicMetadata == null) {
-                try {
-                    createTopic(topic, numPartitions, replicationFactor);
-                } catch (ZkNodeExistsException e) {
-                    // ignore and continue
-                }
-            } else {
-                if (topicMetadata.size() > numPartitions) {
-                    // else if topic exists with more #.partitions than needed, delete in order to re-create it
-                    try {
-                        deleteTopic(topic.name());
-                    } catch (ZkNodeExistsException e) {
-                        // ignore and continue
-                    }
-                } else if (topicMetadata.size() < numPartitions) {
-                    // else if topic exists with less #.partitions than needed, add partitions
-                    try {
-                        addPartitions(topic.name(), numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
-                    } catch (ZkNoNodeException e) {
-                        // ignore and continue
-                    }
-                } else {
-                    topicNotReady = false;
-                }
-            }
-        }
-    }
+    /**
+     * Prepares the given internal topic. If the topic already exists with the correct number of partitions it is
+     * left as is; if it exists with a different number of partitions it is deleted and re-created; if it does not
+     * exist it is created.
+     *
+     * @param topic         the internal topic to prepare
+     * @param numPartitions the required number of partitions
+     */
+    public void makeReady(final InternalTopicConfig topic, int numPartitions) {
+        Map<InternalTopicConfig, Integer> topics = new HashMap<>();
+        topics.put(topic, numPartitions);
+        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+            try {
+                Collection<MetadataResponse.TopicMetadata> topicMetadatas = streamsKafkaClient.fetchTopicMetadata();
+                Map<InternalTopicConfig, Integer> topicsToBeDeleted = getTopicsToBeDeleted(topics, topicMetadatas);
+                Map<InternalTopicConfig, Integer> topicsToBeCreated = filterExistingTopics(topics, topicMetadatas);
+                topicsToBeCreated.putAll(topicsToBeDeleted);
+                streamsKafkaClient.deleteTopics(topicsToBeDeleted);
+                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention);
+                return;
+            } catch (StreamsException ex) {
+                log.debug("Could not create internal topics: " + ex.getMessage());
+                log.debug("Retry #" + i);
+            }
+        }
+        throw new StreamsException("Could not create internal topics.");
+    }

-    private List<Integer> getBrokers() {
-        List<Integer> brokers = new ArrayList<>();
-        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
-            brokers.add(Integer.parseInt(broker));
-        }
-        Collections.sort(brokers);
-
-        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
-        return brokers;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
-        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
-        if (data == null) return null;
-
+    public void close() {
         try {
-            ObjectMapper mapper = new ObjectMapper();
-
-            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
-            });
-
-            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
-            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
-            return partitions;
+            streamsKafkaClient.close();
         } catch (IOException e) {
-            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
+            log.warn("Could not close StreamsKafkaClient.");
         }
     }

-    private void createTopic(InternalTopicConfig topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic.name(), numPartitions);
-        ObjectMapper mapper = new ObjectMapper();
-        List<Integer> brokers = getBrokers();
-        int numBrokers = brokers.size();
-        if (numBrokers < replicationFactor) {
-            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " + numBrokers);
-            replicationFactor = numBrokers;
-        }
-
-        Map<Integer, List<Integer>> assignment = new HashMap<>();
-
-        for (int i = 0; i < numPartitions; i++) {
-            ArrayList<Integer> brokerList = new ArrayList<>();
-            for (int r = 0; r < replicationFactor; r++) {
-                int shift = r * numBrokers / replicationFactor;
-                brokerList.add(brokers.get((i + shift) % numBrokers));
-            }
-            assignment.put(i, brokerList);
-        }
-        // write out config first just like in AdminUtils.scala createOrUpdateTopicPartitionAssignmentPathInZK()
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("config", topic.toProperties(windowChangeLogAdditionalRetention));
-            String data = mapper.writeValueAsString(dataMap);
-            zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e);
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", assignment);
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
-        }
-    }
+    /**
+     * Returns the topics that do not exist yet.
+     *
+     * @param topicsPartitionsMap required topics mapped to their required number of partitions
+     * @param topicsMetadata      metadata for the currently existing topics
+     * @return the entries of topicsPartitionsMap whose topics do not exist yet
+     */
+    private Map<InternalTopicConfig, Integer> filterExistingTopics(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
+        Map<InternalTopicConfig, Integer> nonExistingTopics = new HashMap<>();
+        // Add the topics that don't exist to nonExistingTopics.
+        for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
+            if (existingTopicNamesPartitions.get(topic.name()) == null) {
+                nonExistingTopics.put(topic, topicsPartitionsMap.get(topic));
+            }
+        }
+        return nonExistingTopics;
+    }

-    private void deleteTopic(String topic) throws ZkNodeExistsException {
-        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
-        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
-    }
-
-    private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
-        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
-        List<Integer> brokers = getBrokers();
-        int numBrokers = brokers.size();
-        if (numBrokers < replicationFactor) {
-            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " + numBrokers);
-            replicationFactor = numBrokers;
-        }
-
-        int startIndex = existingAssignment.size();
-
-        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
-        for (int i = 0; i < numPartitions; i++) {
-            ArrayList<Integer> brokerList = new ArrayList<>();
-            for (int r = 0; r < replicationFactor; r++) {
-                int shift = r * numBrokers / replicationFactor;
-                brokerList.add(brokers.get((i + shift) % numBrokers));
-            }
-            newAssignment.put(i + startIndex, brokerList);
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", newAssignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
-        }
-    }
+    /**
+     * Returns the topics that exist but with a different number of partitions, and therefore need to be deleted
+     * and re-created.
+     *
+     * @param topicsPartitionsMap required topics mapped to their required number of partitions
+     * @param topicsMetadata      metadata for the currently existing topics
+     * @return the entries of topicsPartitionsMap whose topics exist with a mismatched number of partitions
+     */
+    private Map<InternalTopicConfig, Integer> getTopicsToBeDeleted(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
+        Map<InternalTopicConfig, Integer> deleteTopics = new HashMap<>();
+        // Add the topics that exist with the wrong number of partitions to deleteTopics.
+        for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
+            if (existingTopicNamesPartitions.get(topic.name()) != null) {
+                if (!existingTopicNamesPartitions.get(topic.name()).equals(topicsPartitionsMap.get(topic))) {
+                    deleteTopics.put(topic, topicsPartitionsMap.get(topic));
+                }
+            }
+        }
+        return deleteTopics;
+    }
+
+    private Map<String, Integer> getExistingTopicNamesPartitions(Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        // Map the names of the existing topics to their partition counts.
+        Map<String, Integer> existingTopicNamesPartitions = new HashMap<>();
+        for (MetadataResponse.TopicMetadata topicMetadata: topicsMetadata) {
+            existingTopicNamesPartitions.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
+        }
+        return existingTopicNamesPartitions;
+    }
+
+ }
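For reference, here is a sketch of how the reworked InternalTopicManager might be driven. This is not code from the commit: in the real code path the partition assignor constructs and calls the manager during a rebalance, and the StreamsKafkaClient constructor and InternalTopicConfig shape used here are assumptions based on this era of the codebase.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;

public class InternalTopicManagerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topic-manager-sketch");  // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Assumption: StreamsKafkaClient is constructed from a StreamsConfig.
        StreamsKafkaClient client = new StreamsKafkaClient(new StreamsConfig(props));
        InternalTopicManager manager = new InternalTopicManager(
                client,
                1,  // replication factor for internal topics
                InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);

        // Assumption: InternalTopicConfig takes a name, cleanup policies, and log config.
        InternalTopicConfig changelog = new InternalTopicConfig(
                "topic-manager-sketch-store-changelog",  // illustrative changelog topic name
                Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
                Collections.<String, String>emptyMap());

        // makeReady leaves a topic that already has the requested partition count alone,
        // deletes and re-creates a mismatched one, and creates a missing one.
        manager.makeReady(changelog, 4);
        manager.close();
    }
}

Note the behavioral change from the ZooKeeper version: instead of looping until the topic converges, makeReady now makes a bounded number of attempts (MAX_TOPIC_READY_TRY) and throws a StreamsException if the brokers never accept the request.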