Skip to content

Commit

Permalink
MINOR: Remove Deprecated Scala Procedure Syntax (apache#7214)
Browse files Browse the repository at this point in the history
Reviewers: Colin P. McCabe <[email protected]>, Stanislav Kozlovski <[email protected]>
  • Loading branch information
jolshan authored and cmccabe committed Aug 16, 2019
1 parent e2d16b5 commit 33d0608
Show file tree
Hide file tree
Showing 265 changed files with 1,902 additions and 1,902 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object AclCommand extends Logging {

private val Newline = scala.util.Properties.lineSeparator

def main(args: Array[String]) {
def main(args: Array[String]): Unit = {

val opts = new AclCommandOptions(args)

Expand Down Expand Up @@ -80,7 +80,7 @@ object AclCommand extends Logging {

class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit) {
private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
Expand Down Expand Up @@ -185,7 +185,7 @@ object AclCommand extends Logging {

class AuthorizerService(val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAuthorizer()(f: Authorizer => Unit) {
private def withAuthorizer()(f: Authorizer => Unit): Unit = {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
val authorizerProperties =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
Expand Down Expand Up @@ -258,7 +258,7 @@ object AclCommand extends Logging {
}
}

private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) {
private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter): Unit = {
getAcls(authorizer, filter)
.keys
.foreach(resource =>
Expand Down Expand Up @@ -573,7 +573,7 @@ object AclCommand extends Logging {

options = parser.parse(args: _*)

def checkArgs() {
def checkArgs(): Unit = {
if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ object BrokerApiVersionsCommand {
options = parser.parse(args : _*)
checkArgs()

def checkArgs() {
def checkArgs(): Unit = {
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to retrieve broker version information.")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
Expand All @@ -111,7 +111,7 @@ object BrokerApiVersionsCommand {
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()

val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
override def run() {
override def run(): Unit = {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
Expand Down Expand Up @@ -169,7 +169,7 @@ object BrokerApiVersionsCommand {
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers() {
def awaitBrokers(): Unit = {
var nodes = List[Node]()
do {
nodes = findAllBrokers()
Expand All @@ -192,7 +192,7 @@ object BrokerApiVersionsCommand {
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
}.toMap

def close() {
def close(): Unit = {
running = false
try {
client.close()
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ object ConfigCommand extends Config {
}
}

private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entity = parseEntity(opts)
Expand Down Expand Up @@ -157,7 +157,7 @@ object ConfigCommand extends Config {
println(s"Completed Updating config for entity: $entity.")
}

private def preProcessScramCredentials(configsToBeAdded: Properties) {
private def preProcessScramCredentials(configsToBeAdded: Properties): Unit = {
def scramCredential(mechanism: ScramMechanism, credentialStr: String): String = {
val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
val (iterations, password) = credentialStr match {
Expand Down Expand Up @@ -194,7 +194,7 @@ object ConfigCommand extends Config {
* Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
* The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
*/
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) {
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean): Unit = {
val passwordEncoderConfigs = new Properties
passwordEncoderConfigs ++= configsToBeAdded.asScala.filter { case (key, _) => key.startsWith("password.encoder.") }
if (!passwordEncoderConfigs.isEmpty) {
Expand All @@ -221,7 +221,7 @@ object ConfigCommand extends Config {
}
}

private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
val entities = configEntity.getAllEntities(zkClient)
Expand Down Expand Up @@ -294,7 +294,7 @@ object ConfigCommand extends Config {
}

private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
entityType: String, entityName: String) {
entityType: String, entityName: String): Unit = {
val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)

Expand Down Expand Up @@ -339,7 +339,7 @@ object ConfigCommand extends Config {
}

private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
entityType: String, entityName: String) {
entityType: String, entityName: String): Unit = {
val configs = if (entityType == ConfigType.Broker)
brokerConfig(adminClient, entityName, includeSynonyms = true)
else // broker logger
Expand Down Expand Up @@ -537,7 +537,7 @@ object ConfigCommand extends Config {

val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)

def checkArgs() {
def checkArgs(): Unit = {
// should have exactly one action
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if(actions != 1)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import scala.reflect.ClassTag

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
val opts = new ConsumerGroupCommandOptions(args)

CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
Expand Down Expand Up @@ -522,7 +522,7 @@ object ConsumerGroupCommand extends Logging {
successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)
}

def close() {
def close(): Unit = {
adminClient.close()
consumers.values.foreach(consumer =>
Option(consumer).foreach(_.close())
Expand Down Expand Up @@ -922,7 +922,7 @@ object ConsumerGroupCommand extends Logging {
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)

def checkArgs() {
def checkArgs(): Unit = {

CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object DelegationTokenCommand extends Logging {

options = parser.parse(args : _*)

def checkArgs() {
def checkArgs(): Unit = {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}

def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) {
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]): Unit = {
try {
zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
Expand Down Expand Up @@ -158,8 +158,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
/** Elect the preferred leader for the given {@code partitionsForElection}.
* If the given {@code partitionsForElection} are None then elect the preferred leader for all partitions.
*/
def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]) : Unit
def close() : Unit
def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]): Unit
def close(): Unit
}

class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int)
Expand All @@ -169,7 +169,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, Int.MaxValue, time)

override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]) {
override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
try {
val topics =
partitionsFromUser match {
Expand Down Expand Up @@ -283,7 +283,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}

class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {
def moveLeaderToPreferredReplica() = {
def moveLeaderToPreferredReplica(): Unit = {
try {
val topics = partitionsFromUser.map(_.topic).toSet
val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap { case (topic, partitions) =>
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object ReassignPartitionsCommand extends Logging {
}
}

def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions) {
def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions): Unit = {
val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val jsonString = Utils.readFileAsString(jsonFile)
verifyAssignment(zkClient, adminClientOpt, jsonString)
Expand Down Expand Up @@ -160,7 +160,7 @@ object ReassignPartitionsCommand extends Logging {
}
}

def generateAssignment(zkClient: KafkaZkClient, opts: ReassignPartitionsCommandOptions) {
def generateAssignment(zkClient: KafkaZkClient, opts: ReassignPartitionsCommandOptions): Unit = {
val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
Expand Down Expand Up @@ -196,7 +196,7 @@ object ReassignPartitionsCommand extends Logging {
(partitionsToBeReassigned, currentAssignment)
}

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions) {
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], opts: ReassignPartitionsCommandOptions): Unit = {
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
val interBrokerThrottle = opts.options.valueOf(opts.interBrokerThrottleOpt)
Expand All @@ -205,7 +205,7 @@ object ReassignPartitionsCommand extends Logging {
executeAssignment(zkClient, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
}

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = {
val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
val adminZkClient = new AdminZkClient(zkClient)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
Expand Down Expand Up @@ -531,7 +531,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
* Limit the throttle on currently moving replicas. Note that this command can use used to alter the throttle, but
* it may not alter all limits originally set, if some of the brokers have completed their rebalance.
*/
def maybeLimit(throttle: Throttle) {
def maybeLimit(throttle: Throttle): Unit = {
if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0) {
val existingBrokers = existingAssignment().values.flatten.toSeq
val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq ++ proposedReplicaAssignment.keys.toSeq.map(_.brokerId())
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ object TopicCommand extends Logging {
"collide. To avoid issues it is best to use either, but not both.")
createTopic(topic)
}
def createTopic(topic: CommandTopicPartition)
def listTopics(opts: TopicCommandOptions)
def alterTopic(opts: TopicCommandOptions)
def describeTopic(opts: TopicCommandOptions)
def deleteTopic(opts: TopicCommandOptions)
def createTopic(topic: CommandTopicPartition): Unit
def listTopics(opts: TopicCommandOptions): Unit
def alterTopic(opts: TopicCommandOptions): Unit
def describeTopic(opts: TopicCommandOptions): Unit
def deleteTopic(opts: TopicCommandOptions): Unit
def getTopics(topicWhitelist: Option[String], excludeInternalTopics: Boolean = false): Seq[String]
}

Expand Down Expand Up @@ -649,7 +649,7 @@ object TopicCommand extends Logging {
def topicConfig: Option[util.List[String]] = valuesAsOption(configOpt)
def configsToDelete: Option[util.List[String]] = valuesAsOption(deleteConfigOpt)

def checkArgs() {
def checkArgs(): Unit = {
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, or change a topic.")

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object ZkSecurityMigrator extends Logging {
+ "znodes as part of the process of setting up ZooKeeper "
+ "authentication.")

def run(args: Array[String]) {
def run(args: Array[String]): Unit = {
val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val opts = new ZkSecurityMigratorOptions(args)

Expand Down Expand Up @@ -99,7 +99,7 @@ object ZkSecurityMigrator extends Logging {
migrator.run()
}

def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
try {
run(args)
} catch {
Expand Down Expand Up @@ -159,7 +159,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
def processResult(rc: Int,
path: String,
ctx: Object,
children: java.util.List[String]) {
children: java.util.List[String]): Unit = {
val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
Expand Down Expand Up @@ -193,7 +193,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
def processResult(rc: Int,
path: String,
ctx: Object,
stat: Stat) {
stat: Stat): Unit = {
val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
val promise = ctx.asInstanceOf[Promise[String]]

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/ApiUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object ApiUtils {
* @param buffer The buffer to write to
* @param string The string to write
*/
def writeShortString(buffer: ByteBuffer, string: String) {
def writeShortString(buffer: ByteBuffer, string: String): Unit = {
if(string == null) {
buffer.putShort(-1)
} else {
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ class Partition(val topicPartition: TopicPartition,
}
}

def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
def removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit = {
inWriteLock(leaderIsrUpdateLock) {
futureLog = None
if (deleteFromLogDir)
Expand Down Expand Up @@ -450,7 +450,7 @@ class Partition(val topicPartition: TopicPartition,
} else false
}

def delete() {
def delete(): Unit = {
// need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
inWriteLock(leaderIsrUpdateLock) {
remoteReplicasMap.clear()
Expand Down Expand Up @@ -1076,7 +1076,7 @@ class Partition(val topicPartition: TopicPartition,
* @param offset offset to be used for truncation
* @param isFuture True iff the truncation should be performed on the future log of this partition
*/
def truncateTo(offset: Long, isFuture: Boolean) {
def truncateTo(offset: Long, isFuture: Boolean): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
Expand All @@ -1090,7 +1090,7 @@ class Partition(val topicPartition: TopicPartition,
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future log of this partition
*/
def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean) {
def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/common/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException

trait Config extends Logging {

def validateChars(prop: String, value: String) {
def validateChars(prop: String, value: String): Unit = {
val legalChars = "[a-zA-Z0-9\\._\\-]"
val rgx = new Regex(legalChars + "*")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class InterBrokerSendThread(name: String,
awaitShutdown()
}

override def doWork() {
override def doWork(): Unit = {
var now = time.milliseconds()

generateRequests().foreach { request =>
Expand Down
Loading

0 comments on commit 33d0608

Please sign in to comment.