Skip to content

Commit

Permalink
KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV (
Browse files Browse the repository at this point in the history
…apache#15810)

Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than
3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were
trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record
from going down to a fully empty ISR (since that is not valid in these pre-KIP-966 metadata
versions), but it would still emit the record, even though it had no effect.

Reviewers: Igor Soarez <[email protected]>
  • Loading branch information
cmccabe authored May 2, 2024
1 parent cdc4caa commit a3f2414
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ private ElectionResult(int node, boolean unclean) {
}
}

/**
 * Returns the target ISR held by this builder.
 *
 * @return the builder's {@code targetIsr} list; the stored instance itself is returned,
 *         not a copy.
 */
public List<Integer> targetIsr() {
    return this.targetIsr;
}

// VisibleForTesting
/**
* Perform leader election based on the partition state and leader election type.
Expand Down Expand Up @@ -365,44 +369,61 @@ private void tryElection(PartitionChangeRecord record) {
}

/**
* Trigger a leader epoch bump if one is needed.
*
* We need to bump the leader epoch if:
* 1. The leader changed, or
* 2. The new replica list does not contain all the nodes that the old replica list did.
* Trigger a leader epoch bump if one is needed because of replica reassignment.
*
* Changes that do NOT fall in any of these categories will increase the partition epoch, but
* not the leader epoch. Note that if the leader epoch increases, the partition epoch will
* always increase as well; there is no case where the partition epoch increases more slowly
* than the leader epoch.
*
* If the PartitionChangeRecord sets the leader field to something other than
* NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
* case 1. In this function, we check for cases 2 and 3, and handle them by manually
* setting record.leader to the current leader.
*
* In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
* that required that the leader epoch be bumped whenever the ISR shrank. In MV 3.6 this leader
* bump is not required when the ISR shrinks. Note that the leader epoch is never increased if
* the ISR expanded.
* Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
* case where the partition epoch increases more slowly than the leader epoch.
*/
void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
    // When the record already names a leader, a leader epoch bump is going to happen
    // anyway, so there is nothing to force here.
    boolean leaderUnchanged = record.leader() == NO_LEADER_CHANGE;

    // A reassignment that drops at least one broker from the old replica list needs a
    // leader epoch bump. Writing the current leader into the record is what forces it.
    if (leaderUnchanged && !Replicas.contains(targetReplicas, partition.replicas)) {
        record.setLeader(partition.leader);
    }
}

/**
* Trigger a leader epoch bump if one is needed because of an ISR shrink.
*
* In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
* be bumped during ISR shrink for compatibility with ZK brokers.
* Note that it's important to call this function only after we have set the ISR field in
* the PartitionChangeRecord.
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
if (record.leader() == NO_LEADER_CHANGE) {
boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;

if (!Replicas.contains(targetReplicas, partition.replicas)) {
// Reassignment
record.setLeader(partition.leader);
} else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
// ISR shrink
record.setLeader(partition.leader);
}
void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
    // An ISR shrink only forces a leader epoch bump in two situations:
    //
    // 1. Metadata versions before 3.6, where a bug in the broker replica manager
    //    (KAFKA-15021) required a bump whenever the ISR shrank. ISR expansions never
    //    needed one.
    //
    // 2. ZK migration mode, where every ISR shrink bumps the leader epoch in order to
    //    stay compatible with migrating brokers that are still in ZK mode.
    boolean shrinkForcesBump =
        metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;

    // Beyond that, the record must not already carry a leader change (which bumps the
    // leader epoch on its own), and it must actually carry an ISR change (a null ISR
    // means the ISR is not changing). The short-circuit order matches the order in
    // which these conditions are checked one after another.
    boolean candidate = shrinkForcesBump
        && record.leader() == NO_LEADER_CHANGE
        && record.isr() != null;

    // If the new ISR is missing any broker that the old ISR had, force the leader epoch
    // bump by writing the current leader into the record.
    if (candidate && !Replicas.contains(record.isr(), partition.isr)) {
        record.setLeader(partition.leader);
    }
}

/**
* @return true if the reassignment was completed; false otherwise.
*/
private void completeReassignmentIfNeeded() {
PartitionReassignmentReplicas reassignmentReplicas =
new PartitionReassignmentReplicas(
Expand Down Expand Up @@ -435,7 +456,7 @@ public Optional<ApiMessageAndVersion> build() {

tryElection(record);

triggerLeaderEpochBumpIfNeeded(record);
triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);

maybeUpdateRecordElr(record);

Expand All @@ -449,6 +470,8 @@ public Optional<ApiMessageAndVersion> build() {
record.setIsr(targetIsr);
}

triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);

maybeUpdateLastKnownLeader(record);

setAssignmentChanges(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,15 @@ private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataV
0,
r -> r != 3,
metadataVersion,
2)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}

private static PartitionChangeBuilder createFooBuilder(short version) {
return new PartitionChangeBuilder(FOO,
FOO_ID,
0,
r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version),
2).
setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}

// Convenience overload: maps a PartitionChangeRecord version to its metadata version and
// delegates to the MetadataVersion-based createFooBuilder.
private static PartitionChangeBuilder createFooBuilder(short partitionChangeRecordVersion) {
    MetadataVersion mappedVersion =
        metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
    return createFooBuilder(mappedVersion);
}

private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
setDirectories(new Uuid[] {
Expand Down Expand Up @@ -295,102 +289,145 @@ public void testElectLeader(short version) {
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
}

private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
PartitionChangeRecord record,
int expectedLeader) {
builder.triggerLeaderEpochBumpIfNeeded(record);
private static void testTriggerLeaderEpochBumpIfNeeded(
    PartitionChangeBuilder builder,
    PartitionChangeRecord record,
    int expectedLeader
) {
    // Step 1: run the reassignment-driven leader epoch bump check.
    builder.triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);

    // Step 2: copy the builder's target ISR into the record. The ISR-shrink check reads
    // record.isr(), so this has to happen before step 3.
    record.setIsr(builder.targetIsr());

    // Step 3: run the ISR-shrink-driven leader epoch bump check.
    builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);

    int actualLeader = record.leader();
    assertEquals(expectedLeader, actualLeader);
}

@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testTriggerLeaderEpochBumpIfNeeded(short version) {
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version),
new PartitionChangeRecord(), NO_LEADER_CHANGE);
// Shrinking the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
// Expanding the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
),
public void testNoLeaderEpochBumpIfNothingChanged(short version) {
testTriggerLeaderEpochBumpIfNeeded(createFooBuilder(version),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
// Expanding the ISR during migration doesn't increase leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
.setZkMigrationEnabled(true),
NO_LEADER_CHANGE);
}

/**
* Test that shrinking the ISR doesn't increase the leader epoch in later MVs.
*/
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
new PartitionChangeRecord().setLeader(2), 2);

// Check that the leader epoch is bumped if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
// See KAFKA-15021 for details.
testTriggerLeaderEpochBumpIfNeededLeader(
new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2, 2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
}

/**
* Test that shrinking the ISR does increase the leader epoch in earlier MVs.
* See KAFKA-15021 for details.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
1
);
1);
}

/**
* Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on.
*/
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testLeaderEpochBumpZkMigration(short version) {
// KAFKA-15109: Shrinking the ISR while in ZK migration mode requires a leader epoch bump
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(true),
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).
setZkMigrationEnabled(true).
setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
1
);
1);
}

testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(false),
/**
* Test that expanding the ISR doesn't increase the leader epoch.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
NO_LEADER_CHANGE);
}

// For older MV, always expect the epoch to increase regardless of ZK migration
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(MetadataVersion.IBP_3_5_IV2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(true),
/**
* Test that expanding the ISR doesn't increase the leader epoch during ZK migration.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).
setZkMigrationEnabled(true).
setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
new PartitionChangeRecord(),
1
);
NO_LEADER_CHANGE);
}

testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder(MetadataVersion.IBP_3_5_IV2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(false),
/**
* Test that changing the replica set such that not all the old replicas remain
* always results in a leader epoch increase.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetReplicas(Arrays.asList(2, 1, 4)),
new PartitionChangeRecord(),
1
);
1);
}

/**
 * Regression test for KAFKA-16624. Verifies that triggerLeaderEpochBumpForIsrShrinkIfNeeded
 * leaves the record's leader field at NO_LEADER_CHANGE for a partition whose only replica
 * (broker 2) is also its sole ISR member and its leader.
 *
 * NOTE(review): the builder is only given an empty target REPLICA list here, and the record's
 * ISR field is never set before the method under test runs. Presumably that is the intended
 * way of reaching the "empty target ISR that cannot be applied" case -- confirm.
 */
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
    MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);

    // Single-replica partition: broker 2 is the only replica, the only ISR member, and the leader.
    Uuid soleDirectory = Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw");
    PartitionRegistration singleReplicaPartition = new PartitionRegistration.Builder()
        .setReplicas(new int[] {2})
        .setDirectories(new Uuid[] {soleDirectory})
        .setIsr(new int[] {2})
        .setLeader(2)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(100)
        .setPartitionEpoch(200)
        .build();

    PartitionChangeBuilder emptyTargetBuilder =
        new PartitionChangeBuilder(singleReplicaPartition, FOO_ID, 0, r -> true, metadataVersion, 2)
            .setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported())
            .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
            .setTargetReplicas(Arrays.asList());

    PartitionChangeRecord changeRecord = new PartitionChangeRecord();
    emptyTargetBuilder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(changeRecord);

    // The method must not have forced a leader epoch bump.
    assertEquals(NO_LEADER_CHANGE, changeRecord.leader());
}

@ParameterizedTest
Expand Down

0 comments on commit a3f2414

Please sign in to comment.