Skip to content

Commit

Permalink
KAFKA-9274: Remove retries from InternalTopicManager (apache#9060)
Browse files Browse the repository at this point in the history
 - part of KIP-572
 - replace `retries` in InternalTopicManager with infinite retires plus a new timeout, based on consumer config MAX_POLL_INTERVAL_MS

Reviewers: David Jacot <[email protected]>, Boyang Chen <[email protected]>
  • Loading branch information
mjsax authored Aug 6, 2020
1 parent a5aadeb commit 9903013
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
Expand Down Expand Up @@ -1249,11 +1248,7 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {

log.debug("Current changelog positions: {}", allChangelogPositions);
final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets;
try {
allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
} catch (final TimeoutException e) {
throw new StreamsException("Timed out obtaining end offsets from kafka", e);
}
allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
log.debug("Current end offsets :{}", allEndOffsets);

for (final Map.Entry<TopicPartition, ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1193,11 +1193,6 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

// add admin retries configs for creating topics
final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG));

// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -46,24 +45,18 @@
public class ClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);

public static class QuietStreamsConfig extends StreamsConfig {
public static final class QuietStreamsConfig extends StreamsConfig {
public QuietStreamsConfig(final Map<?, ?> props) {
super(props, false);
}
}

public static class QuietConsumerConfig extends ConsumerConfig {
public static final class QuietConsumerConfig extends ConsumerConfig {
public QuietConsumerConfig(final Map<String, Object> props) {
super(props, false);
}
}

public static final class QuietAdminClientConfig extends AdminClientConfig {
QuietAdminClientConfig(final StreamsConfig streamsConfig) {
// If you just want to look up admin configs, you don't care about the clientId
super(streamsConfig.getAdminConfigs("dummy"), false);
}
}

// currently admin client is shared among all threads
public static String getSharedAdminClientId(final String clientId) {
Expand Down Expand Up @@ -141,8 +134,8 @@ public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPar
public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
final Admin adminClient) {
return adminClient.listOffsets(
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
.all();
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))
).all();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,23 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils.QuietConsumerConfig;
import org.slf4j.Logger;

import java.util.HashMap;
Expand All @@ -45,33 +49,38 @@ public class InternalTopicManager {
"Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";

private final Logger log;
private final long windowChangeLogAdditionalRetention;
private final Map<String, String> defaultTopicConfigs = new HashMap<>();

private final short replicationFactor;
private final Time time;
private final Admin adminClient;

private final int retries;
private final short replicationFactor;
private final long windowChangeLogAdditionalRetention;
private final long retryBackOffMs;
private final long retryTimeoutMs;

private final Map<String, String> defaultTopicConfigs = new HashMap<>();

@SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed
public InternalTopicManager(final Admin adminClient, final StreamsConfig streamsConfig) {
public InternalTopicManager(final Time time,
final Admin adminClient,
final StreamsConfig streamsConfig) {
this.time = time;
this.adminClient = adminClient;

final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
log = logContext.logger(getClass());

replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
final AdminClientConfig adminConfigs = new ClientUtils.QuietAdminClientConfig(streamsConfig);
retries = adminConfigs.getInt(AdminClientConfig.RETRIES_CONFIG);
retryBackOffMs = adminConfigs.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
retryBackOffMs = streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
final Map<String, Object> consumerConfig = streamsConfig.getMainConsumerConfigs("dummy", "dummy", -1);
// need to add mandatory configs; otherwise `QuietConsumerConfig` throws
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L;

log.debug("Configs:" + Utils.NL +
"\t{} = {}" + Utils.NL +
"\t{} = {}" + Utils.NL +
"\t{} = {}",
AdminClientConfig.RETRIES_CONFIG, retries,
StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor,
StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, windowChangeLogAdditionalRetention);

Expand All @@ -95,13 +104,15 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition assignor.", topics);

int remainingRetries = retries;
long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();

while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
while (!topicsNotReady.isEmpty()) {
final Set<String> tempUnknownTopics = new HashSet<>();
topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics, remainingRetries);
topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics);
newlyCreatedTopics.addAll(topicsNotReady);

if (!topicsNotReady.isEmpty()) {
Expand Down Expand Up @@ -153,27 +164,32 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
"Error message was: {}", topicName, cause.toString());
throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
}
} catch (final TimeoutException retryableException) {
log.error("Creating topic {} timed out.\n" +
"Error message was: {}", topicName, retryableException.toString());
}
}
}


if (!topicsNotReady.isEmpty()) {
log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);
currentWallClockMs = time.milliseconds();

if (currentWallClockMs >= deadlineMs) {
final String timeoutError = String.format("Could not create topics within %d milliseconds. " +
"This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
log.error(timeoutError);
throw new TimeoutException(timeoutError);
}
log.info(
"Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
topicsNotReady,
retryBackOffMs,
deadlineMs - currentWallClockMs
);
Utils.sleep(retryBackOffMs);

remainingRetries--;
}
}

if (!topicsNotReady.isEmpty()) {
final String timeoutAndRetryError = String.format("Could not create topics after %d retries. " +
"This can happen if the Kafka cluster is temporary not available. " +
"You can increase admin client config `retries` to be resilient against this error.", retries);
log.error(timeoutAndRetryError);
throw new StreamsException(timeoutAndRetryError);
}
log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

return newlyCreatedTopics;
Expand All @@ -186,8 +202,7 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
*/
// visible for testing
protected Map<String, Integer> getNumPartitions(final Set<String> topics,
final Set<String> tempUnknownTopics,
final boolean hasRemainingRetries) {
final Set<String> tempUnknownTopics) {
log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);

final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
Expand All @@ -212,18 +227,17 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
"Error message was: {}", topicName, cause.toString());
} else if (cause instanceof LeaderNotAvailableException) {
tempUnknownTopics.add(topicName);
if (!hasRemainingRetries) {
// run out of retries, throw exception directly
throw new StreamsException(
String.format("The leader of the Topic %s is not available.", topicName), cause);
}
log.info("The leader of the Topic {} is not available.\n" +
log.debug("The leader of topic {} is not available.\n" +
"Error message was: {}", topicName, cause.toString());
} else {
log.error("Unexpected error during topic description for {}.\n" +
"Error message was: {}", topicName, cause.toString());
throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
}
} catch (final TimeoutException retryableException) {
tempUnknownTopics.add(topicName);
log.debug("Describing topic {} (to get number of partitions) timed out.\n" +
"Error message was: {}", topicName, retryableException.toString());
}
}

Expand All @@ -235,15 +249,13 @@ protected Map<String, Integer> getNumPartitions(final Set<String> topics,
*/
private Set<String> validateTopics(final Set<String> topicsToValidate,
final Map<String, InternalTopicConfig> topicsMap,
final Set<String> tempUnknownTopics,
final int remainingRetries) {
final Set<String> tempUnknownTopics) {
if (!topicsMap.keySet().containsAll(topicsToValidate)) {
throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does not contain all the topics " +
topicsToValidate + " trying to validate.");
}

final Map<String, Integer> existedTopicPartition =
getNumPartitions(topicsToValidate, tempUnknownTopics, remainingRetries > 0);
final Map<String, Integer> existedTopicPartition = getNumPartitions(topicsToValidate, tempUnknownTopics);

final Set<String> topicsToCreate = new HashSet<>();
for (final String topicName : topicsToValidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
try {
allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
} catch (final TaskAssignmentException e) {
} catch (final TaskAssignmentException | TimeoutException e) {
return new GroupAssignment(
errorAssignment(clientMetadataMap,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
Expand Down Expand Up @@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

final Set<TaskId> statefulTasks = new HashSet<>();

final boolean probingRebalanceNeeded =
assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
final boolean probingRebalanceNeeded;
try {
probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
} catch (final TaskAssignmentException | TimeoutException e) {
return new GroupAssignment(
errorAssignment(clientMetadataMap,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
);
}

// ---------------- Step Three ---------------- //

Expand Down Expand Up @@ -449,10 +456,9 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,

/**
* @return a map of repartition topics and their metadata
* @throws TaskAssignmentException if there is incomplete source topic metadata due to missing source topic(s)
*/
private Map<String, InternalTopicConfig> computeRepartitionTopicMetadata(final Map<Integer, TopicsInfo> topicGroups,
final Cluster metadata) throws TaskAssignmentException {
final Cluster metadata) {
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
for (final TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
Expand Down Expand Up @@ -1356,9 +1362,6 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
return false;
}

/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public Admin adminClient() {
}

public InternalTopicManager internalTopicManager() {
return new InternalTopicManager(adminClient, streamsConfig);
return new InternalTopicManager(time(), adminClient, streamsConfig);
}

public CopartitionedTopicsEnforcer copartitionedTopicsEnforcer() {
Expand Down
Loading

0 comments on commit 9903013

Please sign in to comment.