Skip to content

Commit

Permalink
KAFKA-2140 Improve code readability; reviewed by Neha Narkhede
Browse files Browse the repository at this point in the history
  • Loading branch information
ijuma authored and Neha Narkhede committed Apr 26, 2015
1 parent 622e707 commit ed1a548
Show file tree
Hide file tree
Showing 70 changed files with 194 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
*/
package org.apache.kafka.clients.consumer;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
import org.apache.kafka.common.config.AbstractConfig;
Expand All @@ -27,6 +20,13 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
* The consumer configuration keys
*/
Expand Down Expand Up @@ -304,7 +304,7 @@ public static Properties addDeserializerToConfig(Properties properties,
return newProperties;
}

ConsumerConfig(Map<? extends Object, ? extends Object> props) {
ConsumerConfig(Map<?, ?> props) {
super(CONFIG, props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

/**
* A container that holds the list {@link ConsumerRecord} per partition for a
* particular topic. There is one for every topic returned by a
Expand Down Expand Up @@ -55,7 +55,7 @@ public Iterable<ConsumerRecord<K, V>> records(String topic) {
throw new IllegalArgumentException("Topic must be non-null.");
List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().equals(topic))
if (entry.getKey().topic().equals(topic))
recs.add(entry.getValue());
}
return new ConcatenatedIterable<K, V>(recs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public static Properties addSerializerToConfig(Properties properties,
return newProperties;
}

ProducerConfig(Map<? extends Object, ? extends Object> props) {
ProducerConfig(Map<?, ?> props) {
super(CONFIG, props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void clear() {

@Override
public String toString() {
StringBuilder b = new StringBuilder('{');
StringBuilder b = new StringBuilder("{");
for (int i = 0; i < this.hist.length - 1; i++) {
b.append(String.format("%.10f", binScheme.fromBin(i)));
b.append(':');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private SecurityProtocol(int id, String name) {
}

public static String getName(int id) {
return CODE_TO_SECURITY_PROTOCOL.get(id).name;
return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
}

public static List<String> getNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
*/
package org.apache.kafka.common.config;

import static org.junit.Assert.fail;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;

import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.fail;

public class AbstractConfigTest {

@Test
Expand Down Expand Up @@ -73,7 +73,7 @@ private static class TestConfig extends AbstractConfig {
METRIC_REPORTER_CLASSES_DOC);
}

public TestConfig(Map<? extends Object, ? extends Object> props) {
public TestConfig(Map<?, ?> props) {
super(CONFIG, props);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void setup() {
new Field("struct", new Schema(new Field("field", Type.INT32))));
this.struct = new Struct(this.schema).set("int8", (byte) 1)
.set("int16", (short) 1)
.set("int32", (int) 1)
.set("int64", (long) 1)
.set("int32", 1)
.set("int64", 1L)
.set("string", "1")
.set("bytes", "1".getBytes())
.set("array", new Object[] {1});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public boolean fetchMore () throws IOException {
_response = _consumer.fetch(fetchRequest);
if(_response != null) {
_respIterator = new ArrayList<ByteBufferMessageSet>(){{
add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
add(_response.messageSet(_request.getTopic(), _request.getPartition()));
}}.iterator();
}
_requestTime += (System.currentTimeMillis() - tempTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
}
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
// before starting assignment, output the current replica assignment to facilitate rollback
val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
.format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
// start the reassignment
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ object TopicCommand {
topics.foreach { topic =>
try {
if (Topic.InternalTopics.contains(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ object ControlledShutdownRequest extends Logging {
}
}

case class ControlledShutdownRequest(val versionId: Short,
val correlationId: Int,
val brokerId: Int)
case class ControlledShutdownRequest(versionId: Short,
correlationId: Int,
brokerId: Int)
extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){

def this(correlationId: Int, brokerId: Int) =
Expand Down Expand Up @@ -74,4 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short,
controlledShutdownRequest.append("; BrokerId: " + brokerId)
controlledShutdownRequest.toString()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ object ControlledShutdownResponse {
}


case class ControlledShutdownResponse(val correlationId: Int,
val errorCode: Short = ErrorMapping.NoError,
val partitionsRemaining: Set[TopicAndPartition])
case class ControlledShutdownResponse(correlationId: Int,
errorCode: Short = ErrorMapping.NoError,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =
Expand All @@ -68,4 +68,4 @@ case class ControlledShutdownResponse(val correlationId: Int,

override def describe(details: Boolean):String = { toString }

}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
2 /* version id */ +
4 /* correlation id */ +
(2 + clientId.length) /* client id */ +
body.sizeOf();
body.sizeOf()
}

override def toString(): String = {
Expand All @@ -52,4 +52,4 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
strBuffer.append("; Body: " + body.toString)
strBuffer.toString()
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,

def sizeInBytes(): Int = {
4 /* correlation id */ +
body.sizeOf();
body.sizeOf()
}

override def toString(): String = {
Expand All @@ -43,4 +43,4 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
strBuffer.append("; Body: " + body.toString)
strBuffer.toString()
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ object PartitionStateInfo {
}
}

case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
val allReplicas: Set[Int]) {
case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
allReplicas: Set[Int]) {
def replicationFactor = allReplicas.size

def writeTo(buffer: ByteBuffer) {
Expand Down Expand Up @@ -200,4 +200,4 @@ case class LeaderAndIsrRequest (versionId: Short,
leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
leaderAndIsrRequest.toString()
}
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/api/StopReplicaResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ object StopReplicaResponse {
}


case class StopReplicaResponse(val correlationId: Int,
val responseMap: Map[TopicAndPartition, Short],
val errorCode: Short = ErrorMapping.NoError)
case class StopReplicaResponse(correlationId: Int,
responseMap: Map[TopicAndPartition, Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =
Expand Down Expand Up @@ -72,4 +72,4 @@ case class StopReplicaResponse(val correlationId: Int,
}

override def describe(details: Boolean):String = { toString }
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/api/TopicMetadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ object PartitionMetadata {
}

case class PartitionMetadata(partitionId: Int,
val leader: Option[BrokerEndPoint],
leader: Option[BrokerEndPoint],
replicas: Seq[BrokerEndPoint],
isr: Seq[BrokerEndPoint] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging {
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/api/TopicMetadataRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ object TopicMetadataRequest extends Logging {
}
}

case class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
val clientId: String,
val topics: Seq[String])
case class TopicMetadataRequest(versionId: Short,
correlationId: Int,
clientId: String,
topics: Seq[String])
extends RequestOrResponse(Some(RequestKeys.MetadataKey)){

def this(topics: Seq[String], correlationId: Int) =
Expand Down Expand Up @@ -93,4 +93,4 @@ case class TopicMetadataRequest(val versionId: Short,
topicMetadataRequest.append("; Topics: " + topics.mkString(","))
topicMetadataRequest.toString()
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/client/ClientUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object ClientUtils extends Logging{
} else {
debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
}
return topicMetadataResponse
topicMetadataResponse
}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/EndPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
def writeTo(buffer: ByteBuffer): Unit = {
buffer.putInt(port)
writeShortString(buffer, host)
buffer.putShort(protocolType.id.toShort)
buffer.putShort(protocolType.id)
}

def sizeInBytes: Int =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
*/
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up");
info("Controller starting up")
registerSessionExpirationListener()
isRunning = true
controllerElector.startup
Expand Down Expand Up @@ -1326,7 +1326,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
}
}

case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
override def toString(): String = {
val leaderAndIsrInfo = new StringBuilder
leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
}

def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class DelayedRebalance(sessionTimeout: Long,

/* check if all known consumers have requested to re-join group */
override def tryComplete(): Boolean = {
allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
(true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()))

if (allConsumersJoinedGroup.get())
forceComplete()
Expand Down
Loading

0 comments on commit ed1a548

Please sign in to comment.