Skip to content

Commit

Permalink
KAFKA-15922: Add a MetadataVersion for JBOD (apache#14860)
Browse files Browse the repository at this point in the history
Assign MetadataVersion.IBP_3_7_IV2 to JBOD.

Move KIP-966 support to MetadataVersion.IBP_3_7_IV3.

Create MetadataVersion.LATEST_PRODUCTION as the latest metadata version that can be used when formatting a
new cluster, or upgrading a cluster using kafka-features.sh. This will allow us to clearly distinguish between stable
and unstable metadata versions for the first time.

Reviewers: Igor Soarez <[email protected]>, Ron Dagostino <[email protected]>, Calvin Liu <[email protected]>, Proven Provenzano <[email protected]>
  • Loading branch information
cmccabe authored Nov 30, 2023
1 parent a35e021 commit a94bc8d
Show file tree
Hide file tree
Showing 30 changed files with 339 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public short oldestAllowedVersion() {

@Override
public BrokerRegistrationRequest build(short version) {
if (version < 2) {
// Reset the PreviousBrokerEpoch to default if not supported.
data.setPreviousBrokerEpoch(new BrokerRegistrationRequestData().previousBrokerEpoch());
}
return new BrokerRegistrationRequest(data, version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.

// Version 2 adds the PreviousBrokerEpoch for the KIP-966
// Version 2 adds LogDirs for KIP-858

// Version 3 adds LogDirs for KIP-858
// Version 3 adds the PreviousBrokerEpoch for the KIP-966
{
"apiKey":62,
"type": "request",
Expand Down Expand Up @@ -58,9 +58,9 @@
"about": "The rack which this broker is in." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "If the required configurations for ZK migration are present, this value is set to true" },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
"about": "The epoch before a clean shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+",
"about": "Log directories configured in this broker which are available." }
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are available." },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+", "default": "-1", "ignorable": true,
"about": "The epoch before a clean shutdown." }
]
}
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka

import java.util.Properties

import joptsimple.OptionParser
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ case class ControllerMigrationSupport(
class ControllerServer(
val sharedServer: SharedServer,
val configSchema: KafkaConfigSchema,
val bootstrapMetadata: BootstrapMetadata,
val bootstrapMetadata: BootstrapMetadata
) extends Logging {

import kafka.server.Server._
Expand Down Expand Up @@ -216,7 +216,7 @@ class ControllerServer(
startupDeadline, time)
val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections)
val quorumFeatures = new QuorumFeatures(config.nodeId,
QuorumFeatures.defaultFeatureMap(),
QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled),
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)

val delegationTokenKeyString = {
Expand Down Expand Up @@ -347,7 +347,7 @@ class ControllerServer(
clusterId,
time,
s"controller-${config.nodeId}-",
QuorumFeatures.defaultFeatureMap(),
QuorumFeatures.defaultFeatureMap(config.unstableMetadataVersionsEnabled),
config.migrationEnabled,
incarnationId,
listenerInfo)
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ object KafkaConfig {

/** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"

/* Documentation */
/** ********* Zookeeper Configuration ***********/
Expand Down Expand Up @@ -1532,8 +1533,10 @@ object KafkaConfig {
.define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)

/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this broker.
.defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, LOW)
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions should be enabled on this node.
.defineInternal(UnstableMetadataVersionsEnableProp, BOOLEAN, false, HIGH)
}

/** ********* Remote Log Management Configuration *********/
Expand Down Expand Up @@ -2113,6 +2116,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

/** Internal Configurations **/
val unstableApiVersionsEnabled = getBoolean(KafkaConfig.UnstableApiVersionsEnableProp)
val unstableMetadataVersionsEnabled = getBoolean(KafkaConfig.UnstableMetadataVersionsEnableProp)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,18 @@ object StorageTool extends Logging {
case "format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString))
val metadataVersion = getMetadataVersion(namespace,
Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
if (!metadataVersion.isProduction()) {
if (config.get.unstableMetadataVersionsEnabled) {
System.out.println(s"WARNING: using pre-production metadata version ${metadataVersion}.")
} else {
throw new TerseFailure(s"Metadata version ${metadataVersion} is not ready for production use yet.")
}
}
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId).
Expand Down Expand Up @@ -131,7 +139,7 @@ object StorageTool extends Logging {
action(storeTrue())
formatParser.addArgument("--release-version", "-r").
action(store()).
help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}")

parser.parseArgsOrFail(args)
}
Expand All @@ -151,7 +159,7 @@ object StorageTool extends Logging {
): MetadataVersion = {
val defaultValue = defaultVersionString match {
case Some(versionString) => MetadataVersion.fromVersionString(versionString)
case None => MetadataVersion.latest()
case None => MetadataVersion.LATEST_PRODUCTION
}

Option(namespace.getString("release_version"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,6 @@ public void testNoAutoStart() {

@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_7_IV1, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_7_IV3, config.metadataVersion());
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/annotation/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV1;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV3;
ClusterConfigProperty[] serverProperties() default {};
}
1 change: 1 addition & 0 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
if (brokerNode != null) {
props.putAll(brokerNode.propertyOverrides());
}
props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true");
return new KafkaConfig(props, false, Option.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ abstract class QuorumTestHarness extends Logging {
val props = propsList(0)
props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString)
props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
props.setProperty(KafkaConfig.UnstableMetadataVersionsEnableProp, "true")
if (props.getProperty(KafkaConfig.NodeIdProp) == null) {
props.setProperty(KafkaConfig.NodeIdProp, "1000")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ object ZkMigrationIntegrationTest {
MetadataVersion.IBP_3_5_IV2,
MetadataVersion.IBP_3_6_IV2,
MetadataVersion.IBP_3_7_IV0,
MetadataVersion.IBP_3_7_IV1
MetadataVersion.IBP_3_7_IV1,
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_7_IV3
).foreach { mv =>
val clusterConfig = ClusterConfig.defaultClusterBuilder()
.metadataVersion(mv)
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2191,7 +2191,7 @@ class PartitionTest extends AbstractPartitionTest {
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV1,
interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV2,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
Expand Down
47 changes: 45 additions & 2 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -203,8 +205,8 @@ Found problem:
def testDefaultMetadataVersion(): Unit = {
val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = None)
assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be the latest version")
assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(),
"Expected the default metadata.version to be the latest production version")
}

@Test
Expand Down Expand Up @@ -390,4 +392,45 @@ Found problem:
assertFalse(DirectoryId.reserved(metaProps.directoryId().get()))
} finally Utils.delete(tempDir)
}

@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testFormattingUnstableMetadataVersionBlocked(enableUnstable: Boolean): Unit = {
var exitString: String = ""
var exitStatus: Int = 1
def exitProcedure(status: Int, message: Option[String]) : Nothing = {
exitStatus = status
exitString = message.getOrElse("")
throw new StorageToolTestException(exitString)
}
Exit.setExitProcedure(exitProcedure)
val properties = newSelfManagedProperties()
val propsFile = TestUtils.tempFile()
val propsStream = Files.newOutputStream(propsFile.toPath)
try {
properties.setProperty(KafkaConfig.LogDirsProp, TestUtils.tempDir().toString)
properties.setProperty(KafkaConfig.UnstableMetadataVersionsEnableProp, enableUnstable.toString)
properties.store(propsStream, "config.props")
} finally {
propsStream.close()
}
val args = Array("format", "-c", s"${propsFile.toPath}",
"-t", "XcZZOzUqS4yHOjhMQB6JLQ",
"--release-version", MetadataVersion.latest().toString)
try {
StorageTool.main(args)
} catch {
case _: StorageToolTestException =>
} finally {
Exit.resetExitProcedure()
}
if (enableUnstable) {
assertEquals("", exitString)
assertEquals(0, exitStatus)
} else {
assertEquals(s"Metadata version ${MetadataVersion.latest().toString} is not ready for " +
"production use yet.", exitString)
assertEquals(1, exitStatus)
}
}
}
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ object TestUtils extends Logging {
}.mkString(",")

val props = new Properties
props.put(KafkaConfig.UnstableMetadataVersionsEnableProp, "true")
if (zkConnect == null) {
props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString)
props.put(KafkaConfig.NodeIdProp, nodeId.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ static public Optional<String> reasonNotSupported(
return Optional.empty();
}

public static Map<String, VersionRange> defaultFeatureMap() {
public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable) {
Map<String, VersionRange> features = new HashMap<>(1);
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.latest().featureLevel()));
enableUnstable ?
MetadataVersion.latest().featureLevel() :
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
return features;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
// Version 2 adds Directories for KIP-858
// Version 1 adds Directories for KIP-858.
// Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
Expand All @@ -43,14 +43,14 @@
"about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
{ "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
"about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
{ "name": "Directories", "type": "[]uuid", "default": "null",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 8,
"about": "null if the log dirs didn't change; the new log directory for each replica otherwise."},
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 6,
"about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
"about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." },
{ "name": "Directories", "type": "[]uuid", "default": "null",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 8,
"about": "null if the log dirs didn't change; the new log directory for each replica otherwise."}
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 7,
"about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
]
}
14 changes: 7 additions & 7 deletions metadata/src/main/resources/common/metadata/PartitionRecord.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
// Version 2 adds Directories for KIP-858
// Version 1 adds Directories for KIP-858
// Version 2 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
Expand All @@ -42,13 +42,13 @@
"about": "The epoch of the partition leader." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change anything in the partition." },
{ "name": "Directories", "type": "[]uuid", "versions": "1+",
"about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."},
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 1,
"about": "The eligible leader replicas of this partition." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." },
{ "name": "Directories", "type": "[]uuid", "versions": "2+",
"about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." }
]
}
Loading

0 comments on commit a94bc8d

Please sign in to comment.