Skip to content

Commit

Permalink
KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation (apache#16333)
Browse files

* KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation

As part of KIP-858 (Handle JBOD broker disk failure in KRaft), we added some code that caused the
broker to re-register itself when transitioning from a MetadataVersion that did not support broker
directory IDs, to one that did. This code was necessary because otherwise the controller would not
be aware of what directories the broker held.

However, prior to this PR, the re-registration process acted exactly like a full registration. That
is, it bumped the broker epoch (which is meant to only be bumped on broker restart). This PR fixes
the code to keep the broker epoch the same if the incarnation ID is the same.

There are some other minor improvements here:

- The previous logic relied on a complicated combination of request version and previous broker
  epoch to decide whether the request came from the same broker process. This is not needed: if
  the incarnation ID matches the existing registration, the request comes from the same process;
  if it differs, it comes from a different one.

- We now log whether we're amending a registration, registering a previously unknown broker, or
  replacing a previous registration.

- Move changes to the HeartbeatManager to the end of the function, so that we will not do them if
  any validation step fails. Log4j messages are also generated at the end, for the same reason.

Reviewers: Ron Dagostino <[email protected]>
  • Loading branch information
cmccabe authored Jun 18, 2024
1 parent 191b647 commit 2fd00ce
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ public void activate() {
}
}

/**
 * Returns the cluster ID held by this manager. Visible for testing.
 */
String clusterId() {
    return this.clusterId;
}

/**
* Transition this ClusterControlManager to standby.
*/
Expand Down Expand Up @@ -340,10 +344,10 @@ boolean zkRegistrationAllowed() {
* Process an incoming broker registration request.
*/
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long brokerEpoch,
FinalizedControllerFeatures finalizedFeatures,
short version) {
BrokerRegistrationRequestData request,
long newBrokerEpoch,
FinalizedControllerFeatures finalizedFeatures
) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
Expand All @@ -354,21 +358,14 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
int brokerId = request.brokerId();
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
log.debug("Received an unclean shutdown request");
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
}
Uuid prevIncarnationId = null;
if (existing != null) {
prevIncarnationId = existing.incarnationId();
if (heartbeatManager.hasValidSession(brokerId)) {
if (!existing.incarnationId().equals(request.incarnationId())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
"registered with that broker id.");
}
} else {
if (!existing.incarnationId().equals(request.incarnationId())) {
// Remove any existing session for the old broker incarnation.
heartbeatManager.remove(brokerId);
}
}
}

Expand Down Expand Up @@ -406,16 +403,9 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
setBrokerId(brokerId).
setIsMigratingZkBroker(request.isMigratingZkBroker()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack()).
setEndPoints(listenerInfo.toBrokerRegistrationRecord());

if (existing != null && request.incarnationId().equals(existing.incarnationId())) {
log.info("Amending registration of broker {}", request.brokerId());
record.setFenced(existing.fenced());
record.setInControlledShutdown(existing.inControlledShutdown());
}

for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
}
Expand All @@ -432,11 +422,39 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
record.setLogDirs(request.logDirs());
}

heartbeatManager.register(brokerId, record.fenced());

if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
int numRecordsAdded = records.size() - prevNumRecords;
if (existing == null) {
log.info("No previous registration found for broker {}. New incarnation ID is " +
"{}. Generated {} record(s) to clean up previous incarnations. New broker " +
"epoch is {}.", brokerId, request.incarnationId(), numRecordsAdded, newBrokerEpoch);
} else {
log.info("Registering a new incarnation of broker {}. Previous incarnation ID " +
"was {}; new incarnation ID is {}. Generated {} record(s) to clean up " +
"previous incarnations. Broker epoch will become {}.", brokerId,
existing.incarnationId(), request.incarnationId(), numRecordsAdded,
newBrokerEpoch);
}
record.setBrokerEpoch(newBrokerEpoch);
} else {
log.info("Amending registration of broker {}, incarnation ID {}. Broker epoch remains {}.",
request.brokerId(), request.incarnationId(), existing.epoch());
record.setFenced(existing.fenced());
record.setInControlledShutdown(existing.inControlledShutdown());
record.setBrokerEpoch(existing.epoch());
}
records.add(new ApiMessageAndVersion(record, featureControl.metadataVersion().
registerBrokerRecordVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));

if (!request.incarnationId().equals(prevIncarnationId)) {
// Remove any existing session for the old broker incarnation.
heartbeatManager.remove(brokerId);
}
heartbeatManager.register(brokerId, record.fenced());

return ControllerResult.atomicOf(records, new BrokerRegistrationReply(record.brokerEpoch()));
}

ControllerResult<Void> registerController(ControllerRegistrationRequestData request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2177,7 +2177,7 @@ public CompletableFuture<BrokerRegistrationReply> registerBroker(
() -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
finalizedFeatures(Long.MAX_VALUE), context.requestHeader().requestApiVersion());
finalizedFeatures(Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;

import org.junit.jupiter.api.Test;
Expand All @@ -61,6 +60,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -276,8 +276,7 @@ public void testRegistrationWithIncorrectClusterId() {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1));
new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
}

private static Stream<Arguments> metadataVersions() {
Expand Down Expand Up @@ -322,8 +321,7 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1);
new FinalizedControllerFeatures(Collections.emptyMap(), 456L));

short expectedVersion = metadataVersion.registerBrokerRecordVersion();

Expand Down Expand Up @@ -564,8 +562,7 @@ public void testRegistrationWithUnsupportedMetadataVersion() {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());

assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 7 and 7, inclusive.",
Expand All @@ -582,8 +579,7 @@ public void testRegistrationWithUnsupportedMetadataVersion() {
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
}

@Test
Expand Down Expand Up @@ -637,10 +633,10 @@ public void testRegisterWithDuplicateDirectoryId() {

void registerNewBrokerWithDirs(ClusterControlManager clusterControl, int brokerId, List<Uuid> dirs) {
BrokerRegistrationRequestData data = new BrokerRegistrationRequestData().setBrokerId(brokerId)
.setClusterId(TestUtils.fieldValue(clusterControl, ClusterControlManager.class, "clusterId"))
.setClusterId(clusterControl.clusterId())
.setIncarnationId(Uuid.randomUuid()).setLogDirs(dirs);
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(Collections.emptyMap(), 456L);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures, (short) 1);
ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(data, 123L, finalizedFeatures);
RecordTestUtils.replayAll(clusterControl, result.records());
}

Expand Down Expand Up @@ -681,4 +677,45 @@ public void testDefaultDir() {
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(3));
assertEquals(DirectoryId.UNASSIGNED, clusterControl.defaultDir(4));
}

/**
 * Checks which broker epoch is stored after the same broker ID registers twice.
 *
 * Broker 1 first registers with epoch 100. It then registers again, offering epoch 111,
 * either with a different incarnation ID or with the same one. With a different incarnation
 * ID the stored registration must carry the new incarnation ID and epoch 111; with the same
 * incarnation ID the stored registration must keep the original incarnation ID and epoch 100.
 *
 * @param newIncarnationId whether the second registration uses a different incarnation ID
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
    final String initialIncarnation = "mISEfEFwQIuaD1gKCc5tzQ";
    final String secondIncarnation = newIncarnationId ? "07OOcU7MQFeSmGAFPP2Zww" : initialIncarnation;
    final long initialEpoch = 100L;
    final long offeredEpoch = 111L;

    FeatureControlManager featureControl = new FeatureControlManager.Builder().build();
    ClusterControlManager clusterControl = new ClusterControlManager.Builder().
        setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
        setFeatureControlManager(featureControl).
        setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
        build();
    clusterControl.activate();

    // First registration: establishes the broker with the initial incarnation and epoch.
    BrokerRegistrationRequestData firstRequest = new BrokerRegistrationRequestData().
        setBrokerId(1).
        setClusterId(clusterControl.clusterId()).
        setIncarnationId(Uuid.fromString(initialIncarnation)).
        setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw")));
    ControllerResult<BrokerRegistrationReply> firstResult = clusterControl.registerBroker(
        firstRequest,
        initialEpoch,
        new FinalizedControllerFeatures(Collections.emptyMap(), 100L));
    RecordTestUtils.replayAll(clusterControl, firstResult.records());

    // Second registration: same broker ID, offering a different epoch.
    BrokerRegistrationRequestData secondRequest = new BrokerRegistrationRequestData().
        setBrokerId(1).
        setClusterId(clusterControl.clusterId()).
        setIncarnationId(Uuid.fromString(secondIncarnation)).
        setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw")));
    ControllerResult<BrokerRegistrationReply> secondResult = clusterControl.registerBroker(
        secondRequest,
        offeredEpoch,
        new FinalizedControllerFeatures(Collections.emptyMap(), 100L));
    RecordTestUtils.replayAll(clusterControl, secondResult.records());

    // The offered epoch is only expected to be stored when the incarnation ID changed.
    Uuid expectedIncarnation = Uuid.fromString(secondIncarnation);
    long expectedEpoch = newIncarnationId ? offeredEpoch : initialEpoch;
    assertEquals(expectedIncarnation,
        clusterControl.brokerRegistrations().get(1).incarnationId());
    assertEquals(expectedEpoch,
        clusterControl.brokerRegistrations().get(1).epoch());
}
}

0 comments on commit 2fd00ce

Please sign in to comment.