
Commit

new consumer request format; patched by Prashanth Menon; reviewed by Jun Rao and Jay Kreps; KAFKA-240

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1243407 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao committed Feb 13, 2012
1 parent 21da304 commit a5fb217
Showing 37 changed files with 1,037 additions and 803 deletions.
@@ -16,27 +16,26 @@
*/
package kafka.etl;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
import kafka.api.FetchRequest;
import kafka.javaapi.MultiFetchResponse;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.javaapi.message.MessageSet;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;

@SuppressWarnings({ "deprecation"})
public class KafkaETLContext {
@@ -59,7 +58,8 @@ public class KafkaETLContext {
    protected long _offset = Long.MAX_VALUE; /*current offset*/
    protected long _count; /*current count*/

    protected MultiFetchResponse _response = null; /*fetch response*/
    protected int requestId = 0; /* the id of the next fetch request */
    protected FetchResponse _response = null; /*fetch response*/
    protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
    protected Iterator<ByteBufferMessageSet> _respIterator = null;
    protected int _retry = 0;
@@ -149,15 +149,19 @@ public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException
    public boolean fetchMore () throws IOException {
        if (!hasMore()) return false;

        FetchRequest fetchRequest =
            new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
        List<FetchRequest> array = new ArrayList<FetchRequest>();
        array.add(fetchRequest);
        FetchRequest fetchRequest = new FetchRequestBuilder()
            .correlationId(requestId)
            .clientId(_request.clientId())
            .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
            .build();

        long tempTime = System.currentTimeMillis();
        _response = _consumer.multifetch(array);
        if(_response != null)
            _respIterator = _response.iterator();
        _response = _consumer.fetch(fetchRequest);
        if(_response != null) {
            _respIterator = new ArrayList<ByteBufferMessageSet>(){{
                add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
            }}.iterator();
        }
        _requestTime += (System.currentTimeMillis() - tempTime);

        return true;
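For reference, the new code path above issues a single fetch per request through the new builder, replacing the old one-request-per-partition multifetch round. A minimal sketch of the same call flow against the new API (host, port, timeouts, topic, and sizes are made-up placeholder values, not from this commit):

// Sketch only: all literal values below are hypothetical.
import kafka.api.FetchRequestBuilder
import kafka.javaapi.consumer.SimpleConsumer

val consumer = new SimpleConsumer("localhost", 9092, 30000, 1024 * 1024)
val request = new FetchRequestBuilder()
  .correlationId(0)                        // the ETL context passes its requestId counter here
  .clientId("KafkaHadoopETL")              // default client id set in KafkaETLRequest
  .addFetch("myTopic", 0, 0L, 1024 * 1024) // topic, partition, offset, fetch size
  .build()
val response = consumer.fetch(request)     // kafka.javaapi.FetchResponse
val messages = response.messageSet("myTopic", 0)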
@@ -29,6 +29,7 @@ public class KafkaETLRequest {
    URI _uri;
    int _partition;
    long _offset = DEFAULT_OFFSET;
    String _clientId = "KafkaHadoopETL";

    public KafkaETLRequest() {

@@ -83,11 +84,11 @@ public void setOffset(long offset) {
        _offset = offset;
    }

    public String getTopic() { return _topic;}
    public URI getURI () { return _uri;}
    public int getPartition() { return _partition;}

    public long getOffset() { return _offset;}
    public String getTopic() { return _topic; }
    public URI getURI () { return _uri; }
    public int getPartition() { return _partition; }
    public long getOffset() { return _offset; }
    public String clientId() { return _clientId; }

    public boolean isValidOffset() {
        return _offset >= 0;
155 changes: 136 additions & 19 deletions core/src/main/scala/kafka/api/FetchRequest.scala
@@ -20,32 +20,149 @@ package kafka.api
import java.nio._
import kafka.network._
import kafka.utils._
import scala.collection.mutable.{HashMap, Buffer, ListBuffer}

object OffsetDetail {

  def readFrom(buffer: ByteBuffer): OffsetDetail = {
    val topic = Utils.readShortString(buffer, "UTF-8")

    val partitionsCount = buffer.getInt
    val partitions = new Array[Int](partitionsCount)
    for (i <- 0 until partitions.length)
      partitions(i) = buffer.getInt

    val offsetsCount = buffer.getInt
    val offsets = new Array[Long](offsetsCount)
    for (i <- 0 until offsets.length)
      offsets(i) = buffer.getLong

    val fetchesCount = buffer.getInt
    val fetchSizes = new Array[Int](fetchesCount)
    for (i <- 0 until fetchSizes.length)
      fetchSizes(i) = buffer.getInt

    new OffsetDetail(topic, partitions, offsets, fetchSizes)
  }

}

case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {

  def writeTo(buffer: ByteBuffer) {
    Utils.writeShortString(buffer, topic, "UTF-8")

    if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
      throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")

    buffer.putInt(partitions.length)
    partitions.foreach(buffer.putInt(_))

    buffer.putInt(offsets.length)
    offsets.foreach(buffer.putLong(_))

    buffer.putInt(fetchSizes.length)
    fetchSizes.foreach(buffer.putInt(_))
  }

  def sizeInBytes(): Int = {
    2 + topic.length() +                      // topic string
    partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int)
    offsets.foldLeft(4)((s, _) => s + 8) +    // each request offset (long)
    fetchSizes.foldLeft(4)((s,_) => s + 4)    // each request fetch size
  }
}
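As a sanity check on the encoding above, a worked example (all values hypothetical): topic "events", two partitions fetched from offset 0 with 1 MiB fetch sizes.

// Hypothetical values; OffsetDetail itself is the class defined in this commit.
val detail = OffsetDetail("events", Seq(0, 1), Seq(0L, 0L), Seq(1024 * 1024, 1024 * 1024))
// sizeInBytes = 2 + 6     (short-string length prefix + "events")
//             + 4 + 2 * 4 (partition count + two int partition ids)
//             + 4 + 2 * 8 (offset count + two long offsets)
//             + 4 + 2 * 4 (fetch-size count + two int fetch sizes)
//             = 52
assert(detail.sizeInBytes() == 52)

// writeTo and readFrom are inverses, so a round trip reproduces the detail:
val buf = java.nio.ByteBuffer.allocate(detail.sizeInBytes())
detail.writeTo(buf)
buf.rewind()
val decoded = OffsetDetail.readFrom(buf)
// decoded.topic == "events"; decoded.partitions.sameElements(Seq(0, 1))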

object FetchRequest {

  val CurrentVersion = 1.shortValue()

  def readFrom(buffer: ByteBuffer): FetchRequest = {
    val topic = Utils.readShortString(buffer, "UTF-8")
    val partition = buffer.getInt()
    val offset = buffer.getLong()
    val size = buffer.getInt()
    new FetchRequest(topic, partition, offset, size)
    val versionId = buffer.getShort
    val correlationId = buffer.getInt
    val clientId = Utils.readShortString(buffer, "UTF-8")
    val replicaId = buffer.getInt
    val maxWait = buffer.getInt
    val minBytes = buffer.getInt
    val offsetsCount = buffer.getInt
    val offsetInfo = new Array[OffsetDetail](offsetsCount)
    for(i <- 0 until offsetInfo.length)
      offsetInfo(i) = OffsetDetail.readFrom(buffer)

    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
  }

}

class FetchRequest(val topic: String,
                   val partition: Int,
                   val offset: Long,
                   val maxSize: Int) extends Request(RequestKeys.Fetch) {

case class FetchRequest( versionId: Short,
                         correlationId: Int,
                         clientId: String,
                         replicaId: Int,
                         maxWait: Int,
                         minBytes: Int,
                         offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {

  def writeTo(buffer: ByteBuffer) {
    Utils.writeShortString(buffer, topic)
    buffer.putInt(partition)
    buffer.putLong(offset)
    buffer.putInt(maxSize)
    buffer.putShort(versionId)
    buffer.putInt(correlationId)
    Utils.writeShortString(buffer, clientId, "UTF-8")
    buffer.putInt(replicaId)
    buffer.putInt(maxWait)
    buffer.putInt(minBytes)
    buffer.putInt(offsetInfo.size)
    for(topicDetail <- offsetInfo) {
      topicDetail.writeTo(buffer)
    }
  }

  def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4

  override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset +
    " maxSize:" + maxSize + ")"
  def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
}
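Continuing the worked example, the fixed header of the new request adds a version short, three ints, a short client-id string, and the detail count ahead of the per-topic details (field values hypothetical):

// Builds on the 52-byte `detail` from the OffsetDetail sketch above.
val req = FetchRequest(FetchRequest.CurrentVersion, correlationId = 0, clientId = "test",
                       replicaId = -1, maxWait = 1000, minBytes = 1, offsetInfo = Seq(detail))
// sizeInBytes = 2 (versionId) + 4 (correlationId) + 2 + 4 (clientId "test")
//             + 4 (replicaId) + 4 (maxWait) + 4 (minBytes)
//             + 4 (offsetInfo count) + 52 (one OffsetDetail)
//             = 80
assert(req.sizeInBytes == 80)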

class FetchRequestBuilder() {
  private var correlationId = -1
  private val versionId = FetchRequest.CurrentVersion
  private var clientId = ""
  private var replicaId = -1 // sensible default
  private var maxWait = -1 // sensible default
  private var minBytes = -1 // sensible default
  private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]

  def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
    val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
    topicData._1.append(partition)
    topicData._2.append(offset)
    topicData._3.append(fetchSize)
    this
  }

  def correlationId(correlationId: Int): FetchRequestBuilder = {
    this.correlationId = correlationId
    this
  }

  def clientId(clientId: String): FetchRequestBuilder = {
    this.clientId = clientId
    this
  }

  def replicaId(replicaId: Int): FetchRequestBuilder = {
    this.replicaId = replicaId
    this
  }

  def maxWait(maxWait: Int): FetchRequestBuilder = {
    this.maxWait = maxWait
    this
  }

  def minBytes(minBytes: Int): FetchRequestBuilder = {
    this.minBytes = minBytes
    this
  }

  def build() = {
    val offsetDetails = requestMap.map{ topicData =>
      new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
    }
    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
  }
}
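One property of the builder worth noting: addFetch groups entries per topic in requestMap, so repeated calls for the same topic fold into a single OffsetDetail while distinct topics yield separate ones. A short sketch (topics, offsets, and sizes are made up):

val multiReq = new FetchRequestBuilder()
  .correlationId(1)
  .clientId("example-client")
  .addFetch("logs", 0, 500L, 64 * 1024)   // first partition of "logs"
  .addFetch("logs", 1, 750L, 64 * 1024)   // folded into the same OffsetDetail
  .addFetch("metrics", 0, 0L, 64 * 1024)  // separate topic, separate OffsetDetail
  .build()
// multiReq.offsetInfo.size == 2: one detail covering "logs" partitions 0 and 1,
// and one covering "metrics" partition 0.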
