Skip to content

Commit

Permalink
KAFKA-12204; Implement DescribeCluster API in the broker (KIP-700) (apache#9903)
Browse files Browse the repository at this point in the history

This PR implements the DescribeCluster API in the broker.

Reviewers: Rajini Sivaram <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
dajac authored Jan 19, 2021
1 parent f7c0b0d commit 302eee6
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public enum ApiKeys {
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false);
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false, RecordBatch.MAGIC_VALUE_V0, false, true);

// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return EnvelopeRequest.parse(buffer, apiVersion);
case FETCH_SNAPSHOT:
return FetchSnapshotRequest.parse(buffer, apiVersion);
case DESCRIBE_CLUSTER:
return DescribeClusterRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return EnvelopeResponse.parse(responseBuffer, version);
case FETCH_SNAPSHOT:
return FetchSnapshotResponse.parse(responseBuffer, version);
case DESCRIBE_CLUSTER:
return DescribeClusterResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;

/**
 * Request for the DescribeCluster API (KIP-700), which exposes the cluster id,
 * the controller id, the set of live brokers and, optionally, the cluster's
 * authorized operations, without requiring a full Metadata request.
 */
public class DescribeClusterRequest extends AbstractRequest {

    public static class Builder extends AbstractRequest.Builder<DescribeClusterRequest> {

        private final DescribeClusterRequestData data;

        public Builder(DescribeClusterRequestData data) {
            super(ApiKeys.DESCRIBE_CLUSTER);
            this.data = data;
        }

        @Override
        public DescribeClusterRequest build(final short version) {
            return new DescribeClusterRequest(data, version);
        }

        @Override
        public String toString() {
            return data.toString();
        }
    }

    private final DescribeClusterRequestData data;

    public DescribeClusterRequest(DescribeClusterRequestData data, short version) {
        super(ApiKeys.DESCRIBE_CLUSTER, version);
        this.data = data;
    }

    @Override
    public DescribeClusterRequestData data() {
        return data;
    }

    /**
     * Builds an error response carrying the top-level error derived from {@code e}.
     *
     * @param throttleTimeMs the quota throttle time to report back to the client
     * @param e              the failure to convert into a top-level error code/message
     */
    @Override
    public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwable e) {
        ApiError apiError = ApiError.fromThrowable(e);
        return new DescribeClusterResponse(new DescribeClusterResponseData()
            // Fix: the throttle time was previously dropped, so throttled clients
            // receiving an error response never saw their quota violation.
            .setThrottleTimeMs(throttleTimeMs)
            .setErrorCode(apiError.error().code())
            .setErrorMessage(apiError.message()));
    }

    @Override
    public String toString(final boolean verbose) {
        return data.toString();
    }

    public static DescribeClusterRequest parse(ByteBuffer buffer, short version) {
        return new DescribeClusterRequest(new DescribeClusterRequestData(new ByteBufferAccessor(buffer), version), version);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

/**
 * Response for the DescribeCluster API (KIP-700). Carries the cluster id, the
 * controller id, the live brokers, an optional authorized-operations bitfield,
 * and a single top-level error code/message pair.
 */
public class DescribeClusterResponse extends AbstractResponse {

    private final DescribeClusterResponseData data;

    public DescribeClusterResponse(DescribeClusterResponseData data) {
        super(ApiKeys.DESCRIBE_CLUSTER);
        this.data = data;
    }

    /**
     * Returns the brokers from the response as {@link Node}s, keyed by broker id.
     */
    public Map<Integer, Node> nodes() {
        return data.brokers().valuesList().stream()
            .collect(Collectors.toMap(
                broker -> broker.brokerId(),
                broker -> new Node(broker.brokerId(), broker.host(), broker.port(), broker.rack())));
    }

    @Override
    public Map<Errors, Integer> errorCounts() {
        // Only a single top-level error is possible for this API.
        return errorCounts(Errors.forCode(data.errorCode()));
    }

    @Override
    public int throttleTimeMs() {
        return data.throttleTimeMs();
    }

    @Override
    public DescribeClusterResponseData data() {
        return data;
    }

    public static DescribeClusterResponse parse(ByteBuffer buffer, short version) {
        return new DescribeClusterResponse(new DescribeClusterResponseData(new ByteBufferAccessor(buffer), version));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 60,
"type": "request",
"name": "DescribeClusterRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include cluster authorized operations." }
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 60,
"type": "response",
"name": "DescribeClusterResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." },
{ "name": "ClusterId", "type": "string", "versions": "0+",
"about": "The cluster ID that the responding broker belongs to." },
{ "name": "ControllerId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
"about": "The ID of the controller broker." },
{ "name": "Brokers", "type": "[]DescribeClusterBroker", "versions": "0+",
"about": "Each broker in the response.", "fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
"about": "The broker ID." },
{ "name": "Host", "type": "string", "versions": "0+",
"about": "The broker hostname." },
{ "name": "Port", "type": "int32", "versions": "0+",
"about": "The broker port." },
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The rack of the broker, or null if it has not been assigned to a rack." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this cluster." }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBroker;
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
Expand Down Expand Up @@ -293,6 +295,28 @@ public void testDescribeGroupsResponseVersions() throws Exception {
testAllMessageRoundTripsFromVersion((short) 4, baseResponse);
}

@Test
public void testDescribeClusterRequestVersions() throws Exception {
    // Round-trip the request through every supported version of the protocol.
    DescribeClusterRequestData request = new DescribeClusterRequestData()
        .setIncludeClusterAuthorizedOperations(true);
    testAllMessageRoundTrips(request);
}

@Test
public void testDescribeClusterResponseVersions() throws Exception {
    // Build a response with one broker and all optional fields populated,
    // then round-trip it through every supported protocol version.
    DescribeClusterBroker broker = new DescribeClusterBroker()
        .setBrokerId(1)
        .setHost("localhost")
        .setPort(9092)
        .setRack("rack1");
    DescribeClusterResponseData response = new DescribeClusterResponseData()
        .setBrokers(new DescribeClusterBrokerCollection(
            Collections.singletonList(broker).iterator()))
        .setClusterId("clusterId")
        .setControllerId(1)
        .setClusterAuthorizedOperations(10);

    testAllMessageRoundTrips(response);
}

@Test
public void testGroupInstanceIdIgnorableInDescribeGroupsResponse() throws Exception {
DescribeGroupsResponseData responseWithGroupInstanceId =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
import org.apache.kafka.common.message.DescribeAclsResponseData.AclDescription;
import org.apache.kafka.common.message.DescribeAclsResponseData.DescribeAclsResource;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBroker;
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResourceResult;
Expand Down Expand Up @@ -509,6 +513,36 @@ public void testSerialization() throws Exception {
checkResponse(createAlterClientQuotasResponse(), 0, true);
}

@Test
public void testDescribeClusterSerialization() throws Exception {
    // Exercise request, error-response, and response serialization for every
    // version the DescribeCluster API supports.
    short oldestVersion = ApiKeys.DESCRIBE_CLUSTER.oldestVersion();
    short latestVersion = ApiKeys.DESCRIBE_CLUSTER.latestVersion();
    for (short version = oldestVersion; version <= latestVersion; version++) {
        checkRequest(createDescribeClusterRequest(version), true);
        checkErrorResponse(createDescribeClusterRequest(version), unknownServerException, true);
        checkResponse(createDescribeClusterResponse(), version, true);
    }
}

// Builds a DescribeCluster request at the given version, asking for the
// cluster's authorized operations.
private DescribeClusterRequest createDescribeClusterRequest(int version) {
    DescribeClusterRequestData requestData = new DescribeClusterRequestData()
        .setIncludeClusterAuthorizedOperations(true);
    return new DescribeClusterRequest.Builder(requestData).build((short) version);
}

// Builds a DescribeCluster response with a single broker and every optional
// field populated, for use by the serialization checks above.
private DescribeClusterResponse createDescribeClusterResponse() {
    DescribeClusterBroker broker = new DescribeClusterBroker()
        .setBrokerId(1)
        .setHost("localhost")
        .setPort(9092)
        .setRack("rack1");
    DescribeClusterResponseData responseData = new DescribeClusterResponseData()
        .setBrokers(new DescribeClusterBrokerCollection(
            Collections.singletonList(broker).iterator()))
        .setClusterId("clusterId")
        .setControllerId(1)
        .setClusterAuthorizedOperations(10);
    return new DescribeClusterResponse(responseData);
}

@Test
public void testResponseHeader() {
ResponseHeader header = createResponseHeader((short) 1);
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/network/RequestConvertToJson.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ object RequestConvertToJson {
case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version)
case req: WriteTxnMarkersRequest => WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
case req: FetchSnapshotRequest => FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeClusterRequest => DescribeClusterRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so.");
}
Expand Down Expand Up @@ -154,6 +155,7 @@ object RequestConvertToJson {
case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version)
case res: FetchSnapshotResponse => FetchSnapshotResponseDataJsonConverter.write(res.data, version)
case res: DescribeClusterResponse => DescribeClusterResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}
Expand Down
Loading

0 comments on commit 302eee6

Please sign in to comment.