Commit

merge from 0.8 and resolve conflicts
junrao committed Jan 12, 2013
2 parents ed36a7f + a409531 commit 9249b76
Showing 98 changed files with 855 additions and 759 deletions.
2 changes: 1 addition & 1 deletion config/consumer.properties
@@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181
zk.connectiontimeout.ms=1000000

#consumer group id
groupid=test-consumer-group
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000
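
The consumer-side rename above (groupid to group.id) is picked up through plain java.util.Properties. A minimal sketch, assuming the 0.8-era high-level consumer API (kafka.consumer.Consumer and ConsumerConfig), with stream creation and iteration elided:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object ConsumerConfigSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "127.0.0.1:2181")     // ZooKeeper connection string
        props.put("group.id", "test-consumer-group")  // renamed from "groupid"
        val connector = Consumer.create(new ConsumerConfig(props))
        // ... createMessageStreams, iterate over messages, then shut down
        connector.shutdown()
      }
    }
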
45 changes: 34 additions & 11 deletions config/log4j.properties
@@ -4,30 +4,53 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.rootLogger=OFF, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

#log4j.appender.fileAppender=org.apache.log4j.FileAppender
#log4j.appender.fileAppender.File=kafka-request.log
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Turn on all our debugging info
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=ERROR

log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false

log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
log4j.logger.kafka.controller=TRACE, stateChangeAppender
log4j.additivity.kafka.controller=false
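
A hedged illustration (not part of this commit) of how the logger names above are consumed through the plain log4j 1.x API: anything logged under "kafka.request.logger" at TRACE lands in kafka-request.log via requestAppender, and additivity=false keeps it off stdout.

    import org.apache.log4j.Logger

    object RequestLogSketch {
      private val requestLogger = Logger.getLogger("kafka.request.logger")

      def logCompletedRequest(summary: String) {
        // Routed to requestAppender (kafka-request.log) by the configuration above.
        if (requestLogger.isTraceEnabled)
          requestLogger.trace("Completed request: " + summary)
      }
    }
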
25 changes: 4 additions & 21 deletions config/producer.properties
@@ -36,35 +36,18 @@ serializer.class=kafka.serializer.StringEncoder
# allow topic level compression
#compressed.topics=

# max message size; messages larger than that size are discarded; default is 1000000
#max.message.size=


############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.time=
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.size=
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueueTimeout.ms=
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.size=

# the callback handler for one or multiple events
#callback.handler=

# properties required to initialize the callback handler
#callback.handler.props=

# the handler for events
#event.handler=

# properties required to initialize the event handler
#event.handler.props=

#batch.num.messages=
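
A rough sketch of supplying the renamed async producer settings from code, assuming the 0.8-era kafka.producer.ProducerConfig and the property names in effect at this point (broker.list, serializer.class and producer.type all appear elsewhere in this commit); the values are only examples:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    object AsyncProducerConfigSketch {
      def buildConfig(brokerList: String): ProducerConfig = {
        val props = new Properties()
        props.put("broker.list", brokerList)                // e.g. "localhost:9092"
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("producer.type", "async")
        props.put("queue.buffering.max.ms", "5000")         // was queue.time
        props.put("queue.buffering.max.messages", "10000")  // was queue.size
        props.put("queue.enqueue.timeout.ms", "-1")         // was queue.enqueueTimeout.ms; -1 blocks when full
        props.put("batch.num.messages", "200")              // was batch.size
        new ProducerConfig(props)
      }
    }
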
34 changes: 14 additions & 20 deletions config/server.properties
@@ -17,7 +17,7 @@
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
broker.id=0

############################# Socket Server Settings #############################

@@ -27,22 +27,22 @@ port=9092
# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#hostname=localhost
#host.name=localhost

# The number of threads handling network requests
network.threads=2
num.network.threads=2

# The number of threads doing disk I/O
io.threads=2
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600
socket.request.max.bytes=104857600


############################# Log Basics #############################
@@ -54,9 +54,6 @@ log.dir=/tmp/kafka-logs
# for consumption, but also mean more files.
num.partitions=1

# Overrides for for the default given by num.partitions on a per-topic basis
#topic.partition.count.map=topic1:3, topic2:4

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
@@ -69,16 +66,13 @@ num.partitions=1
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000

# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000
log.flush.interval.ms=1000

# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000
# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

@@ -91,11 +85,11 @@ log.default.flush.scheduler.interval.ms=1000
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=536870912
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
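
A hedged sketch of how the renamed broker settings reach a broker embedded in code rather than started from server.properties, assuming the 0.8-era kafka.server.KafkaConfig and KafkaServerStartable classes; the values are illustrative:

    import java.util.Properties
    import kafka.server.{KafkaConfig, KafkaServerStartable}

    object BrokerConfigSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.id", "0")                        // was brokerid
        props.put("port", "9092")
        props.put("zk.connect", "127.0.0.1:2181")
        props.put("log.dir", "/tmp/kafka-logs")
        props.put("log.flush.interval.messages", "10000")  // was log.flush.interval
        props.put("log.segment.bytes", "536870912")        // was log.file.size
        val server = new KafkaServerStartable(new KafkaConfig(props))
        server.startup()
        // ... run until shutdown is requested
        server.shutdown()
      }
    }
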
@@ -71,7 +71,7 @@ public DataGenerator(String id, Props props) throws Exception {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));

@@ -119,10 +119,9 @@ public RecordWriter<NullWritable, W> getRecordWriter(TaskAttemptContext context)
job.setInt("kafka.output.compression_codec", compressionCodec);

props.setProperty("producer.type", producerType);
props.setProperty("buffer.size", Integer.toString(bufSize));
props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
props.setProperty("connect.timeout.ms", Integer.toString(timeout));
props.setProperty("reconnect.interval", Integer.toString(interval));
props.setProperty("max.message.size", Integer.toString(maxSize));
props.setProperty("compression.codec", Integer.toString(compressionCodec));

if (uri.getScheme().equals("kafka")) {
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -120,8 +120,9 @@ case class LeaderAndIsrRequest (versionId: Short,
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {

def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
controllerEpoch: Int, correlationId: Int) = {
this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos, liveBrokers, controllerEpoch)
}

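
An illustrative helper (not in the commit) showing the new constructor in use: the controller-side caller now supplies its own correlation id instead of the request hard-coding 0. It assumes PartitionStateInfo lives alongside the request class in kafka.api:

    import kafka.api.{LeaderAndIsrRequest, PartitionStateInfo}
    import kafka.cluster.Broker

    object LeaderAndIsrSketch {
      def build(partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                liveBrokers: Set[Broker],
                controllerEpoch: Int,
                correlationId: Int): LeaderAndIsrRequest =
        new LeaderAndIsrRequest(partitionStateInfos, liveBrokers, controllerEpoch, correlationId)
    }
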
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._


object ProducerRequest {
val CurrentVersion = 0.shortValue

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -59,8 +59,8 @@ case class StopReplicaRequest(versionId: Short,
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {

def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
deletePartitions, partitions, controllerEpoch)
}

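
The same pattern applies to StopReplicaRequest; a hypothetical construction with example arguments (topic, partition and epoch values are illustrative only):

    import kafka.api.StopReplicaRequest

    object StopReplicaSketch {
      def build(controllerEpoch: Int, correlationId: Int): StopReplicaRequest =
        // deletePartitions = true, partitions = {("my-topic", 0)}
        new StopReplicaRequest(true, Set(("my-topic", 0)), controllerEpoch, correlationId)
    }
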
20 changes: 11 additions & 9 deletions core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -39,30 +39,32 @@ object TopicMetadataRequest extends Logging {
val topics = new ListBuffer[String]()
for(i <- 0 until numTopics)
topics += readShortString(buffer)
val topicsList = topics.toList
debug("topic = %s".format(topicsList.head))
new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
}
}

case class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
val clientId: String,
val topics: Seq[String],
val correlationId: Int)
val topics: Seq[String])
extends RequestOrResponse(Some(RequestKeys.MetadataKey)){

def this(topics: Seq[String]) =
this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
def this(topics: Seq[String], correlationId: Int) =
this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)

def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId) // correlation id not set yet
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
}

def sizeInBytes(): Int = {
2 + 4 + shortStringLength(clientId) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
2 + /* version id */
4 + /* correlation id */
shortStringLength(clientId) + /* client id */
4 + /* number of topics */
topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}
}
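
A short sketch, grounded in the constructor and methods above (the buffer handling is an assumption, not from the commit), of building and serializing the request with an explicit correlation id under the new argument order:

    import java.nio.ByteBuffer
    import kafka.api.TopicMetadataRequest

    object MetadataRequestSketch {
      def serialize(topics: Seq[String], correlationId: Int): ByteBuffer = {
        val request = new TopicMetadataRequest(topics, correlationId)  // new (topics, correlationId) order
        val buffer = ByteBuffer.allocate(request.sizeInBytes())
        request.writeTo(buffer)  // version id, correlation id, client id, topic count, topics
        buffer.rewind()
        buffer
      }
    }
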
15 changes: 9 additions & 6 deletions core/src/main/scala/kafka/client/ClientUtils.scala
@@ -20,22 +20,23 @@ object ClientUtils extends Logging{
* @param producerConfig The producer's config
* @return topic metadata response
*/
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig): TopicMetadataResponse = {
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
var fetchMetaDataSucceeded: Boolean = false
var i: Int = 0
val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
while(i < brokers.size && !fetchMetaDataSucceeded) {
val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
info("Fetching metadata for topic %s".format(topics))
info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
try {
topicMetadataResponse = producer.send(topicMetadataRequest)
fetchMetaDataSucceeded = true
}
catch {
case e =>
warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
.format(correlationId, topics, brokers(i).toString), e)
t = e
} finally {
i = i + 1
@@ -44,6 +45,8 @@
}
if(!fetchMetaDataSucceeded){
throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
} else {
debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
}
return topicMetadataResponse
}
@@ -58,9 +61,9 @@
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
val props = new Properties()
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
props.put("clientid", clientId)
props.put("client.id", clientId)
val producerConfig = new ProducerConfig(props)
fetchTopicMetadata(topics, brokers, producerConfig)
fetchTopicMetadata(topics, brokers, producerConfig, 0)
}

/**
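
A hedged sketch (the wrapper class is illustrative, not from this commit) of a caller threading an increasing correlation id through metadata fetches, so the broker-side request logs can be matched against client-side retries:

    import java.util.concurrent.atomic.AtomicInteger
    import kafka.client.ClientUtils
    import kafka.cluster.Broker
    import kafka.producer.ProducerConfig

    class MetadataRefresher(brokers: Seq[Broker], config: ProducerConfig) {
      private val correlationId = new AtomicInteger(0)

      // Each refresh gets a fresh id, which appears in the info/warn messages above.
      def refresh(topics: Set[String]) =
        ClientUtils.fetchTopicMetadata(topics, brokers, config, correlationId.getAndIncrement)
    }
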
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -261,11 +261,11 @@ class Partition(val topic: String,
.format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}

def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) {
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
leaderIsrUpdateLock synchronized {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
@@ -281,12 +281,12 @@
}
}

def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
/**
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
* for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
* follower is not catching up and should be removed from the ISR
**/
val leaderLogEndOffset = leaderReplica.logEndOffset
@@ -298,7 +298,7 @@ class Partition(val topic: String,
val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes)
val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
}
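
A minimal illustration of the renamed check; the thresholds are arbitrary example values and would normally come from the broker's replica-lag configuration:

    import kafka.cluster.Partition

    object IsrShrinkSketch {
      // Shrink the ISR if a follower has not updated its log end offset for 10s,
      // or is more than 4000 messages (no longer bytes) behind the leader.
      def shrinkIfNeeded(partition: Partition) {
        partition.maybeShrinkIsr(replicaMaxLagTimeMs = 10000L, replicaMaxLagMessages = 4000L)
      }
    }
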