KAFKA-544 Store the key given to the producer in the message. Expose this key in the consumer. Patch reviewed by Jun.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1410055 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps committed Nov 15, 2012
1 parent 22e032f commit f4ccf21
Showing 54 changed files with 735 additions and 679 deletions.
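The producer side of this change is a mechanical swap: ProducerData is replaced by kafka.producer.KeyedMessage, which carries an optional key next to the payload so the consumer can recover which key a message was produced with. The first two diffs below (the Hadoop ETL DataGenerator and KafkaRecordWriter) show that swap. As a rough Scala sketch of producing with a key against this API: the broker/serializer property names and the single-message send call are assumptions based on the general 0.8 producer, not facts established by this commit.

// Hypothetical sketch of sending keyed messages via kafka.producer.KeyedMessage.
// Broker address, property names, and serializer choices are illustrative assumptions.
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KeyedProducerSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("broker.list", "localhost:9092")                       // assumed property name in this branch
    props.put("serializer.class", "kafka.serializer.StringEncoder")  // assumed encoder for String payloads

    val producer = new Producer[String, String](new ProducerConfig(props))

    // Three-argument form: (topic, key, message); the key is stored with the message
    // and surfaces again on the consumer side.
    producer.send(new KeyedMessage[String, String]("test-topic", "key-1", "value-1"))

    // Two-argument form: (topic, message); no key attached, as in KafkaRecordWriter below.
    producer.send(new KeyedMessage[String, String]("test-topic", "value-without-key"))

    producer.close()
  }
}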
@@ -27,9 +27,9 @@
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
 import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
+import kafka.producer.KeyedMessage;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -81,18 +81,17 @@ public DataGenerator(String id, Props props) throws Exception {
 
   public void run() throws Exception {
 
-    List<Message> list = new ArrayList<Message>();
+    List<KeyedMessage> list = new ArrayList<KeyedMessage>();
     for (int i = 0; i < _count; i++) {
       Long timestamp = RANDOM.nextLong();
       if (timestamp < 0) timestamp = -timestamp;
       byte[] bytes = timestamp.toString().getBytes("UTF8");
       Message message = new Message(bytes);
-      list.add(message);
+      list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
     }
     // send events
     System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-    ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
-    _producer.send(pd);
+    _producer.send(list);
 
     // close the producer
     _producer.close();
@@ -21,7 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
-import kafka.javaapi.producer.ProducerData;
+import kafka.producer.KeyedMessage;
 import kafka.message.Message;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -33,7 +33,7 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
   protected Producer<Integer, Message> producer;
   protected String topic;
 
-  protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
+  protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
   protected int totalSize = 0;
   protected int queueSize;
 
@@ -57,7 +57,7 @@ protected void sendMsgList()
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
     Message msg = new Message(value.getBytes());
-    msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
+    msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
38 changes: 8 additions & 30 deletions core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -24,7 +24,7 @@ import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.serializer.StringDecoder
+import kafka.serializer._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
 
@@ -179,15 +179,15 @@ object ConsoleConsumer extends Logging {
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
     try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
       val iter = if(maxMessages >= 0)
         stream.slice(0, maxMessages)
       else
         stream
 
      for(messageAndTopic <- iter) {
        try {
-          formatter.writeTo(messageAndTopic.message, System.out)
+          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
        } catch {
          case e =>
            if (skipMessageOnError)
@@ -251,36 +251,14 @@ object MessageFormatter {
   }
 
 trait MessageFormatter {
-  def writeTo(message: Message, output: PrintStream)
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
   def init(props: Properties) {}
   def close() {}
 }
 
-class DecodedMessageFormatter extends MessageFormatter {
-  var topicStr: String = _
-  val decoder = new StringDecoder()
-
-  override def init(props: Properties) {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
-    else
-      topicStr = ""
-  }
-
-  def writeTo(message: Message, output: PrintStream) {
-    try {
-      output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
-    } catch {
-      case e => e.printStackTrace()
-    }
-  }
-}
-
 class NewlineMessageFormatter extends MessageFormatter {
-  def writeTo(message: Message, output: PrintStream) {
-    val payload = message.payload
-    output.write(payload.array, payload.arrayOffset, payload.limit)
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    output.write(value)
     output.write('\n')
   }
 }
@@ -296,8 +274,8 @@ class ChecksumMessageFormatter extends MessageFormatter {
       topicStr = ""
   }
 
-  def writeTo(message: Message, output: PrintStream) {
-    val chksum = message.checksum
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    val chksum = new Message(value, key).checksum
    output.println(topicStr + "checksum:" + chksum)
   }
 }
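With this change a console-consumer formatter receives the raw key bytes alongside the value. As an illustration of the new writeTo(key, value, output) contract (not part of the commit), a formatter that prints key and value separated by a configurable separator might look like the sketch below; it assumes it lives in the same package as the MessageFormatter trait and that messages produced without a key arrive with a null key, as the decoding path in ConsumerIterator suggests.

// Illustrative formatter for the key-aware writeTo(key, value, output) contract above.
// Assumes this class sits next to MessageFormatter (e.g. in kafka.consumer).
import java.io.PrintStream
import java.util.Properties

class KeyValueMessageFormatter extends MessageFormatter {
  private var separator: Array[Byte] = "\t".getBytes

  override def init(props: Properties) {
    // Optional "separator" property (a hypothetical knob); falls back to a tab.
    val sep = props.getProperty("separator")
    if (sep != null)
      separator = sep.getBytes
  }

  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
    if (key != null) {   // unkeyed messages are assumed to arrive with a null key
      output.write(key)
      output.write(separator)
    }
    output.write(value)
    output.write('\n')
  }
}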
37 changes: 26 additions & 11 deletions core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -19,38 +19,53 @@ package kafka.consumer
 
 import scala.collection._
 import kafka.utils.Logging
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer._
 
 /**
  * Main interface for consumer
  */
 trait ConsumerConnector {
 
   /**
    * Create a list of MessageStreams for each topic.
    *
    * @param topicCountMap a map of (topic, #streams) pair
-   * @param decoder Decoder to decode each Message to type T
    * @return a map of (topic, list of KafkaStream) pairs.
    *         The number of items in the list is #streams. Each stream supports
    *         an iterator over message/metadata pairs.
    */
-  def createMessageStreams[T](topicCountMap: Map[String,Int],
-                              decoder: Decoder[T] = new DefaultDecoder)
-    : Map[String,List[KafkaStream[T]]]
-
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
+
+  /**
+   * Create a list of MessageStreams for each topic.
+   *
+   * @param topicCountMap a map of (topic, #streams) pair
+   * @param keyDecoder Decoder to decode the key portion of the message
+   * @param valueDecoder Decoder to decode the value portion of the message
+   * @return a map of (topic, list of KafkaStream) pairs.
+   *         The number of items in the list is #streams. Each stream supports
+   *         an iterator over message/metadata pairs.
+   */
+  def createMessageStreams[K,V](topicCountMap: Map[String,Int],
+                                keyDecoder: Decoder[K],
+                                valueDecoder: Decoder[V])
+    : Map[String,List[KafkaStream[K,V]]]
+
   /**
    * Create a list of message streams for all topics that match a given filter.
    *
    * @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
    * @param numStreams Number of streams to return
-   * @param decoder Decoder to decode each Message to type T
+   * @param keyDecoder Decoder to decode the key portion of the message
+   * @param valueDecoder Decoder to decode the value portion of the message
    * @return a list of KafkaStream each of which provides an
    *         iterator over message/metadata pairs over allowed topics.
    */
-  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
-                                      numStreams: Int = 1,
-                                      decoder: Decoder[T] = new DefaultDecoder)
-    : Seq[KafkaStream[T]]
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+                                        numStreams: Int = 1,
+                                        keyDecoder: Decoder[K] = new DefaultDecoder(),
+                                        valueDecoder: Decoder[V] = new DefaultDecoder())
    : Seq[KafkaStream[K,V]]
 
   /**
   * Commit the offsets of all broker partitions connected by this connector.
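On the consumer side, the revised ConsumerConnector lets callers supply separate key and value decoders and get back KafkaStream[K,V] instances whose iterator yields the decoded key alongside the message. A hedged sketch of how a client might use this interface follows; the Consumer.create factory, the no-argument StringDecoder constructor, and the ZooKeeper/group property names are assumptions about this branch rather than facts established by the diff.

// Sketch of consuming with explicit key/value decoders; connection properties,
// StringDecoder construction, and the Consumer.create factory are assumptions.
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder

object KeyedConsumerSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // assumed property name
    props.put("group.id", "example-group")            // assumed property name

    val connector = Consumer.create(new ConsumerConfig(props))

    // One stream for "test-topic", decoding both key and value as strings.
    val streams = connector.createMessageStreams(
      Map("test-topic" -> 1),
      new StringDecoder(),   // key decoder
      new StringDecoder())   // value decoder

    for (msg <- streams("test-topic").head) {
      // MessageAndMetadata now exposes the key alongside the message payload.
      println(msg.key + " -> " + msg.message)
    }

    connector.shutdown()
  }
}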
24 changes: 14 additions & 10 deletions core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.utils.{IteratorTemplate, Logging}
+import kafka.utils.{IteratorTemplate, Logging, Utils}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
@@ -30,17 +30,18 @@ import kafka.common.{KafkaException, MessageSizeTooLargeException}
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
-class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
-                          consumerTimeoutMs: Int,
-                          private val decoder: Decoder[T],
-                          val enableShallowIterator: Boolean)
-  extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
+class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
+                             consumerTimeoutMs: Int,
+                             private val keyDecoder: Decoder[K],
+                             private val valueDecoder: Decoder[V],
+                             val enableShallowIterator: Boolean)
+  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
-  private var currentTopicInfo:PartitionTopicInfo = null
+  private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): MessageAndMetadata[T] = {
+  override def next(): MessageAndMetadata[K, V] = {
     val item = super.next()
     if(consumedOffset < 0)
       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
@@ -52,7 +53,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
     item
   }
 
-  protected def makeNext(): MessageAndMetadata[T] = {
+  protected def makeNext(): MessageAndMetadata[K, V] = {
     var currentDataChunk: FetchedDataChunk = null
     // if we don't have an iterator, get one
     var localCurrent = current.get()
@@ -103,7 +104,10 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
 
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
-    new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
+    val keyBuffer = item.message.key
+    val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
+    val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+    new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
   }
 
   def clearCurrentChunk() {
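The iterator now feeds the raw key and value bytes through Decoder.fromBytes, so a decoder is essentially a function from Array[Byte] to the target type. A hypothetical decoder written against that assumption (the exact Decoder trait definition is not shown in this diff) could look like the following, and could then be passed as the keyDecoder or valueDecoder argument to createMessageStreams above.

// Illustrative Decoder for the fromBytes contract used by ConsumerIterator above;
// the trait's exact shape in this branch is assumed, not shown in the diff.
import java.nio.ByteBuffer
import kafka.serializer.Decoder

class LongDecoder extends Decoder[Long] {
  // Interpret the payload as a single big-endian long.
  def fromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
}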
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -22,19 +22,20 @@ import java.util.concurrent.BlockingQueue
 import kafka.serializer.Decoder
 import kafka.message.MessageAndMetadata
 
-class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
-                     consumerTimeoutMs: Int,
-                     private val decoder: Decoder[T],
-                     val enableShallowIterator: Boolean)
-  extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
+                       consumerTimeoutMs: Int,
+                       private val keyDecoder: Decoder[K],
+                       private val valueDecoder: Decoder[V],
+                       val enableShallowIterator: Boolean)
+  extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
-  private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
+  private val iter: ConsumerIterator[K,V] =
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
 
   /**
    * Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator[T] = iter
+  def iterator(): ConsumerIterator[K,V] = iter
 
   /**
    * This method clears the queue being iterated during the consumer rebalancing. This is mainly
(Diffs for the remaining changed files are not shown here.)
