kafka-896; merge 0.8 (988d4d8) to trunk; patched by Jun Rao; reviewed by Jay Kreps
junrao committed Jul 8, 2013
2 parents 731ba90 + 988d4d8 commit c98bdd3
Showing 106 changed files with 1,198 additions and 936 deletions.
36 changes: 10 additions & 26 deletions bin/kafka-run-class.sh
@@ -22,42 +22,26 @@ fi

base_dir=$(dirname $0)/..

SCALA_VERSION=2.8.0

USER_HOME=$(eval echo ~${USER})
ivyPath=$(echo "$USER_HOME/.ivy2/cache")

snappy=$(echo "$ivyPath/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar")
CLASSPATH=$CLASSPATH:$snappy

library=$(echo "$ivyPath/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar")
CLASSPATH=$CLASSPATH:$library

compiler=$(echo "$ivyPath/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar")
CLASSPATH=$CLASSPATH:$compiler

log4j=$(echo "$ivyPath/log4j/log4j/jars/log4j-1.2.15.jar")
CLASSPATH=$CLASSPATH:$log4j

slf=$(echo "$ivyPath/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar")
CLASSPATH=$CLASSPATH:$slf

zookeeper=$(echo "$ivyPath/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar")
CLASSPATH=$CLASSPATH:$zookeeper

jopt=$(echo "$ivyPath/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar")
CLASSPATH=$CLASSPATH:$jopt

# assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
for file in $base_dir/core/target/scala-2.8.0/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/core/lib/*.jar;
for file in $base_dir/perf/target/scala-${SCALA_VERSION}/kafka*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

# classpath addition for release
for file in $base_dir/libs/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/perf/target/scala-2.8.0/kafka*.jar;
for file in $base_dir/kafka*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
6 changes: 3 additions & 3 deletions config/consumer.properties
@@ -14,13 +14,13 @@
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# zk connection string
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=127.0.0.1:2181
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zk.connection.timeout.ms=1000000
zookeeper.connection.timeout.ms=1000000

#consumer group id
group.id=test-consumer-group
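The consumer-side rename (zk.connect → zookeeper.connect, zk.connection.timeout.ms → zookeeper.connection.timeout.ms) also applies wherever a ConsumerConfig is built programmatically. A minimal sketch of a 0.8 high-level consumer using the renamed keys — the topic name and stream count are illustrative, not part of this patch:

```scala
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// Consumer config built with the renamed ZooKeeper properties.
val props = new Properties()
props.put("zookeeper.connect", "127.0.0.1:2181")
props.put("zookeeper.connection.timeout.ms", "1000000")
props.put("group.id", "test-consumer-group")

// High-level consumer connector with one stream for an illustrative topic.
val connector = Consumer.create(new ConsumerConfig(props))
val streams = connector.createMessageStreams(Map("my-topic" -> 1))

// Blocks, printing each message payload as it arrives.
for (msg <- streams("my-topic").head)
  println(new String(msg.message, "UTF-8"))
```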
4 changes: 2 additions & 2 deletions config/producer.properties
@@ -18,7 +18,7 @@

# list of brokers used for bootstrapping
# format: host1:port1,host2:port2 ...
broker.list=localhost:9092
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
@@ -31,7 +31,7 @@ producer.type=sync
compression.codec=none

# message encoder
serializer.class=kafka.serializer.StringEncoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=
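On the producer side, broker.list becomes metadata.broker.list and the example encoder becomes the byte-array DefaultEncoder. A minimal sketch wiring these properties into the 0.8 producer API — the topic name is illustrative:

```scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Producer config using the renamed bootstrap property and the default byte-array encoder.
val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.DefaultEncoder")
props.put("producer.type", "sync")

// Send a single message and shut the producer down.
val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("my-topic", "hello".getBytes("UTF-8")))
producer.close()
```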
6 changes: 3 additions & 3 deletions config/server.properties
@@ -97,15 +97,15 @@ log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zk connection string (see zk docs for details).
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=localhost:2181
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zk.connection.timeout.ms=1000000
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
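The broker reads the same renamed ZooKeeper keys from server.properties, and an optional chroot may be appended to zookeeper.connect (e.g. localhost:2181/kafka). A sketch of an embedded broker configured with these keys, assuming the KafkaConfig(Properties) constructor; the id, port, and log directory are illustrative:

```scala
import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

// Minimal broker properties using the renamed ZooKeeper keys.
val props = new Properties()
props.put("broker.id", "0")
props.put("port", "9092")
props.put("log.dir", "/tmp/kafka-logs")
props.put("zookeeper.connect", "localhost:2181")
props.put("zookeeper.connection.timeout.ms", "1000000")

// Start an embedded broker; shut it down later with broker.shutdown().
val broker = new KafkaServer(new KafkaConfig(props))
broker.startup()
```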
@@ -70,7 +70,7 @@ public DataGenerator(String id, Props props) throws Exception {

System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
2 changes: 1 addition & 1 deletion contrib/hadoop-producer/README.md
@@ -87,7 +87,7 @@ compression codec, one would add the "kafka.output.compression.codec" parameter
compression).

For easier debugging, the above values as well as the Kafka broker information
(kafka.broker.list), the topic (kafka.output.topic), and the schema
(kafka.metadata.broker.list), the topic (kafka.output.topic), and the schema
(kafka.output.schema) are injected into the job's configuration. By default,
the Hadoop producer uses Kafka's sync producer as asynchronous operation
doesn't make sense in the batch Hadoop case.
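The README notes that the broker list, topic, schema, and compression settings are echoed into the job configuration for debugging. A hypothetical sketch that reads those keys back from a Hadoop Configuration, using the names quoted above; the exact injected keys depend on the output format's configured prefix:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper: dump the Kafka-related settings injected into the job
// configuration, using the key names quoted in the README.
def dumpKafkaJobSettings(conf: Configuration): Unit = {
  Seq("kafka.metadata.broker.list", "kafka.output.topic", "kafka.output.schema",
      "kafka.output.compression.codec").foreach { key =>
    println(key + " = " + conf.get(key))
  }
}
```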
@@ -124,8 +124,8 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOE
// URL: kafka://<kafka host>/<topic>
// e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
String brokerList = uri.getAuthority();
props.setProperty("broker.list", brokerList);
job.set(KAFKA_CONFIG_PREFIX + ".broker.list", brokerList);
props.setProperty("metadata.broker.list", brokerList);
job.set(KAFKA_CONFIG_PREFIX + ".metadata.broker.list", brokerList);

if (uri.getPath() == null || uri.getPath().length() <= 1)
throw new KafkaException("no topic specified in kafka uri");
@@ -17,6 +17,7 @@
package kafka.bridge.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import kafka.javaapi.producer.Producer;
@@ -62,7 +63,9 @@ public void write(K key, V value) throws IOException, InterruptedException
if (value instanceof byte[])
valBytes = (byte[]) value;
else if (value instanceof BytesWritable)
valBytes = ((BytesWritable) value).getBytes();
// BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the
// intended size of the byte array contained. We need to use BytesWritable.getLength for the true size.
valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength());
else
throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");

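The fix above matters because BytesWritable.getBytes exposes the backing buffer, whose length is the capacity rather than the payload size. A small sketch illustrating the copy the patch performs:

```scala
import java.util.Arrays
import org.apache.hadoop.io.BytesWritable

val payload = "hello".getBytes("UTF-8")
val writable = new BytesWritable()
writable.set(payload, 0, payload.length)

// The backing buffer may be larger than the logical contents, so its length
// reflects capacity, not the number of valid bytes.
val raw = writable.getBytes                                       // raw.length >= writable.getLength
val exact = Arrays.copyOf(writable.getBytes, writable.getLength)  // exactly the payload bytes

assert(Arrays.equals(exact, payload))
```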
6 changes: 5 additions & 1 deletion core/build.sbt
@@ -1,5 +1,6 @@
import sbt._
import Keys._
import AssemblyKeys._

name := "kafka"

@@ -11,8 +12,10 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ )

libraryDependencies ++= Seq(
"org.apache.zookeeper" % "zookeeper" % "3.3.4",
"com.github.sgroschupf" % "zkclient" % "0.1",
"com.101tec" % "zkclient" % "0.2",
"org.xerial.snappy" % "snappy-java" % "1.0.4.1",
"com.yammer.metrics" % "metrics-core" % "2.2.0",
"com.yammer.metrics" % "metrics-annotation" % "2.2.0",
"org.easymock" % "easymock" % "3.0" % "test",
"junit" % "junit" % "4.1" % "test"
)
@@ -24,4 +27,5 @@ libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
})
}

assemblySettings

Binary file removed core/lib/metrics-annotation-3.0.0-c0c8be71.jar
Binary file removed core/lib/metrics-core-3.0.0-c0c8be71.jar
Binary file removed core/lib/zkclient-20120522.jar
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -214,7 +214,7 @@ object AdminUtils extends Logging {
try {
Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
} catch {
case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
@@ -233,7 +233,7 @@ object AdminUtils extends Logging {
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
case e =>
error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
90 changes: 0 additions & 90 deletions core/src/main/scala/kafka/admin/ListTopicCommand.scala

This file was deleted.

@@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
if (!options.has(jsonFileOpt))
ZkUtils.getAllPartitions(zkClient)
else
parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)

preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -69,7 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}

def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
Json.parseFull(jsonString) match {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
@@ -102,7 +102,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
} catch {
case nee: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
case e2 => throw new AdminOperationException(e2.toString)
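The parser keeps the same JSON contract; only the method name changes. A sketch of calling the renamed parsePreferredReplicaElectionData — the per-partition field names ("topic", "partition") are assumed from the command's usual input format, since only the top-level "partitions" key is visible in this diff:

```scala
import kafka.admin.PreferredReplicaLeaderElectionCommand

// Assumed input shape: a top-level "partitions" array of {topic, partition} entries.
val json =
  """{"partitions":
    |  [
    |    {"topic": "foo", "partition": 1},
    |    {"topic": "foo", "partition": 2}
    |  ]
    |}""".stripMargin

val partitions = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
partitions.foreach(tp => println(tp.topic + "-" + tp.partition))
```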
50 changes: 36 additions & 14 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -104,19 +104,37 @@ object TopicCommand {
}

def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient)
for(md <- metadata) {
println(md.topic)
val config = AdminUtils.fetchTopicConfig(zkClient, md.topic)
println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
println("\tpartitions: " + md.partitionsMetadata.size)
for(pd <- md.partitionsMetadata) {
println("\t\tpartition " + pd.partitionId)
println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none"))
println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", "))
println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", "))
var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
if (topics.size <= 0)
topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
for (topic <- topics) {
ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
println(topic)
val config = AdminUtils.fetchTopicConfig(zkClient, topic)
println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", "))
println("\tpartitions: " + sortedPartitions.size)
}
for ((partitionId, assignedReplicas) <- sortedPartitions) {
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
print("\t\ttopic: " + topic)
print("\tpartition: " + partitionId)
print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
print("\treplicas: " + assignedReplicas.mkString(","))
println("\tisr: " + inSyncReplicas.mkString(","))
}
}
case None =>
println("topic " + topic + " doesn't exist!")
}
}
}
@@ -177,7 +195,11 @@ object TopicCommand {
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
.ofType(classOf[String])

val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
"if set when describing topics, only show under replicated partitions")
val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
"if set when describing topics, only show partitions whose leader is not available")


val options = parser.parse(args : _*)
}
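With ListTopicCommand removed and describeTopic rewritten, the two new flags narrow describe output to problem partitions only. A hypothetical invocation sketch, assuming TopicCommand's usual --zookeeper and --describe options; only the two filter flags below are introduced by this patch:

```scala
import kafka.admin.TopicCommand

// Report only partitions whose ISR is smaller than the assigned replica set.
TopicCommand.main(Array(
  "--zookeeper", "localhost:2181",
  "--describe",
  "--under-replicated-partitions"))

// Report only partitions whose leader is missing or not among the live brokers.
TopicCommand.main(Array(
  "--zookeeper", "localhost:2181",
  "--describe",
  "--unavailable-partitions"))
```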