KAFKA-1841; OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao; reviewed by Joel Koshy
junrao committed Jan 13, 2015
1 parent 9b6744d commit 432d397
Showing 8 changed files with 151 additions and 39 deletions.
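As the changes below show, the per-partition timestamp used to be written and read regardless of the request version; this patch splits the schema into a v0 without the timestamp and a v1 with it, and gates the field on the request version everywhere it is serialized or parsed. A minimal sketch of the recurring pattern, reusing the field names from the Java client change below (surrounding code simplified):

    partitionData.set(PARTITION_KEY_NAME, partition);
    partitionData.set(COMMIT_OFFSET_KEY_NAME, offset);
    if (versionId == 1)                            // the timestamp field exists only in v1
        partitionData.set(TIMESTAMP_KEY_NAME, timestamp);
    partitionData.set(METADATA_KEY_NAME, metadata);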
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -111,6 +111,16 @@ public class Protocol {
new Field("offset",
INT64,
"Message offset to be committed."),
new Field("metadata",
STRING,
"Any associated metadata the client wants to keep."));

public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
new Field("offset",
INT64,
"Message offset to be committed."),
new Field("timestamp",
INT64,
"Timestamp of the commit"),
@@ -125,6 +135,13 @@ public class Protocol {
new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
"Partitions to commit offsets."));

public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
STRING,
"Topic to commit."),
new Field("partitions",
new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
"Partitions to commit offsets."));

public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
STRING,
"The consumer group id."),
@@ -142,7 +159,7 @@ public class Protocol {
STRING,
"The consumer id assigned by the group coordinator."),
new Field("topics",
new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
"Topics to commit offsets."));

public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
@@ -158,9 +175,11 @@ public class Protocol {
public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));

public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
/* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
public static Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;

public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1};

/* Offset fetch api */
public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -181,6 +200,10 @@ public class Protocol {
new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
"Topics to fetch offsets."));

// version 0 and 1 have exactly the same wire format, but different functionality.
// version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka.
public static Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;

public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -200,8 +223,11 @@ public class Protocol {
public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));

public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
/* The response types for both V0 and V1 of OFFSET_FETCH_RESPONSE are the same. */
public static Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;

public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1 };
public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1 };

/* List offset api */
public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
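With the V1 schemas registered in the OFFSET_COMMIT_* and OFFSET_FETCH_* arrays above, a schema can be resolved by API key and version id. A brief sketch of that lookup, following the ProtoUtils.requestSchema call that appears in OffsetCommitRequest below; the v1 lookup is an assumed, analogous call:

    Schema commitV0 = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0); // OFFSET_COMMIT_REQUEST_V0: no timestamp
    Schema commitV1 = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1); // OFFSET_COMMIT_REQUEST_V1: adds timestamp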
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -47,6 +47,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {

public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_CONSUMER_ID = "";
public static final long DEFAULT_TIMESTAMP = -1L;

private final String groupId;
private final int generationId;
@@ -58,6 +59,11 @@ public static final class PartitionData {
public final long timestamp;
public final String metadata;

// for v0
public PartitionData(long offset, String metadata) {
this(offset, DEFAULT_TIMESTAMP, metadata);
}

public PartitionData(long offset, long timestamp, String metadata) {
this.offset = offset;
this.timestamp = timestamp;
@@ -73,7 +79,7 @@ public PartitionData(long offset, long timestamp, String metadata) {
@Deprecated
public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
initCommonFields(groupId, offsetData);
initCommonFields(groupId, offsetData, 0);
this.groupId = groupId;
this.generationId = DEFAULT_GENERATION_ID;
this.consumerId = DEFAULT_CONSUMER_ID;
@@ -90,7 +96,7 @@ public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> of
public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(curSchema));

initCommonFields(groupId, offsetData);
initCommonFields(groupId, offsetData, 1);
struct.set(GENERATION_ID_KEY_NAME, generationId);
struct.set(CONSUMER_ID_KEY_NAME, consumerId);
this.groupId = groupId;
@@ -99,7 +105,7 @@ public OffsetCommitRequest(String groupId, int generationId, String consumerId,
this.offsetData = offsetData;
}

private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData, int versionId) {
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);

struct.set(GROUP_ID_KEY_NAME, groupId);
@@ -113,7 +119,8 @@ private void initCommonFields(String groupId, Map<TopicPartition, PartitionData>
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
if (versionId == 1)
partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
partitionArray.add(partitionData);
}
@@ -133,7 +140,12 @@ public OffsetCommitRequest(Struct struct) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
long timestamp;
// timestamp only exists in v1
if (partitionResponse.hasField(TIMESTAMP_KEY_NAME))
timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
else
timestamp = DEFAULT_TIMESTAMP;
String metadata = partitionResponse.getString(METADATA_KEY_NAME);
PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
offsetData.put(new TopicPartition(topic, partition), partitionData);
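The two constructors above map directly to the wire versions: the deprecated one-map constructor builds a v0 struct (the new two-argument PartitionData fills in DEFAULT_TIMESTAMP), while the constructor taking a generation id and consumer id builds the current v1 struct with per-partition timestamps. A hedged usage sketch; the package paths, wrapper class, and example values are assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.OffsetCommitRequest;

    public class OffsetCommitRequestExample {
        public static void main(String[] args) {
            Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = new HashMap<>();

            // v0 path: two-argument PartitionData records DEFAULT_TIMESTAMP (-1L);
            // the deprecated constructor serializes with the v0 schema (no timestamp on the wire).
            offsets.put(new TopicPartition("my-topic", 0),
                        new OffsetCommitRequest.PartitionData(42L, "checkpoint"));
            OffsetCommitRequest v0Request = new OffsetCommitRequest("my-group", offsets);

            // v1 path: generation id, consumer id, and a per-partition timestamp are serialized.
            offsets.put(new TopicPartition("my-topic", 0),
                        new OffsetCommitRequest.PartitionData(42L, System.currentTimeMillis(), "checkpoint"));
            OffsetCommitRequest v1Request = new OffsetCommitRequest("my-group", 5, "consumer-1", offsets);
        }
    }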
21 changes: 15 additions & 6 deletions core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -30,8 +30,6 @@ object OffsetCommitRequest extends Logging {
val DefaultClientId = ""

def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
val now = SystemTime.milliseconds

// Read values from the envelope
val versionId = buffer.getShort
assert(versionId == 0 || versionId == 1,
@@ -59,15 +57,25 @@ object OffsetCommitRequest extends Logging {
val partitionId = buffer.getInt
val offset = buffer.getLong
val timestamp = {
val given = buffer.getLong
if (given == -1L) now else given
if (versionId == 1) {
val given = buffer.getLong
given
} else
OffsetAndMetadata.InvalidTime
}
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
})
})
OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
}

def changeInvalidTimeToCurrentTime(offsetCommitRequest: OffsetCommitRequest) {
val now = SystemTime.milliseconds
for ( (topicAndPartition, offsetAndMetadata) <- offsetCommitRequest.requestInfo)
if (offsetAndMetadata.timestamp == OffsetAndMetadata.InvalidTime)
offsetAndMetadata.timestamp = now
}
}

case class OffsetCommitRequest(groupId: String,
@@ -121,7 +129,8 @@ case class OffsetCommitRequest(groupId: String,
t1._2.foreach( t2 => {
buffer.putInt(t2._1.partition)
buffer.putLong(t2._2.offset)
buffer.putLong(t2._2.timestamp)
if (versionId == 1)
buffer.putLong(t2._2.timestamp)
writeShortString(buffer, t2._2.metadata)
})
})
@@ -143,7 +152,7 @@ case class OffsetCommitRequest(groupId: String,
innerCount +
4 /* partition */ +
8 /* offset */ +
8 /* timestamp */ +
(if (versionId == 1) 8 else 0 ) /* timestamp */ +
shortStringLength(offsetAndMetadata._2.metadata)
})
})
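The sizeInBytes arithmetic above is the only per-partition wire difference: a v1 entry is 8 bytes larger than a v0 entry. A hedged sketch of that per-entry count (assuming Kafka short strings carry a 2-byte length prefix; the helper name is hypothetical):

    // Bytes for one (partition, offset[, timestamp], metadata) entry.
    static int partitionEntrySize(int versionId, String metadata) {
        int metadataBytes = metadata == null ? 0
                : metadata.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        return 4                              /* partition */
             + 8                              /* offset */
             + (versionId == 1 ? 8 : 0)       /* timestamp, v1 only */
             + 2 + metadataBytes;             /* short string: length prefix + bytes */
    }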
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
import kafka.common.TopicAndPartition

object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 0
val CurrentVersion: Short = 1

def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
val correlationId = buffer.getInt
Expand All @@ -41,6 +41,9 @@ object OffsetCommitResponse extends Logging {
}
}

/**
* Single constructor for both version 0 and 1 since they have the same format.
*/
case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
correlationId: Int = 0)
extends RequestOrResponse() {
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -25,7 +25,9 @@ import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
object OffsetFetchRequest extends Logging {
val CurrentVersion: Short = 0
// version 0 and 1 have exactly the same wire format, but different functionality.
// version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka.
val CurrentVersion: Short = 1
val DefaultClientId = ""

def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
core/src/main/scala/kafka/common/OffsetAndMetadata.scala
@@ -19,7 +19,7 @@ package kafka.common

case class OffsetAndMetadata(offset: Long,
metadata: String = OffsetAndMetadata.NoMetadata,
timestamp: Long = -1L) {
var timestamp: Long = -1L) {
override def toString = "OffsetAndMetadata[%d,%s%s]"
.format(offset,
if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
