Skip to content

Commit

Permalink
MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (ap…
Browse files Browse the repository at this point in the history
…ache#4763)

The invalid topic name is already handled locally so it is unnecessary to send the DeleteTopicsRequest. This PR adds a count to MockClient for testing.

Reviewers: Colin Patrick McCabe <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
chia7712 authored and hachikuji committed Apr 5, 2018
1 parent d5db4e9 commit 53d4267
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1150,9 +1150,10 @@ void handleFailure(Throwable throwable) {
}

@Override
public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
public DeleteTopicsResult deleteTopics(Collection<String> topicNames,
DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
final List<String> validTopicNames = new ArrayList<>(topicNames.size());
for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
Expand All @@ -1161,6 +1162,7 @@ public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<Void>());
validTopicNames.add(topicName);
}
}
final long now = time.milliseconds();
Expand All @@ -1169,7 +1171,7 @@ public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,

@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs);
}

@Override
Expand Down Expand Up @@ -1204,7 +1206,7 @@ void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
if (!topicNames.isEmpty()) {
if (!validTopicNames.isEmpty()) {
runnable.call(call, now);
}
return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
Expand Down
10 changes: 9 additions & 1 deletion clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
Expand Down Expand Up @@ -88,7 +89,7 @@ public FutureResponse(Node node,
private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
private volatile int numBlockingWakeups = 0;

private final AtomicInteger totalRequestCount = new AtomicInteger(0);
public MockClient(Time time) {
this(time, null);
}
Expand Down Expand Up @@ -394,6 +395,7 @@ public void reset() {
futureResponses.clear();
metadataUpdates.clear();
authenticationErrors.clear();
totalRequestCount.set(0);
}

public boolean hasPendingMetadataUpdates() {
Expand Down Expand Up @@ -461,6 +463,7 @@ public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?>
@Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse, RequestCompletionHandler callback) {
totalRequestCount.incrementAndGet();
return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs,
expectResponse, callback);
}
Expand Down Expand Up @@ -503,4 +506,9 @@ private static class MetadataUpdate {
this.expectMatchRefreshTopics = expectMatchRefreshTopics;
}
}

// visible for testing
public int totalRequestCount() {
return totalRequestCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,22 +230,26 @@ public void testInvalidTopicNames() throws Exception {
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().totalRequestCount());

Map<String, KafkaFuture<TopicDescription>> describeFutures =
env.adminClient().describeTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().totalRequestCount());

List<NewTopic> newTopics = new ArrayList<>();
for (String sillyTopicName : sillyTopicNames) {
newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
}

Map<String, KafkaFuture<Void>> createFutures =
env.adminClient().createTopics(newTopics).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().totalRequestCount());
}
}

Expand Down

0 comments on commit 53d4267

Please sign in to comment.