diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java index 9849f187cf41..18d6a070d055 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java @@ -45,10 +45,6 @@ public short oldestAllowedVersion() { @Override public BrokerRegistrationRequest build(short version) { - if (version < 2) { - // Reset the PreviousBrokerEpoch to default if not supported. - data.setPreviousBrokerEpoch(new BrokerRegistrationRequestData().previousBrokerEpoch()); - } return new BrokerRegistrationRequest(data, version); } diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json index 24e4f07489b5..ace268db77a6 100644 --- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json +++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json @@ -15,9 +15,9 @@ // Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode. -// Version 2 adds the PreviousBrokerEpoch for the KIP-966 +// Version 2 adds LogDirs for KIP-858 -// Version 3 adds LogDirs for KIP-858 +// Version 3 adds the PreviousBrokerEpoch for the KIP-966 { "apiKey":62, "type": "request", @@ -58,9 +58,9 @@ "about": "The rack which this broker is in." }, { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false", "about": "If the required configurations for ZK migration are present, this value is set to true" }, - { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1", - "about": "The epoch before a clean shutdown." }, - { "name": "LogDirs", "type": "[]uuid", "versions": "3+", - "about": "Log directories configured in this broker which are available." } + { "name": "LogDirs", "type": "[]uuid", "versions": "2+", + "about": "Log directories configured in this broker which are available." }, + { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+", "default": "-1", "ignorable": true, + "about": "The epoch before a clean shutdown." } ] } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index a1791ccbe0bf..efa628605149 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -18,7 +18,6 @@ package kafka import java.util.Properties - import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index a7d91ca9df93..db386e4a92e6 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -84,7 +84,7 @@ case class ControllerMigrationSupport( class ControllerServer( val sharedServer: SharedServer, val configSchema: KafkaConfigSchema, - val bootstrapMetadata: BootstrapMetadata, + val bootstrapMetadata: BootstrapMetadata ) extends Logging { import kafka.server.Server._ @@ -216,7 +216,7 @@ class ControllerServer( startupDeadline, time) val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections) val quorumFeatures = new QuorumFeatures(config.nodeId, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled), controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava) val delegationTokenKeyString = { @@ -347,7 +347,7 @@ class ControllerServer( clusterId, time, s"controller-${config.nodeId}-", - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled), config.migrationEnabled, incarnationId, listenerInfo) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 42896a5c33db..603e8c90b269 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -677,6 +677,7 @@ object KafkaConfig { /** Internal Configurations **/ val UnstableApiVersionsEnableProp = "unstable.api.versions.enable" + val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable" /* Documentation */ /** ********* Zookeeper Configuration ***********/ @@ -1532,8 +1533,10 @@ object KafkaConfig { .define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC) /** Internal Configurations **/ - // This indicates whether unreleased APIs should be advertised by this broker. - .defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, LOW) + // This indicates whether unreleased APIs should be advertised by this node. + .defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, HIGH) + // This indicates whether unreleased MetadataVersions should be enabled on this node. + .defineInternal(UnstableMetadataVersionsEnableProp, BOOLEAN, false, HIGH) } /** ********* Remote Log Management Configuration *********/ @@ -2113,6 +2116,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami /** Internal Configurations **/ val unstableApiVersionsEnabled = getBoolean(KafkaConfig.UnstableApiVersionsEnableProp) + val unstableMetadataVersionsEnabled = getBoolean(KafkaConfig.UnstableMetadataVersionsEnableProp) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index b2db8b6e8a1e..30d836370a9a 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -59,10 +59,18 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString)) + val metadataVersion = getMetadataVersion(namespace, + Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString)) if (!metadataVersion.isKRaftSupported) { throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.") } + if (!metadataVersion.isProduction()) { + if (config.get.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.") + } else { + throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.") + } + } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). @@ -131,7 +139,7 @@ object StorageTool extends Logging { action(storeTrue()) formatParser.addArgument("--release-version", "-r"). action(store()). - help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}") + help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}") parser.parseArgsOrFail(args) } @@ -151,7 +159,7 @@ object StorageTool extends Logging { ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) - case None => MetadataVersion.latest() + case None => MetadataVersion.LATEST_PRODUCTION } Option(namespace.getString("release_version")) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 3e09dc8fbe89..0857e4ded30c 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -117,6 +117,6 @@ public void testNoAutoStart() { @ClusterTest public void testDefaults(ClusterConfig config) { - Assertions.assertEquals(MetadataVersion.IBP_3_7_IV1, config.metadataVersion()); + Assertions.assertEquals(MetadataVersion.IBP_3_7_IV3, config.metadataVersion()); } } diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index 150aa7e7d71d..1511e28a3d4f 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -41,6 +41,6 @@ String name() default ""; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; - MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV1; + MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV3; ClusterConfigProperty[] serverProperties() default {}; } diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 77f1832bbe32..2cef03f67af2 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -198,6 +198,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) { if (brokerNode != null) { props.putAll(brokerNode.propertyOverrides()); } + props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true"); return new KafkaConfig(props, false, Option.empty()); } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 0be996ec2521..c670a7e9db02 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -317,6 +317,7 @@ abstract class QuorumTestHarness extends Logging { val props = propsList(0) props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString) props.setProperty(KafkaConfig.ProcessRolesProp, "controller") + props.setProperty(KafkaConfig.UnstableMetadataVersionsEnableProp, "true") if (props.getProperty(KafkaConfig.NodeIdProp) == null) { props.setProperty(KafkaConfig.NodeIdProp, "1000") } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 753f69d20be0..6be7f6b422d7 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -72,7 +72,9 @@ object ZkMigrationIntegrationTest { MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2, MetadataVersion.IBP_3_7_IV0, - MetadataVersion.IBP_3_7_IV1 + MetadataVersion.IBP_3_7_IV1, + MetadataVersion.IBP_3_7_IV2, + MetadataVersion.IBP_3_7_IV3 ).foreach { mv => val clusterConfig = ClusterConfig.defaultClusterBuilder() .metadataVersion(mv) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index a3454f4d6fec..b908fcc5e59d 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -2191,7 +2191,7 @@ class PartitionTest extends AbstractPartitionTest { val partition = new Partition( topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV1, + interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV2, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 5a4cca14c58e..db4482e8ddb5 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -33,6 +33,8 @@ import org.apache.kafka.common.metadata.UserScramCredentialRecord import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} import org.junit.jupiter.api.{Test, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -203,8 +205,8 @@ Found problem: def testDefaultMetadataVersion(): Unit = { val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")) val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = None) - assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(), - "Expected the default metadata.version to be the latest version") + assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(), + "Expected the default metadata.version to be the latest production version") } @Test @@ -390,4 +392,45 @@ Found problem: assertFalse(DirectoryId.reserved(metaProps.directoryId().get())) } finally Utils.delete(tempDir) } + + @ParameterizedTest + @ValueSource(booleans = Array(false, true)) + def testFormattingUnstableMetadataVersionBlocked(enableUnstable: Boolean): Unit = { + var exitString: String = "" + var exitStatus: Int = 1 + def exitProcedure(status: Int, message: Option[String]) : Nothing = { + exitStatus = status + exitString = message.getOrElse("") + throw new StorageToolTestException(exitString) + } + Exit.setExitProcedure(exitProcedure) + val properties = newSelfManagedProperties() + val propsFile = TestUtils.tempFile() + val propsStream = Files.newOutputStream(propsFile.toPath) + try { + properties.setProperty(KafkaConfig.LogDirsProp, TestUtils.tempDir().toString) + properties.setProperty(KafkaConfig.UnstableMetadataVersionsEnableProp, enableUnstable.toString) + properties.store(propsStream, "config.props") + } finally { + propsStream.close() + } + val args = Array("format", "-c", s"${propsFile.toPath}", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.latest().toString) + try { + StorageTool.main(args) + } catch { + case _: StorageToolTestException => + } finally { + Exit.resetExitProcedure() + } + if (enableUnstable) { + assertEquals("", exitString) + assertEquals(0, exitStatus) + } else { + assertEquals(s"Metadata version ${MetadataVersion.latest().toString} is not ready for " + + "production use yet.", exitString) + assertEquals(1, exitStatus) + } + } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index effb0ae67d6a..bf622853829f 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -331,6 +331,7 @@ object TestUtils extends Logging { }.mkString(",") val props = new Properties + props.put(KafkaConfig.UnstableMetadataVersionsEnableProp, "true") if (zkConnect == null) { props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString) props.put(KafkaConfig.NodeIdProp, nodeId.toString) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index b6766ac7e6f4..aadb27e937e0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -54,11 +54,13 @@ static public Optional reasonNotSupported( return Optional.empty(); } - public static Map defaultFeatureMap() { + public static Map defaultFeatureMap(boolean enableUnstable) { Map features = new HashMap<>(1); features.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), - MetadataVersion.latest().featureLevel())); + enableUnstable ? + MetadataVersion.latest().featureLevel() : + MetadataVersion.LATEST_PRODUCTION.featureLevel())); return features; } diff --git a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json index 0471b9875f6f..f22669943cac 100644 --- a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json +++ b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json @@ -17,8 +17,8 @@ "apiKey": 5, "type": "metadata", "name": "PartitionChangeRecord", - // Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966. - // Version 2 adds Directories for KIP-858 + // Version 1 adds Directories for KIP-858. + // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966. "validVersions": "0-2", "flexibleVersions": "0+", "fields": [ @@ -43,14 +43,14 @@ "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }, { "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5, "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." }, + { "name": "Directories", "type": "[]uuid", "default": "null", + "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 8, + "about": "null if the log dirs didn't change; the new log directory for each replica otherwise."}, { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6, + "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 6, "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." }, { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7, - "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }, - { "name": "Directories", "type": "[]uuid", "default": "null", - "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 8, - "about": "null if the log dirs didn't change; the new log directory for each replica otherwise."} + "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 7, + "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." } ] } diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json index f8b6c1b7a8bc..5c84a2e556f9 100644 --- a/metadata/src/main/resources/common/metadata/PartitionRecord.json +++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json @@ -17,8 +17,8 @@ "apiKey": 3, "type": "metadata", "name": "PartitionRecord", - // Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966. - // Version 2 adds Directories for KIP-858 + // Version 1 adds Directories for KIP-858 + // Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966. "validVersions": "0-2", "flexibleVersions": "0+", "fields": [ @@ -42,13 +42,13 @@ "about": "The epoch of the partition leader." }, { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", "about": "An epoch that gets incremented each time we change anything in the partition." }, + { "name": "Directories", "type": "[]uuid", "versions": "1+", + "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}, { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1, + "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 1, "about": "The eligible leader replicas of this partition." }, { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", - "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2, - "about": "The last known eligible leader replicas of this partition." }, - { "name": "Directories", "type": "[]uuid", "versions": "2+", - "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."} + "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2, + "about": "The last known eligible leader replicas of this partition." } ] } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 6a66bd3c6dba..91bab7b01098 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -76,7 +76,7 @@ public void testReplay(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -137,7 +137,7 @@ public void testReplayRegisterBrokerRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -190,7 +190,7 @@ public void testReplayBrokerRegistrationChangeRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -245,7 +245,7 @@ public void testRegistrationWithIncorrectClusterId() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -275,7 +275,7 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); @@ -332,7 +332,7 @@ public void testUnregister() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -371,7 +371,7 @@ public void testPlaceReplicas(int numUsableBrokers) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); @@ -424,7 +424,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index bc65e5a48a81..c2be58a519f3 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -76,7 +76,7 @@ private static Map versionMap(Object... args) { } public static QuorumFeatures features(Object... args) { - Map features = QuorumFeatures.defaultFeatureMap(); + Map features = QuorumFeatures.defaultFeatureMap(true); features.putAll(rangeMap(args)); return new QuorumFeatures(0, features, emptyList()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index af49639a6bfb..8b346b1c612d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -34,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mockito; import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; @@ -53,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; -import static org.mockito.Mockito.when; @Timeout(value = 40) @@ -115,16 +113,17 @@ public void testChangeRecordIsNoOp() { private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); - // TODO remove this after after MetadataVersion bump for KIP-858 - private static MetadataVersion metadataVersionForDirAssignmentInfo() { - MetadataVersion metadataVersion = Mockito.spy(MetadataVersion.latest()); - when(metadataVersion.isDirectoryAssignmentSupported()).thenReturn(true); - return metadataVersion; - } - private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) { - return isDirAssignmentEnabled(version) ? metadataVersionForDirAssignmentInfo() : - isElrEnabled(version) ? MetadataVersion.IBP_3_7_IV1 : MetadataVersion.IBP_3_7_IV0; + switch (version) { + case (short) 0: + return MetadataVersion.IBP_3_7_IV0; + case (short) 1: + return MetadataVersion.IBP_3_7_IV2; + case (short) 2: + return MetadataVersion.IBP_3_7_IV3; + default: + throw new RuntimeException("Unknown PartitionChangeRecord version " + version); + } } private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) { @@ -132,7 +131,9 @@ private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataV } private static PartitionChangeBuilder createFooBuilder(short version) { - return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version)); + return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 2). + setEligibleLeaderReplicasEnabled(isElrEnabled(version)); } private static final PartitionRegistration BAR = new PartitionRegistration.Builder(). @@ -155,15 +156,13 @@ private static PartitionChangeBuilder createFooBuilder(short version) { private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); private static boolean isElrEnabled(short partitionChangeRecordVersion) { - return partitionChangeRecordVersion > 0; - } - - private static boolean isDirAssignmentEnabled(short partitionChangeRecordVersion) { - return partitionChangeRecordVersion > 1; + return partitionChangeRecordVersion >= 2; } private static PartitionChangeBuilder createBarBuilder(short version) { - return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version)); + return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 2). + setEligibleLeaderReplicasEnabled(isElrEnabled(version)); } private static final PartitionRegistration BAZ = new PartitionRegistration.Builder(). @@ -183,7 +182,9 @@ private static PartitionChangeBuilder createBarBuilder(short version) { private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A"); private static PartitionChangeBuilder createBazBuilder(short version) { - return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version)); + return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, + metadataVersionForPartitionChangeRecordVersion(version), 2). + setEligibleLeaderReplicasEnabled(isElrEnabled(version)); } private static final PartitionRegistration OFFLINE_WITHOUT_ELR = new PartitionRegistration.Builder(). @@ -213,9 +214,16 @@ private static PartitionChangeBuilder createBazBuilder(short version) { private final static Uuid OFFLINE_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); - private static PartitionChangeBuilder createOfflineBuilder(short version) { - return version > 0 ? new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version)) : - new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version)); + private static PartitionChangeBuilder createOfflineBuilder(short partitionChangeRecordVersion) { + MetadataVersion metadataVersion = + metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion); + if (metadataVersion.isElrSupported()) { + return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1, + metadataVersion, 2).setEligibleLeaderReplicasEnabled(true); + } else { + return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1, + metadataVersion, 2).setEligibleLeaderReplicasEnabled(false); + } } private static void assertElectLeaderEquals(PartitionChangeBuilder builder, @@ -392,7 +400,8 @@ public void testIsrChangeAndLeaderChange(short version) { setPartitionId(0). setIsr(Arrays.asList(2, 3)). setLeader(2), version)), - createFooBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build()); + createFooBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest. + newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build()); } @ParameterizedTest @@ -402,7 +411,7 @@ public void testReassignmentRearrangesReplicas(short version) { setTopicId(FOO_ID). setPartitionId(0). setReplicas(Arrays.asList(3, 2, 1)); - if (version > 1) { + if (version >= 1) { Map dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories); expectedRecord.setDirectories(Arrays.asList(dirs.get(3), dirs.get(2), dirs.get(1))); } @@ -421,12 +430,13 @@ public void testIsrEnlargementCompletesReassignment(short version) { setLeader(2). setRemovingReplicas(Collections.emptyList()). setAddingReplicas(Collections.emptyList()); - if (version > 1) { + if (version >= 1) { Map dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories); expectedRecord.setDirectories(Arrays.asList(dirs.get(2), dirs.get(3), dirs.get(4))); } assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)), - createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build()); + createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest. + newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build()); } @ParameterizedTest @@ -442,7 +452,7 @@ public void testRevertReassignment(short version) { setLeader(1). setRemovingReplicas(Collections.emptyList()). setAddingReplicas(Collections.emptyList()); - if (version > 1) { + if (version >= 1) { Map dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories); expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3))); } @@ -469,7 +479,7 @@ public void testRemovingReplicaReassignment(short version) { setReplicas(Arrays.asList(1, 2)). setIsr(Arrays.asList(2, 1)). setLeader(1); - if (version > 1) { + if (version >= 1) { Map dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories); expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2))); } @@ -493,7 +503,7 @@ public void testAddingReplicaReassignment(short version) { setPartitionId(0). setReplicas(Arrays.asList(1, 2, 3, 4)). setAddingReplicas(Collections.singletonList(4)); - if (version > 1) { + if (version >= 1) { Map dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories); expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3), DirectoryId.UNASSIGNED)); } @@ -529,16 +539,13 @@ public void testUncleanLeaderElection(short version) { .setLeader(1) .setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()); - if (version > 0) { + if (version >= 2) { // The test partition has ELR, so unclean election will clear these fiedls. record.setEligibleLeaderReplicas(Collections.emptyList()) .setLastKnownELR(Collections.emptyList()); } - expectedRecord = new ApiMessageAndVersion( - record, - version - ); + expectedRecord = new ApiMessageAndVersion(record, version); assertEquals( Optional.of(expectedRecord), createOfflineBuilder(version).setElection(Election.UNCLEAN).build() @@ -781,7 +788,8 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) { .setPartitionEpoch(200) .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -795,16 +803,13 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) { .setIsr(Arrays.asList(1, 2)) .setLeader(-2) .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE); - if (version > 0) { + if (version >= 2) { record.setEligibleLeaderReplicas(Arrays.asList(3, 4)); } - ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion( - record, - version - ); + ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version); assertEquals(Optional.of(expectedRecord), builder.build()); partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); - if (version > 0) { + if (version >= 2) { assertTrue(Arrays.equals(new int[]{3, 4}, partition.elr), partition.toString()); assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString()); } else { @@ -834,7 +839,8 @@ public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) { .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -850,10 +856,7 @@ public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) { // Both versions will set the elr and lastKnownElr as empty list. record.setEligibleLeaderReplicas(Collections.emptyList()) .setLastKnownELR(Collections.emptyList()); - ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion( - record, - version - ); + ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version); assertEquals(Optional.of(expectedRecord), builder.build()); partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); assertEquals(0, partition.elr.length); @@ -881,7 +884,8 @@ public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) { .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -893,18 +897,15 @@ public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) { .setIsr(Arrays.asList(1, 4)) .setLeader(-2) .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE); - if (version == 0) { + if (version < 2) { record.setEligibleLeaderReplicas(Collections.emptyList()); record.setLastKnownELR(Collections.emptyList()); } // No change is expected to ELR/LastKnownELR. - ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion( - record, - version - ); + ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version); assertEquals(Optional.of(expectedRecord), builder.build()); partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); - if (version > 0) { + if (version >= 2) { assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString()); assertTrue(Arrays.equals(new int[]{2}, partition.lastKnownElr), partition.toString()); } else { @@ -934,7 +935,8 @@ public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(shor .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Min ISR is 3. - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(false); @@ -946,19 +948,16 @@ public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(shor .setPartitionId(0) .setLeader(-2) .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE); - if (version > 0) { + if (version >= 2) { record.setEligibleLeaderReplicas(Arrays.asList(2)) .setLastKnownELR(Arrays.asList(3)); } else { record.setEligibleLeaderReplicas(Collections.emptyList()); } - ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion( - record, - version - ); + ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version); assertEquals(Optional.of(expectedRecord), builder.build()); partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); - if (version > 0) { + if (version >= 2) { assertTrue(Arrays.equals(new int[]{2}, partition.elr), partition.toString()); assertTrue(Arrays.equals(new int[]{3}, partition.lastKnownElr), partition.toString()); } else { @@ -983,7 +982,7 @@ void testKeepsDirectoriesAfterReassignment() { setPartitionEpoch(200). build(); Optional built = new PartitionChangeBuilder(registration, FOO_ID, - 0, r -> true, metadataVersionForDirAssignmentInfo(), 2). + 0, r -> true, MetadataVersion.IBP_3_7_IV2, 2). setTargetReplicas(Arrays.asList(3, 1, 4)).build(); Optional expected = Optional.of(new ApiMessageAndVersion( new PartitionChangeRecord(). @@ -996,7 +995,7 @@ void testKeepsDirectoriesAfterReassignment() { Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"), DirectoryId.UNASSIGNED )), - (short) 2 + (short) 1 )); assertEquals(expected, built); } @@ -1004,7 +1003,7 @@ void testKeepsDirectoriesAfterReassignment() { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEnabled) { - short version = 1; + short version = 2; PartitionRegistration partition = new PartitionRegistration.Builder() .setReplicas(new int[] {1, 2, 3, 4}) .setIsr(new int[] {1}) @@ -1018,7 +1017,8 @@ public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEn Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Make replica 1 offline. - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 1, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 1, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(isElrEnabled(version)) .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled); @@ -1045,7 +1045,7 @@ public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEn @ParameterizedTest @ValueSource(booleans = {true, false}) public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeaderEnabled) { - short version = 1; + short version = 2; PartitionRegistration partition = new PartitionRegistration.Builder() .setReplicas(new int[] {1, 2, 3, 4}) .setIsr(new int[] {1, 2, 3, 4}) @@ -1059,7 +1059,8 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); // Mark all the replicas offline. - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(true) .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled); @@ -1078,16 +1079,14 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade record.setLastKnownELR(Arrays.asList(1)); } - ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion( - record, - version - ); + ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version); assertEquals(Optional.of(expectedRecord), builder.build()); partition = partition.merge((PartitionChangeRecord) builder.build().get().message()); assertTrue(Arrays.equals(new int[]{1, 2, 3, 4}, partition.elr), partition.toString()); if (lastKnownLeaderEnabled) { assertTrue(Arrays.equals(new int[]{1}, partition.lastKnownElr), partition.toString()); - builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, metadataVersionForPartitionChangeRecordVersion(version), 3) + builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(true) .setUncleanShutdownReplicas(Arrays.asList(2)) @@ -1102,7 +1101,7 @@ public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeade @Test public void testEligibleLeaderReplicas_ElectLastKnownLeader() { - short version = 1; + short version = 2; PartitionRegistration partition = new PartitionRegistration.Builder() .setReplicas(new int[] {1, 2, 3, 4}) .setIsr(new int[] {}) @@ -1115,7 +1114,8 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeader() { .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> true, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> true, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setUseLastKnownLeaderInBalancedRecovery(true) .setEligibleLeaderReplicasEnabled(true); @@ -1141,7 +1141,7 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeader() { @Test public void testEligibleLeaderReplicas_ElectLastKnownLeaderShouldFail() { - short version = 1; + short version = 2; PartitionRegistration partition = new PartitionRegistration.Builder() .setReplicas(new int[] {1, 2, 3, 4}) .setIsr(new int[] {}) @@ -1154,7 +1154,8 @@ public void testEligibleLeaderReplicas_ElectLastKnownLeaderShouldFail() { .build(); Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); - PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3) + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, + metadataVersionForPartitionChangeRecordVersion(version), 3) .setElection(Election.PREFERRED) .setEligibleLeaderReplicasEnabled(true) .setUseLastKnownLeaderInBalancedRecovery(true); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index 1d3ed8bf00ec..9df8c35e2722 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -49,7 +49,7 @@ public void setUp() { featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(MetadataVersion.latest()). build(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index b1145b88981b..301f8b67f3f7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -191,7 +191,7 @@ public void testConfigurationOperations() throws Throwable { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testConfigurationOperations(controlEnv.activeController()); @@ -234,7 +234,7 @@ public void testDelayedConfigurationOperations() throws Throwable { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); @@ -576,7 +576,7 @@ public void testUnregisterBroker() throws Throwable { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). setListeners(listeners)); assertEquals(5L, reply.get().epoch()); CreateTopicsRequestData createTopicsRequestData = diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index cc7ed1067742..3ae8cc1ed0d7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -103,7 +103,7 @@ private QuorumControllerTestEnv( builder.setRaftClient(logEnv.logManagers().get(nodeId)); builder.setBootstrapMetadata(bootstrapMetadata); builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs); - builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultFeatureMap(), nodeIds)); + builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultFeatureMap(true), nodeIds)); sessionTimeoutMillis.ifPresent(timeout -> { builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)); }); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 10ebf9c574f6..1e0259e9661f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -49,6 +49,24 @@ public class QuorumFeaturesTest { QUORUM_FEATURES = new QuorumFeatures(0, LOCAL, Arrays.asList(0, 1, 2)); } + @Test + public void testDefaultFeatureMap() { + Map expectedFeatures = new HashMap<>(1); + expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( + MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), + MetadataVersion.LATEST_PRODUCTION.featureLevel())); + assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false)); + } + + @Test + public void testDefaultFeatureMapWithUnstable() { + Map expectedFeatures = new HashMap<>(1); + expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( + MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), + MetadataVersion.latest().featureLevel())); + assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true)); + } + @Test public void testLocalSupportedFeature() { assertEquals(VersionRange.of(0, 3), QUORUM_FEATURES.localSupportedFeature("foo")); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 8c48024a7640..8ecdafe65ea6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.controller; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -224,7 +225,7 @@ private ReplicationControlTestContext( this.featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); @@ -1635,6 +1636,7 @@ public void testReassignPartitions(short version) throws Exception { setReplicas(asList(2, 1, 3)). setLeader(3). setRemovingReplicas(Collections.emptyList()). + setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))). setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())), new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList( new ReassignableTopicResponse().setName("foo").setPartitions(asList( @@ -1986,6 +1988,7 @@ public void testCancelReassignPartitions() throws Exception { setLeader(4). setReplicas(asList(2, 3, 4)). setRemovingReplicas(null). + setDirectories(Arrays.asList(DirectoryId.unassignedArray(3))). setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())), new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList( new ReassignableTopicResponse().setName("foo").setPartitions(asList( diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index eb29a94e74c0..b92a9f62221b 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -42,7 +42,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.stream.IntStream; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -50,15 +49,10 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; @Timeout(40) public class PartitionRegistrationTest { - private static Stream partitionRecordVersions() { - return IntStream.range(PartitionRecord.LOWEST_SUPPORTED_VERSION, PartitionRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version)); - } @Test public void testElectionWasClean() { assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2})); @@ -261,9 +255,17 @@ public void testBuilderSetsDefaultAddingAndRemovingReplicas() { assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas)); } + private static Stream metadataVersionsForTestPartitionRegistration() { + return Arrays.asList( + MetadataVersion.IBP_3_7_IV1, + MetadataVersion.IBP_3_7_IV2, + MetadataVersion.IBP_3_7_IV3 + ).stream().map(mv -> Arguments.of(mv)); + } + @ParameterizedTest - @MethodSource("partitionRecordVersions") - public void testPartitionRegistrationToRecord(short version) { + @MethodSource("metadataVersionsForTestPartitionRegistration") + public void testPartitionRegistrationToRecord(MetadataVersion metadataVersion) { PartitionRegistration.Builder builder = new PartitionRegistration.Builder(). setReplicas(new int[]{0, 1, 2, 3, 4}). setDirectories(new Uuid[]{ @@ -291,16 +293,12 @@ public void testPartitionRegistrationToRecord(short version) { setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()). setLeaderEpoch(0). setPartitionEpoch(0); - MetadataVersion metadataVersion = spy(MetadataVersion.latest()); - when(metadataVersion.partitionRecordVersion()).thenReturn(version); - if (version > 0) { + if (metadataVersion.isElrSupported()) { expectRecord. setEligibleLeaderReplicas(Arrays.asList(2, 3)). setLastKnownELR(Arrays.asList(4)); - } else { - when(metadataVersion.isElrSupported()).thenReturn(false); } - if (version > 1) { + if (metadataVersion.isDirectoryAssignmentSupported()) { expectRecord.setDirectories(Arrays.asList( DirectoryId.UNASSIGNED, Uuid.fromString("KBJBm9GVRAG9Ffe25odmmg"), @@ -308,15 +306,15 @@ public void testPartitionRegistrationToRecord(short version) { Uuid.fromString("7DZNT5qBS7yFF7VMMHS7kw"), Uuid.fromString("cJGPUZsMSEqbidOLYLOIXg") )); - when(metadataVersion.isDirectoryAssignmentSupported()).thenReturn(true); } List exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). setMetadataVersion(metadataVersion). setLossHandler(exceptions::add). build(); - assertEquals(new ApiMessageAndVersion(expectRecord, version), partitionRegistration.toRecord(topicID, 0, options)); - if (version < 2) { + assertEquals(new ApiMessageAndVersion(expectRecord, metadataVersion.partitionRecordVersion()), + partitionRegistration.toRecord(topicID, 0, options)); + if (!metadataVersion.isDirectoryAssignmentSupported()) { assertTrue(exceptions.stream(). anyMatch(e -> e.getMessage().contains("the directory assignment state of one or more replicas"))); } @@ -342,13 +340,14 @@ public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() { setLeader(0). setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()). setLeaderEpoch(0). + setDirectories(Arrays.asList(DirectoryId.unassignedArray(5))). setPartitionEpoch(0); List exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.latest()). + setMetadataVersion(MetadataVersion.IBP_3_7_IV3). setLossHandler(exceptions::add). build(); - assertEquals(new ApiMessageAndVersion(expectRecord, (short) 1), partitionRegistration.toRecord(topicID, 0, options)); + assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options)); assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas)); assertTrue(exceptions.isEmpty()); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index eda9dc38ffa0..f6a57d1da99c 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -82,7 +82,7 @@ public class KRaftMigrationDriverTest { private final static QuorumFeatures QUORUM_FEATURES = new QuorumFeatures(4, - QuorumFeatures.defaultFeatureMap(), + QuorumFeatures.defaultFeatureMap(true), Arrays.asList(4, 5, 6)); static class MockControllerMetrics extends QuorumControllerMetrics { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 22b7517ff70b..1359af218ef3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -188,8 +188,14 @@ public enum MetadataVersion { // Implement KIP-919 controller registration. IBP_3_7_IV0(15, "3.7", "IV0", true), + // Reserved + IBP_3_7_IV1(16, "3.7", "IV1", false), + + // Add JBOD support for KRaft. + IBP_3_7_IV2(17, "3.7", "IV2", true), + // Add ELR related supports (KIP-966). - IBP_3_7_IV1(16, "3.7", "IV1", true); + IBP_3_7_IV3(18, "3.7", "IV3", true); // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version @@ -207,6 +213,22 @@ public enum MetadataVersion { */ public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = IBP_3_3_IV0; + /** + * The latest production-ready MetadataVersion. This is the latest version that is stable + * and cannot be changed. MetadataVersions later than this can be tested via junit, but + * not deployed in production. + * + * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, + * IT CANNOT BE CHANGED. + */ + public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0; + + /** + * An array containing all of the MetadataVersion entries. + * + * This is essentially a cached copy of MetadataVersion.values. Unlike that function, it doesn't + * allocate a new array each time. + */ public static final MetadataVersion[] VERSIONS; private final short featureLevel; @@ -289,12 +311,12 @@ public boolean isDelegationTokenSupported() { return this.isAtLeast(IBP_3_6_IV2); } - public boolean isElrSupported() { - return this.isAtLeast(IBP_3_7_IV1); + public boolean isDirectoryAssignmentSupported() { + return this.isAtLeast(IBP_3_7_IV2); } - public boolean isDirectoryAssignmentSupported() { - return false; // TODO: Bump IBP for JBOD support in KRaft + public boolean isElrSupported() { + return this.isAtLeast(IBP_3_7_IV3); } public boolean isKRaftSupported() { @@ -351,9 +373,9 @@ public boolean isControllerRegistrationSupported() { } public short partitionChangeRecordVersion() { - if (isDirectoryAssignmentSupported()) { + if (isElrSupported()) { return (short) 2; - } else if (isElrSupported()) { + } else if (isDirectoryAssignmentSupported()) { return (short) 1; } else { return (short) 0; @@ -361,9 +383,9 @@ public short partitionChangeRecordVersion() { } public short partitionRecordVersion() { - if (isDirectoryAssignmentSupported()) { + if (isElrSupported()) { return (short) 2; - } else if (isElrSupported()) { + } else if (isDirectoryAssignmentSupported()) { return (short) 1; } else { return (short) 0; @@ -467,19 +489,24 @@ public short offsetCommitValueVersion(boolean expireTimestampMs) { } private static final Map IBP_VERSIONS; + static { - { - MetadataVersion[] enumValues = MetadataVersion.values(); - VERSIONS = Arrays.copyOf(enumValues, enumValues.length); + MetadataVersion[] enumValues = MetadataVersion.values(); + VERSIONS = Arrays.copyOf(enumValues, enumValues.length); - IBP_VERSIONS = new HashMap<>(); - Map maxInterVersion = new HashMap<>(); - for (MetadataVersion metadataVersion : VERSIONS) { + IBP_VERSIONS = new HashMap<>(); + Map maxInterVersion = new HashMap<>(); + for (MetadataVersion metadataVersion : VERSIONS) { + if (metadataVersion.isProduction()) { maxInterVersion.put(metadataVersion.release, metadataVersion); - IBP_VERSIONS.put(metadataVersion.ibpVersion, metadataVersion); } - IBP_VERSIONS.putAll(maxInterVersion); + IBP_VERSIONS.put(metadataVersion.ibpVersion, metadataVersion); } + IBP_VERSIONS.putAll(maxInterVersion); + } + + public boolean isProduction() { + return this.compareTo(MetadataVersion.LATEST_PRODUCTION) <= 0; } public String shortVersion() { diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 2ed6e6cdb31f..7a7ed1b3a990 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -165,8 +165,13 @@ public void testFromVersionString() { assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1")); assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2")); + // 3.7-IV0 is the latest production version in the 3.7 line + assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7")); + assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0")); assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1")); + assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2")); + assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3")); } @Test @@ -268,6 +273,8 @@ public void testVersion() { assertEquals("3.6-IV2", IBP_3_6_IV2.version()); assertEquals("3.7-IV0", IBP_3_7_IV0.version()); assertEquals("3.7-IV1", IBP_3_7_IV1.version()); + assertEquals("3.7-IV2", IBP_3_7_IV2.version()); + assertEquals("3.7-IV3", IBP_3_7_IV3.version()); } @Test @@ -326,22 +333,53 @@ public void testIsDelegationTokenSupported(MetadataVersion metadataVersion) { metadataVersion.isDelegationTokenSupported()); } + @ParameterizedTest + @EnumSource(value = MetadataVersion.class) + public void testDirectoryAssignmentSupported(MetadataVersion metadataVersion) { + assertEquals(metadataVersion.isAtLeast(IBP_3_7_IV2), metadataVersion.isDirectoryAssignmentSupported()); + } + @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testIsElrSupported(MetadataVersion metadataVersion) { - assertEquals(metadataVersion.equals(IBP_3_7_IV1), - metadataVersion.isElrSupported()); - short expectPartitionRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0; - assertEquals(expectPartitionRecordVersion, metadataVersion.partitionRecordVersion()); - short expectPartitionChangeRecordVersion = metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0; - assertEquals(expectPartitionChangeRecordVersion, metadataVersion.partitionChangeRecordVersion()); + assertEquals(metadataVersion.isAtLeast(IBP_3_7_IV3), metadataVersion.isElrSupported()); + } + + @ParameterizedTest + @EnumSource(value = MetadataVersion.class) + public void testPartitionRecordVersion(MetadataVersion metadataVersion) { + final short expectedVersion; + if (metadataVersion.isElrSupported()) { + expectedVersion = (short) 2; + } else if (metadataVersion.isDirectoryAssignmentSupported()) { + expectedVersion = (short) 1; + } else { + expectedVersion = (short) 0; + } + assertEquals(expectedVersion, metadataVersion.partitionRecordVersion()); + } + + @ParameterizedTest + @EnumSource(value = MetadataVersion.class) + public void testPartitionChangeRecordVersion(MetadataVersion metadataVersion) { + final short expectedVersion; + if (metadataVersion.isElrSupported()) { + expectedVersion = (short) 2; + } else if (metadataVersion.isDirectoryAssignmentSupported()) { + expectedVersion = (short) 1; + } else { + expectedVersion = (short) 0; + } + assertEquals(expectedVersion, metadataVersion.partitionChangeRecordVersion()); } @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) { final short expectedVersion; - if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_4_IV0)) { + if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) { + expectedVersion = 3; + } else if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_4_IV0)) { expectedVersion = 2; } else if (metadataVersion.isAtLeast(IBP_3_3_IV3)) { expectedVersion = 1; @@ -386,4 +424,21 @@ public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) { public void testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion metadataVersion) { assertEquals((short) 1, metadataVersion.offsetCommitValueVersion(true)); } + + @Test + public void assertLatestProductionIsLessThanLatest() { + assertTrue(LATEST_PRODUCTION.ordinal() < MetadataVersion.latest().ordinal(), + "Expected LATEST_PRODUCTION " + LATEST_PRODUCTION + + " to be less than the latest of " + MetadataVersion.latest()); + } + + @Test + public void assertLatestProductionIsProduction() { + assertTrue(LATEST_PRODUCTION.isProduction()); + } + + @Test + public void assertLatestIsNotProduction() { + assertFalse(MetadataVersion.latest().isProduction()); + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 6e38af8956ef..d416ba6f034d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -68,7 +68,7 @@ public void testDescribeWithKRaft() { ); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 3.7-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); } @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0) @@ -78,7 +78,7 @@ public void testDescribeWithKRaftAndBootstrapControllers() { ); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 3.7-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput)); } @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -137,7 +137,7 @@ public void testDowngradeMetadataVersionWithKRaft() { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 3000 only supports versions 1-16", commandOutput); + "metadata.version. Local controller 3000 only supports versions 1-18", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),