KAFKA-10435; Fetch protocol changes for KIP-595 (apache#9275)
This patch bumps the `Fetch` protocol as specified by KIP-595: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum. The main differences are the following:

- Truncation detection 
- Leader discovery through the response
- Flexible version support

The most notable change is truncation detection. This patch adds logic in the request handling path to detect truncation, but it does not change the replica fetchers to make use of this capability. This will be done separately.
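
To make the new pieces concrete, here is a minimal follower-side sketch of how the new response fields could be consumed. `FetchResult` and `FollowerFetchLoop` are illustrative stand-ins invented for this example, not classes from this patch.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Illustrative holder for the two new pieces of information a fetch response can carry.
final class FetchResult {
    final OptionalLong truncationOffset;   // tagged field, absent below version 12
    final OptionalInt leaderId;            // from the CurrentLeader tagged struct
    final OptionalInt leaderEpoch;

    FetchResult(OptionalLong truncationOffset, OptionalInt leaderId, OptionalInt leaderEpoch) {
        this.truncationOffset = truncationOffset;
        this.leaderId = leaderId;
        this.leaderEpoch = leaderEpoch;
    }
}

final class FollowerFetchLoop {
    private long logEndOffset = 1000L;

    void onFetchResponse(FetchResult result) {
        // Truncation detection: the leader reports the offset where the logs diverge,
        // and the follower drops everything at or above it.
        result.truncationOffset.ifPresent(offset -> {
            if (offset < logEndOffset) {
                logEndOffset = offset;
            }
        });
        // Leader discovery: a new leader can be learned from the response itself,
        // without a separate Metadata round trip.
        result.leaderId.ifPresent(id ->
            System.out.println("Redirecting fetches to leader " + id +
                " at epoch " + result.leaderEpoch.orElse(-1)));
    }

    public static void main(String[] args) {
        FollowerFetchLoop loop = new FollowerFetchLoop();
        loop.onFetchResponse(new FetchResult(
            OptionalLong.of(880L), OptionalInt.of(3), OptionalInt.of(5)));
    }
}
```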

Reviewers: Rajini Sivaram <[email protected]>
hachikuji authored Sep 15, 2020
1 parent a46c07e commit 634c917
Showing 21 changed files with 442 additions and 145 deletions.
@@ -1186,7 +1186,8 @@ private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
}

builder.add(partition, new FetchRequest.PartitionData(position.offset,
FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, position.currentLeader.epoch));
FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize,
position.currentLeader.epoch, Optional.empty()));

log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel,
partition, position, node);
@@ -64,48 +64,79 @@ public static final class PartitionData {
public final long logStartOffset;
public final int maxBytes;
public final Optional<Integer> currentLeaderEpoch;
public final Optional<Integer> lastFetchedEpoch;

public PartitionData(
long fetchOffset,
long logStartOffset,
int maxBytes,
Optional<Integer> currentLeaderEpoch
) {
this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());
}

public PartitionData(long fetchOffset, long logStartOffset, int maxBytes, Optional<Integer> currentLeaderEpoch) {
public PartitionData(
long fetchOffset,
long logStartOffset,
int maxBytes,
Optional<Integer> currentLeaderEpoch,
Optional<Integer> lastFetchedEpoch
) {
this.fetchOffset = fetchOffset;
this.logStartOffset = logStartOffset;
this.maxBytes = maxBytes;
this.currentLeaderEpoch = currentLeaderEpoch;
this.lastFetchedEpoch = lastFetchedEpoch;
}

@Override
public String toString() {
return "(fetchOffset=" + fetchOffset +
", logStartOffset=" + logStartOffset +
", maxBytes=" + maxBytes +
", currentLeaderEpoch=" + currentLeaderEpoch +
")";
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PartitionData that = (PartitionData) o;
return fetchOffset == that.fetchOffset &&
logStartOffset == that.logStartOffset &&
maxBytes == that.maxBytes &&
Objects.equals(currentLeaderEpoch, that.currentLeaderEpoch) &&
Objects.equals(lastFetchedEpoch, that.lastFetchedEpoch);
}

@Override
public int hashCode() {
return Objects.hash(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch);
return Objects.hash(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PartitionData that = (PartitionData) o;
return fetchOffset == that.fetchOffset &&
logStartOffset == that.logStartOffset &&
maxBytes == that.maxBytes &&
currentLeaderEpoch.equals(that.currentLeaderEpoch);
public String toString() {
return "PartitionData(" +
"fetchOffset=" + fetchOffset +
", logStartOffset=" + logStartOffset +
", maxBytes=" + maxBytes +
", currentLeaderEpoch=" + currentLeaderEpoch +
", lastFetchedEpoch=" + lastFetchedEpoch +
')';
}
}

private Optional<Integer> optionalEpoch(int rawEpochValue) {
if (rawEpochValue < 0) {
return Optional.empty();
} else {
return Optional.of(rawEpochValue);
}
}

private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
.filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
fetchPartition.partitionMaxBytes(), leaderEpoch));
new PartitionData(
fetchPartition.fetchOffset(),
fetchPartition.logStartOffset(),
fetchPartition.partitionMaxBytes(),
optionalEpoch(fetchPartition.currentLeaderEpoch()),
optionalEpoch(fetchPartition.lastFetchedEpoch())
));
}));
return Collections.unmodifiableMap(result);
}
@@ -232,19 +263,25 @@ public FetchRequest build(short version) {
// We collect the partitions in a single FetchTopic only if they appear sequentially in the fetchData
FetchRequestData.FetchTopic fetchTopic = null;
for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
if (fetchTopic == null || !entry.getKey().topic().equals(fetchTopic.topic())) {
TopicPartition topicPartition = entry.getKey();
PartitionData partitionData = entry.getValue();

if (fetchTopic == null || !topicPartition.topic().equals(fetchTopic.topic())) {
fetchTopic = new FetchRequestData.FetchTopic()
.setTopic(entry.getKey().topic())
.setTopic(topicPartition.topic())
.setPartitions(new ArrayList<>());
fetchRequestData.topics().add(fetchTopic);
}

fetchTopic.partitions().add(
new FetchRequestData.FetchPartition().setPartition(entry.getKey().partition())
.setCurrentLeaderEpoch(entry.getValue().currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setFetchOffset(entry.getValue().fetchOffset)
.setLogStartOffset(entry.getValue().logStartOffset)
.setPartitionMaxBytes(entry.getValue().maxBytes));
FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
.setPartition(topicPartition.partition())
.setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setFetchOffset(partitionData.fetchOffset)
.setLogStartOffset(partitionData.logStartOffset)
.setPartitionMaxBytes(partitionData.maxBytes);

fetchTopic.partitions().add(fetchPartition);
}

if (metadata != null) {
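For reference, a small sketch of constructing the request-side `FetchRequest.PartitionData` with the new `lastFetchedEpoch` argument, assuming the constructor overloads shown in the hunks above; the offsets and epochs are made-up values.

```java
import java.util.Optional;
import org.apache.kafka.common.requests.FetchRequest;

public class PartitionDataExample {
    public static void main(String[] args) {
        long fetchOffset = 500L;
        long logStartOffset = 0L;
        int maxBytes = 1024 * 1024;
        Optional<Integer> currentLeaderEpoch = Optional.of(5);
        Optional<Integer> lastFetchedEpoch = Optional.of(4);  // epoch of the last record the follower appended

        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
            fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch);

        // The four-argument overload keeps existing callers working: it defaults
        // lastFetchedEpoch to Optional.empty(), which the builder encodes as -1 on the wire.
        FetchRequest.PartitionData legacy = new FetchRequest.PartitionData(
            fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch);

        System.out.println(partitionData);
        System.out.println(legacy);
    }
}
```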
@@ -122,6 +122,7 @@ public static final class PartitionData<T extends BaseRecords> {

// Derived fields
private final Optional<Integer> preferredReplica;
private final Optional<Long> truncationOffset;
private final List<AbortedTransaction> abortedTransactions;
private final Errors error;

@@ -141,6 +142,9 @@ private PartitionData(FetchResponseData.FetchablePartitionResponse partitionResp
.collect(Collectors.toList());
}

this.truncationOffset = partitionResponse.truncationOffset() < 0 ?
Optional.empty() :
Optional.of(partitionResponse.truncationOffset());
this.error = Errors.forCode(partitionResponse.errorCode());
}

@@ -150,10 +154,13 @@ public PartitionData(Errors error,
long logStartOffset,
Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
Optional<Long> truncationOffset,
T records) {
this.preferredReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.error = error;
this.truncationOffset = truncationOffset;

FetchResponseData.FetchablePartitionResponse partitionResponse =
new FetchResponseData.FetchablePartitionResponse();
partitionResponse.setErrorCode(error.code())
@@ -171,10 +178,22 @@ public PartitionData(Errors error,
}
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
partitionResponse.setRecordSet(records);
truncationOffset.ifPresent(partitionResponse::setTruncationOffset);

this.partitionResponse = partitionResponse;
}

public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
long logStartOffset,
Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
T records) {
this(error, highWatermark, lastStableOffset, logStartOffset, preferredReadReplica,
abortedTransactions, Optional.empty(), records);
}

public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
@@ -236,6 +255,10 @@ public List<AbortedTransaction> abortedTransactions() {
return abortedTransactions;
}

public Optional<Long> truncationOffset() {
return truncationOffset;
}

@SuppressWarnings("unchecked")
public T records() {
return (T) partitionResponse.recordSet();
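On the response side, a hedged sketch of building and reading a `FetchResponse.PartitionData` that carries the new `truncationOffset`, assuming the constructor ordering shown in the hunks above; the numeric values are illustrative.

```java
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

public class TruncationResponseExample {
    public static void main(String[] args) {
        FetchResponse.PartitionData<MemoryRecords> partitionData =
            new FetchResponse.PartitionData<>(
                Errors.NONE,
                900L,                       // high watermark
                900L,                       // last stable offset
                0L,                         // log start offset
                Optional.empty(),           // no preferred read replica
                Collections.emptyList(),    // no aborted transactions
                Optional.of(880L),          // follower must truncate offsets >= 880
                MemoryRecords.EMPTY);       // empty record set stands in for real data

        // The derived accessor exposes the tagged field as an Optional on the receiving side.
        partitionData.truncationOffset()
            .ifPresent(offset -> System.out.println("Truncate to " + offset));
    }
}
```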
10 changes: 8 additions & 2 deletions clients/src/main/resources/common/message/FetchRequest.json
@@ -43,10 +43,14 @@
//
// Version 10 indicates that we can use the ZStd compression algorithm, as
// described in KIP-110.
// Version 12 adds flexible versions support as well as epoch validation through
// the `LastFetchedEpoch` field
//
"validVersions": "0-11",
"flexibleVersions": "none",
"validVersions": "0-12",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0,
"about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The broker ID of the follower, of -1 if this request is from a consumer." },
{ "name": "MaxWaitMs", "type": "int32", "versions": "0+",
Expand All @@ -73,6 +77,8 @@
"about": "The current leader epoch of the partition." },
{ "name": "FetchOffset", "type": "int64", "versions": "0+",
"about": "The message offset." },
{ "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
"about": "The epoch of the last fetched record or -1 if there is none"},
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."},
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
15 changes: 13 additions & 2 deletions clients/src/main/resources/common/message/FetchResponse.json
@@ -37,9 +37,11 @@
//
// Version 10 indicates that the response data can use the ZStd compression
// algorithm, as described in KIP-110.
// Version 12 adds support for flexible versions, epoch detection through the `TruncationOffset` field,
// and leader discovery through the `CurrentLeader` field
//
"validVersions": "0-11",
"flexibleVersions": "none",
"validVersions": "0-12",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand All @@ -63,6 +65,15 @@
"about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The current log start offset." },
{ "name": "TruncationOffset", "type": "int64", "versions": "12+", "default": "-1", "taggedVersions": "12+", "tag": 0,
"about": "If set and it is not -1, the follower must truncate all offsets that are greater than or equal to this value." },
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
"versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
"about": "The latest known leader epoch"}
]},
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
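Both new response fields are tagged and default to -1, so readers treat the sentinel as "absent". A small illustrative sketch of that convention (not generated protocol code):

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

public class SentinelDefaults {
    // Mirrors the schema defaults: -1 means "not set".
    static OptionalInt optionalLeader(int rawLeaderId) {
        return rawLeaderId < 0 ? OptionalInt.empty() : OptionalInt.of(rawLeaderId);
    }

    static OptionalLong optionalTruncationOffset(long rawOffset) {
        return rawOffset < 0 ? OptionalLong.empty() : OptionalLong.of(rawOffset);
    }

    public static void main(String[] args) {
        System.out.println(optionalLeader(-1));            // leader unknown
        System.out.println(optionalLeader(3));             // leader discovered from the response
        System.out.println(optionalTruncationOffset(-1));  // no divergence detected
        System.out.println(optionalTruncationOffset(880)); // truncate offsets >= 880
    }
}
```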
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/api/ApiVersion.scala
@@ -100,7 +100,9 @@ object ApiVersion {
// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570)
KAFKA_2_6_IV0,
// Introduced feature versioning support (KIP-584)
KAFKA_2_7_IV0
KAFKA_2_7_IV0,
// Bump Fetch protocol for Raft protocol (KIP-595)
KAFKA_2_7_IV1,
)

// Map keys are the union of the short and full versions
Expand Down Expand Up @@ -361,6 +363,13 @@ case object KAFKA_2_7_IV0 extends DefaultApiVersion {
val id: Int = 28
}

case object KAFKA_2_7_IV1 extends DefaultApiVersion {
val shortVersion: String = "2.7"
val subVersion = "IV1"
val recordVersion = RecordVersion.V2
val id: Int = 29
}

object ApiVersionValidator extends Validator {

override def ensureValid(name: String, value: Any): Unit = {
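The new `KAFKA_2_7_IV1` entry is the kind of inter-broker protocol level that typically gates when brokers may start sending Fetch v12. A hypothetical sketch of such gating; the enum stands in for core's `ApiVersion` and is not the real implementation:

```java
public class FetchVersionGate {
    // Illustrative stand-in for the ordered ApiVersion values in core.
    enum InterBrokerVersion { KAFKA_2_6_IV0, KAFKA_2_7_IV0, KAFKA_2_7_IV1 }

    static short fetchRequestVersion(InterBrokerVersion ibp) {
        // Ordinal comparison mirrors how ApiVersion ids are ordered.
        return ibp.compareTo(InterBrokerVersion.KAFKA_2_7_IV1) >= 0 ? (short) 12 : (short) 11;
    }

    public static void main(String[] args) {
        System.out.println(fetchRequestVersion(InterBrokerVersion.KAFKA_2_7_IV0)); // 11
        System.out.println(fetchRequestVersion(InterBrokerVersion.KAFKA_2_7_IV1)); // 12
    }
}
```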
35 changes: 33 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -997,7 +997,8 @@ class Partition(val topicPartition: TopicPartition,
info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}

def readRecords(fetchOffset: Long,
def readRecords(lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
@@ -1012,10 +1013,40 @@
val initialLogStartOffset = localLog.logStartOffset
val initialLogEndOffset = localLog.logEndOffset
val initialLastStableOffset = localLog.lastStableOffset
val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)

lastFetchedEpoch.ifPresent { fetchEpoch =>
val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
if (epochEndOffset.error != Errors.NONE) {
throw epochEndOffset.error.exception()
}

if (epochEndOffset.hasUndefinedEpochOrOffset) {
throw new OffsetOutOfRangeException("Could not determine the end offset of the last fetched epoch " +
s"$lastFetchedEpoch from the request")
}

if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
val emptyFetchData = FetchDataInfo(
fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
firstEntryIncomplete = false,
abortedTransactions = None
)

return LogReadInfo(
fetchedData = emptyFetchData,
truncationOffset = Some(epochEndOffset.endOffset),
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset)
}
}

val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
LogReadInfo(
fetchedData = fetchedData,
truncationOffset = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
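The divergence condition added to `readRecords` above, re-expressed as a self-contained Java sketch; `EpochEndOffset` is an illustrative holder for the (epoch, end offset) pair the leader looks up for the follower's last fetched epoch.

```java
final class EpochEndOffset {
    final int leaderEpoch;
    final long endOffset;

    EpochEndOffset(int leaderEpoch, long endOffset) {
        this.leaderEpoch = leaderEpoch;
        this.endOffset = endOffset;
    }
}

final class DivergenceCheck {
    /**
     * True when the follower's log has diverged from the leader's: either the leader
     * does not have the follower's last fetched epoch at all, or that epoch ended on
     * the leader before the follower's fetch offset.
     */
    static boolean diverged(EpochEndOffset epochEndOffset, int lastFetchedEpoch, long fetchOffset) {
        return epochEndOffset.leaderEpoch < lastFetchedEpoch
            || epochEndOffset.endOffset < fetchOffset;
    }

    public static void main(String[] args) {
        // Epoch 4 ended at 880 on the leader, but the follower asks for 900 with
        // lastFetchedEpoch=4: the follower holds offsets the leader never had.
        System.out.println(diverged(new EpochEndOffset(4, 880L), 4, 900L)); // true
        // Epochs and offsets line up: serve the fetch normally.
        System.out.println(diverged(new EpochEndOffset(4, 900L), 4, 900L)); // false
    }
}
```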
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/Log.scala
@@ -146,6 +146,7 @@ case class LogOffsetSnapshot(logStartOffset: Long,
* Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]].
*/
case class LogReadInfo(fetchedData: FetchDataInfo,
truncationOffset: Option[Long],
highWatermark: Long,
logStartOffset: Long,
logEndOffset: Long,
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -165,9 +165,19 @@ class DelayedFetch(delayMs: Long,
quota = quota)

val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica,
fetchMetadata.isFromFollower && replicaManager.isAddingReplica(tp, fetchMetadata.replicaId))
val isReassignmentFetch = fetchMetadata.isFromFollower &&
replicaManager.isAddingReplica(tp, fetchMetadata.replicaId)

tp -> FetchPartitionData(
result.error,
result.highWatermark,
result.leaderLogStartOffset,
result.info.records,
result.truncationOffset,
result.lastStableOffset,
result.info.abortedTransactions,
result.preferredReadReplica,
isReassignmentFetch)
}

responseCallback(fetchPartitionData)