Skip to content

Commit

Permalink
KAFKA-1291 Add wrapper scripts and usage information to each command.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Jun 19, 2014
1 parent d0c019a commit 6b0ae4b
Show file tree
Hide file tree
Showing 36 changed files with 344 additions and 166 deletions.
17 changes: 17 additions & 0 deletions bin/kafka-consumer-offset-checker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker $@
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi

exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@
exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker $@
17 changes: 17 additions & 0 deletions bin/kafka-replica-verification.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool $@
17 changes: 17 additions & 0 deletions bin/windows/kafka-consumer-offset-checker.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.tools.ConsumerOffsetChecker %*
20 changes: 20 additions & 0 deletions bin/windows/kafka-consumer-perf-test.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
%~dp0kafka-run-class.bat kafka.tools.ConsumerPerformance %*
EndLocal
17 changes: 17 additions & 0 deletions bin/windows/kafka-mirror-maker.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.tools.MirrorMaker %*
17 changes: 17 additions & 0 deletions bin/windows/kafka-preferred-replica-election.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.admin.PreferredReplicaLeaderElectionCommand %*
20 changes: 20 additions & 0 deletions bin/windows/kafka-producer-perf-test.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %*
EndLocal
17 changes: 17 additions & 0 deletions bin/windows/kafka-reassign-partitions.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.admin.ReassignPartitionsCommand %*
17 changes: 17 additions & 0 deletions bin/windows/kafka-replay-log-producer.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.tools.ReplayLogProducer %*
17 changes: 17 additions & 0 deletions bin/windows/kafka-replica-verification.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.tools.ReplicaVerificationTool %*
17 changes: 17 additions & 0 deletions bin/windows/kafka-simple-consumer-shell.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.tools.SimpleConsumerShell %*
22 changes: 22 additions & 0 deletions bin/windows/zookeeper-shell.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

IF [%1] EQU [] (
echo USAGE: %0 zookeeper_host:port[/path] [args...]
EXIT /B 1
)

%~dp0kafka-run-class.bat org.apache.zookeeper.ZooKeeperMain -server %*
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])

if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," +
" it can be used to balance leadership among the servers.")

val options = parser.parse(args : _*)

Expand Down
28 changes: 11 additions & 17 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ object ReassignPartitionsCommand extends Logging {

// should have exactly one action
val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
if(actions != 1) {
opts.parser.printHelpOn(System.err)
Utils.croak("Command must include exactly one action: --generate, --execute or --verify")
}
if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify")

CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)

Expand All @@ -58,10 +56,8 @@ object ReassignPartitionsCommand extends Logging {
}

def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
opts.parser.printHelpOn(System.err)
Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
}
if(!opts.options.has(opts.reassignmentJsonFileOpt))
CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val jsonString = Utils.readFileAsString(jsonFile)
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
Expand All @@ -81,10 +77,8 @@ object ReassignPartitionsCommand extends Logging {
}

def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) {
opts.parser.printHelpOn(System.err)
Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
}
if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
Expand All @@ -105,11 +99,8 @@ object ReassignPartitionsCommand extends Logging {
}

def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
if(!opts.options.has(opts.reassignmentJsonFileOpt)) {
opts.parser.printHelpOn(System.err)
Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " +
"during the --generate option")
}
if(!opts.options.has(opts.reassignmentJsonFileOpt))
CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
Expand Down Expand Up @@ -185,6 +176,9 @@ object ReassignPartitionsCommand extends Logging {
.withRequiredArg
.describedAs("brokerlist")
.ofType(classOf[String])

if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")

val options = parser.parse(args : _*)
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ object TopicCommand {

val opts = new TopicCommandOptions(args)

if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")

// should have exactly one action
val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
if(actions != 1) {
System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete")
opts.parser.printHelpOn(System.err)
System.exit(1)
}
if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")

opts.checkArgs()

Expand Down
26 changes: 6 additions & 20 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ object ConsoleConsumer extends Logging {
.describedAs("metrics dictory")
.ofType(classOf[java.lang.String])

if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")

var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1) {
error("Exactly one of whitelist/blacklist/topic is required.")
parser.printHelpOn(System.err)
System.exit(1)
}
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
val topicArg = options.valueOf(topicOrFilterOpt.head)
val filterSpec = if (options.has(blacklistOpt))
new Blacklist(topicArg)
Expand Down Expand Up @@ -144,7 +144,7 @@ object ConsoleConsumer extends Logging {
val config = new ConsumerConfig(consumerProps)
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val connector = Consumer.create(config)

Expand Down Expand Up @@ -217,20 +217,6 @@ object ConsoleConsumer extends Logging {
}
}

object MessageFormatter {
def tryParseFormatterArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
System.err.println("Invalid parser arguments: " + args.mkString(" "))
System.exit(1)
}
val props = new Properties
for(a <- splits)
props.put(a(0), a(1))
props
}
}

trait MessageFormatter {
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties) {}
Expand Down
Loading

0 comments on commit 6b0ae4b

Please sign in to comment.