Skip to content

Commit

Permalink
KAFKA-5627; Reduce classes needed for LeaderAndIsrPartitionState and MetadataPartitionState
Browse files Browse the repository at this point in the history

Author: Dong Lin <[email protected]>

Reviewers: Jiangjie Qin <[email protected]>, Ismael Juma <[email protected]>

Closes apache#3565 from lindong28/KAFKA-5627
  • Loading branch information
lindong28 authored and becketqin committed Jul 26, 2017
1 parent 91c207c commit f15cdbc
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,25 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.utils.Utils;

import java.util.List;

// This is used to describe per-partition state in the LeaderAndIsrRequest
public class PartitionState {
// This class contains the common fields shared between LeaderAndIsrRequest.PartitionState and UpdateMetadataRequest.PartitionState
public class BasePartitionState {

public final int controllerEpoch;
public final int leader;
public final int leaderEpoch;
public final List<Integer> isr;
public final int zkVersion;
public final List<Integer> replicas;
public final boolean isNew;

public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas, boolean isNew) {
BasePartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas) {
this.controllerEpoch = controllerEpoch;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.isr = isr;
this.zkVersion = zkVersion;
this.replicas = replicas;
this.isNew = isNew;
}

@Override
public String toString() {
return "PartitionState(controllerEpoch=" + controllerEpoch +
", leader=" + leader +
", leaderEpoch=" + leaderEpoch +
", isr=" + Utils.join(isr, ",") +
", zkVersion=" + zkVersion +
", replicas=" + Utils.join(replicas, ",") +
", isNew=" + isNew + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public String toString() {
private final Set<Node> liveLeaders;

private LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
Set<Node> liveLeaders, short version) {
Set<Node> liveLeaders, short version) {
super(version);
this.controllerId = controllerId;
this.controllerEpoch = controllerEpoch;
Expand Down Expand Up @@ -157,12 +157,12 @@ protected Struct toStruct() {
partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
PartitionState partitionState = entry.getValue();
partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);
partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.basePartitionState.leaderEpoch);
partitionStateData.set(ISR_KEY_NAME, partitionState.basePartitionState.isr.toArray());
partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.basePartitionState.zkVersion);
partitionStateData.set(REPLICAS_KEY_NAME, partitionState.basePartitionState.replicas.toArray());
if (partitionStateData.hasField(IS_NEW_KEY_NAME))
partitionStateData.set(IS_NEW_KEY_NAME, partitionState.isNew);
partitionStatesData.add(partitionStateData);
Expand Down Expand Up @@ -219,4 +219,31 @@ public static LeaderAndIsrRequest parse(ByteBuffer buffer, short version) {
return new LeaderAndIsrRequest(ApiKeys.LEADER_AND_ISR.parseRequest(version, buffer), version);
}

/**
 * Per-partition state used by LeaderAndIsrRequest.
 *
 * Wraps a {@link BasePartitionState} (controller epoch, leader, leader epoch, ISR, zkVersion, replicas)
 * and adds the {@code isNew} flag that is specific to this request type.
 */
public static final class PartitionState {
// Fields shared with UpdateMetadataRequest.PartitionState; callers read them via basePartitionState.<field>.
public final BasePartitionState basePartitionState;
// Request-specific flag. NOTE(review): presumably marks a newly created replica/partition - confirm against callers.
public final boolean isNew;

/**
 * Builds the shared BasePartitionState from the first six arguments and stores {@code isNew} alongside it.
 *
 * @param controllerEpoch forwarded to BasePartitionState
 * @param leader          forwarded to BasePartitionState
 * @param leaderEpoch     forwarded to BasePartitionState
 * @param isr             forwarded to BasePartitionState, which keeps the reference (no copy is made)
 * @param zkVersion       forwarded to BasePartitionState
 * @param replicas        forwarded to BasePartitionState, which keeps the reference (no copy is made)
 * @param isNew           stored on this object
 */
public PartitionState(int controllerEpoch,
int leader,
int leaderEpoch,
List<Integer> isr,
int zkVersion,
List<Integer> replicas,
boolean isNew) {
this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
this.isNew = isNew;
}

/**
 * Renders every field as {@code PartitionState(name=value, ...)}; the isr and replicas lists are
 * rendered through Utils.join with "," as the separator.
 */
@Override
public String toString() {
return "PartitionState(controllerEpoch=" + basePartitionState.controllerEpoch +
", leader=" + basePartitionState.leader +
", leaderEpoch=" + basePartitionState.leaderEpoch +
", isr=" + Utils.join(basePartitionState.isr, ",") +
", zkVersion=" + basePartitionState.zkVersion +
", replicas=" + Utils.join(basePartitionState.replicas, ",") +
", isNew=" + isNew + ")";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ public String toString() {
}

public static final class PartitionState {
public final int controllerEpoch;
public final int leader;
public final int leaderEpoch;
public final List<Integer> isr;
public final int zkVersion;
public final List<Integer> replicas;
public final BasePartitionState basePartitionState;
public final List<Integer> offlineReplicas;

public PartitionState(int controllerEpoch,
Expand All @@ -91,24 +86,19 @@ public PartitionState(int controllerEpoch,
int zkVersion,
List<Integer> replicas,
List<Integer> offlineReplicas) {
this.controllerEpoch = controllerEpoch;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.isr = isr;
this.zkVersion = zkVersion;
this.replicas = replicas;
this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
this.offlineReplicas = offlineReplicas;
}

@Override
public String toString() {
return "PartitionState(controllerEpoch=" + controllerEpoch +
", leader=" + leader +
", leaderEpoch=" + leaderEpoch +
", isr=" + Arrays.toString(isr.toArray()) +
", zkVersion=" + zkVersion +
", replicas=" + Arrays.toString(replicas.toArray()) +
", offlineReplicas=" + Arrays.toString(replicas.toArray()) + ")";
return "PartitionState(controllerEpoch=" + basePartitionState.controllerEpoch +
", leader=" + basePartitionState.leader +
", leaderEpoch=" + basePartitionState.leaderEpoch +
", isr=" + Arrays.toString(basePartitionState.isr.toArray()) +
", zkVersion=" + basePartitionState.zkVersion +
", replicas=" + Arrays.toString(basePartitionState.replicas.toArray()) +
", offlineReplicas=" + Arrays.toString(offlineReplicas.toArray()) + ")";
}
}

Expand Down Expand Up @@ -285,12 +275,12 @@ protected Struct toStruct() {
partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
PartitionState partitionState = entry.getValue();
partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);
partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.basePartitionState.leaderEpoch);
partitionStateData.set(ISR_KEY_NAME, partitionState.basePartitionState.isr.toArray());
partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.basePartitionState.zkVersion);
partitionStateData.set(REPLICAS_KEY_NAME, partitionState.basePartitionState.replicas.toArray());
if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME))
partitionStateData.set(OFFLINE_REPLICAS_KEY_NAME, partitionState.offlineReplicas.toArray());
partitionStatesData.add(partitionStateData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,15 @@ private ControlledShutdownResponse createControlledShutdownResponse() {
}

private LeaderAndIsrRequest createLeaderAndIsrRequest() {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = asList(1, 2);
List<Integer> replicas = asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
partitionStates.put(new TopicPartition("topic5", 1),
new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
partitionStates.put(new TopicPartition("topic20", 1),
new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));
new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));

Set<Node> leaders = Utils.mkSet(
new Node(0, "test0", 1223),
Expand Down
27 changes: 0 additions & 27 deletions core/src/main/scala/kafka/api/LeaderAndIsr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package kafka.api

import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.utils._

import scala.collection.Set

object LeaderAndIsr {
val initialLeaderEpoch: Int = 0
val initialZKVersion: Int = 0
Expand Down Expand Up @@ -49,27 +46,3 @@ case class LeaderAndIsr(leader: Int,
Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
}
}

/**
 * Groups a partition's LeaderIsrAndControllerEpoch with its replica list and an isNew flag.
 *
 * @param leaderIsrAndControllerEpoch leader/ISR/controller-epoch information, printed via its own toString
 * @param allReplicas replica ids; its size is what toString reports as "ReplicationFactor"
 * @param isNew flag echoed by toString; NOTE(review): its meaning is not visible here - confirm with callers
 */
case class LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], isNew: Boolean) {

  // Builds a single-line description of this state from its three fields.
  // NOTE(review): one "(" is opened but three ")" are appended below, so the rendered text has
  // unbalanced parentheses - confirm whether any consumer depends on the exact format before changing it.
override def toString: String = {
val partitionStateInfo = new StringBuilder
partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
partitionStateInfo.append(",isNew:" + isNew + ")")
partitionStateInfo.toString()
}
}

/**
 * Groups a partition's LeaderIsrAndControllerEpoch with its full replica list and its offline replicas.
 *
 * @param leaderIsrAndControllerEpoch leader/ISR/controller-epoch information, printed via its own toString
 * @param allReplicas replica ids; its size is what toString reports as "ReplicationFactor"
 * @param offlineReplicas replica ids printed under "OfflineReplicas"
 */
case class MetadataPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], offlineReplicas: Seq[Int]) {

  // Builds a single-line description of this state from its three fields.
  // NOTE(review): one "(" is opened but three ")" are appended below, so the rendered text has
  // unbalanced parentheses - confirm whether any consumer depends on the exact format before changing it.
override def toString: String = {
val partitionStateInfo = new StringBuilder
partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
partitionStateInfo.append(",OfflineReplicas:" + offlineReplicas.mkString(",") + ")")
partitionStateInfo.toString()
}
}
29 changes: 15 additions & 14 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kafka.cluster


import java.util.concurrent.locks.ReentrantReadWriteLock

import com.yammer.metrics.core.Gauge
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
Expand All @@ -34,7 +35,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState}
import org.apache.kafka.common.requests.{EpochEndOffset, LeaderAndIsrRequest}
import org.apache.kafka.common.utils.Time

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -168,25 +169,25 @@ class Partition(val topic: String,
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.controllerEpoch
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas

info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")

//We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
leaderEpoch = partitionStateInfo.leaderEpoch
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))

zkVersion = partitionStateInfo.zkVersion
zkVersion = partitionStateInfo.basePartitionState.zkVersion
val isNewLeader =
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
false
Expand Down Expand Up @@ -221,20 +222,20 @@ class Partition(val topic: String,
* Make the local replica the follower by setting the new leader and ISR to empty
* If the leader replica id does not change, return false to indicate the replica manager
*/
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.leader
val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.basePartitionState.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.controllerEpoch
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
inSyncReplicas = Set.empty[Replica]
leaderEpoch = partitionStateInfo.leaderEpoch
zkVersion = partitionStateInfo.zkVersion
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
zkVersion = partitionStateInfo.basePartitionState.zkVersion

if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
false
Expand Down
Loading

0 comments on commit f15cdbc

Please sign in to comment.