
KAFKA-3548: Use root locale for case transformation of constant strings
For enums and other constant strings, use locale-independent case conversions so that comparisons work regardless of the default locale.

Author: Rajini Sivaram <[email protected]>

Reviewers: Manikumar Reddy, Ismael Juma, Guozhang Wang, Gwen Shapira

Closes apache#1220 from rajinisivaram/KAFKA-3548
rajinisivaram authored and gwenshap committed Apr 21, 2016
1 parent f213625 commit 9d71489
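The failure mode this commit guards against is easiest to see under a locale with unusual case mappings, such as Turkish, where toUpperCase() maps 'i' to the dotted capital 'İ' (U+0130). A minimal sketch of the problem (not part of this commit; the enum below stands in for Kafka's OffsetResetStrategy):

import java.util.Locale;

public class LocaleCaseDemo {
    enum OffsetResetStrategy { LATEST, EARLIEST, NONE }

    public static void main(String[] args) {
        // Simulate a JVM running with the Turkish default locale.
        Locale.setDefault(new Locale("tr", "TR"));

        String config = "earliest";

        // Default-locale conversion: Turkish maps 'i' to 'İ' (U+0130),
        // producing "EARLİEST", which matches no enum constant and would
        // make Enum.valueOf throw IllegalArgumentException.
        System.out.println(config.toUpperCase());             // EARLİEST

        // Root-locale conversion is stable regardless of the default locale.
        OffsetResetStrategy strategy =
                OffsetResetStrategy.valueOf(config.toUpperCase(Locale.ROOT));
        System.out.println(strategy);                         // EARLIEST
    }
}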
Showing 23 changed files with 57 additions and 30 deletions.
7 changes: 7 additions & 0 deletions checkstyle/checkstyle.xml
@@ -84,5 +84,12 @@
<module name="MethodParamPad"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>

+<!-- locale-sensitive methods should specify locale -->
+<module name="Regexp">
+<property name="format" value="\.to(Lower|Upper)Case\(\)"/>
+<property name="illegalPattern" value="true"/>
+<property name="ignoreComments" value="true"/>
+</module>
</module>
</module>
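Because illegalPattern is true, any match of the format regex fails the build: bare no-argument case conversions are rejected, while locale-qualified calls never match the empty-parens pattern. A hypothetical pair of methods illustrating what the rule flags and what it accepts:

import java.util.Locale;

class CheckstyleRegexpExample {
    static String flagged(String name) {
        // Matches the \.to(Lower|Upper)Case\(\) pattern -> build failure.
        return name.toLowerCase();
    }

    static String accepted(String name) {
        // The explicit locale argument breaks the empty-parens match.
        return name.toLowerCase(Locale.ROOT);
    }
}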
@@ -51,6 +51,7 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -586,7 +587,7 @@ private KafkaConsumer(ConsumerConfig config,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
-OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
@@ -61,6 +61,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;

@@ -291,7 +292,7 @@ else if (strategy == OffsetResetStrategy.LATEST)
else
throw new NoOffsetForPartitionException(partition);

-log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
long offset = listOffset(partition, timestamp);

// we might lose the assignment while fetching the offset, so check it is still active
@@ -338,7 +338,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial

private static int parseAcks(String acksString) {
try {
-return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim());
+return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
} catch (NumberFormatException e) {
throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
}
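The producer change above takes a slightly different route than the rest of the patch: rather than pinning the locale, it switches to String.equalsIgnoreCase, which folds case character-by-character without consulting the default locale at all. A small sketch of the difference, using an illustrative string containing 'I' (not taken from the commit):

import java.util.Locale;

public class EqualsIgnoreCaseDemo {
    public static void main(String[] args) {
        Locale.setDefault(new Locale("tr", "TR"));

        // Locale-sensitive path: "TITLE".toLowerCase() yields "tıtle"
        // (dotless ı, U+0131) under the Turkish default, so equals() fails.
        System.out.println("TITLE".toLowerCase().equals("title"));  // false

        // equalsIgnoreCase folds case per character, independent of the
        // default locale, so the comparison is stable everywhere.
        System.out.println("TITLE".equalsIgnoreCase("title"));      // true
    }
}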
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;

@@ -891,7 +892,7 @@ public String toHtmlTable() {
b.append(def.documentation);
b.append("</td>");
b.append("<td>");
-b.append(def.type.toString().toLowerCase());
+b.append(def.type.toString().toLowerCase(Locale.ROOT));
b.append("</td>");
b.append("<td>");
if (def.hasDefault()) {
@@ -908,7 +909,7 @@ else if (def.type == Type.STRING && def.defaultValue.toString().isEmpty())
b.append(def.validator != null ? def.validator.toString() : "");
b.append("</td>");
b.append("<td>");
-b.append(def.importance.toString().toLowerCase());
+b.append(def.importance.toString().toLowerCase(Locale.ROOT));
b.append("</td>");
b.append("</tr>\n");
}
@@ -937,7 +938,7 @@ public String toRst() {
b.append("\n\n");
}
b.append(" * Type: ");
-b.append(def.type.toString().toLowerCase());
+b.append(def.type.toString().toLowerCase(Locale.ROOT));
b.append("\n");
if (def.defaultValue != null) {
b.append(" * Default: ");
@@ -951,7 +952,7 @@ public String toRst() {
b.append("\n");
}
b.append(" * Importance: ");
-b.append(def.importance.toString().toLowerCase());
+b.append(def.importance.toString().toLowerCase(Locale.ROOT));
b.append("\n\n");
}
return b.toString();
@@ -13,6 +13,7 @@
package org.apache.kafka.common.metrics.stats;

import java.util.List;
+import java.util.Locale;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.MeasurableStat;
@@ -48,7 +49,7 @@ public Rate(TimeUnit unit, SampledStat stat) {
}

public String unitName() {
-return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
+return unit.name().substring(0, unit.name().length() - 2).toLowerCase(Locale.ROOT);
}

@Override
@@ -21,6 +21,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;

@@ -85,7 +86,7 @@ public static SecurityProtocol forId(Short id) {

/** Case insensitive lookup by protocol name */
public static SecurityProtocol forName(String name) {
-return SecurityProtocol.valueOf(name.toUpperCase());
+return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
}

/**
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@@ -59,7 +60,7 @@ public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String>
record.timestamp(), record.timestampType(),
record.checksum(), record.serializedKeySize(),
record.serializedValueSize(),
-record.key(), record.value().toUpperCase()));
+record.key(), record.value().toUpperCase(Locale.ROOT)));
}
recordMap.put(tp, lst);
}
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.data;

import java.util.List;
+import java.util.Locale;
import java.util.Map;

/**
@@ -48,7 +49,7 @@ enum Type {
private String name;

Type() {
-this.name = this.name().toLowerCase();
+this.name = this.name().toLowerCase(Locale.ROOT);
}

public String getName() {
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import kafka.api.OffsetRequest
import kafka.utils._
import kafka.common.{InvalidConfigException, Config}
+import java.util.Locale

object ConsumerConfig extends Config {
val RefreshMetadataBackoffMs = 200
@@ -163,7 +164,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)

/** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
-val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
+val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase(Locale.ROOT)

/** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
* is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/LogConfig.scala
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
+import java.util.Locale

object Defaults {
val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -70,10 +71,10 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
-val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete
+val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase(Locale.ROOT) != LogConfig.Delete
val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
-val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
+val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase(Locale.ROOT)
val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -17,6 +17,8 @@

package kafka.message

+import java.util.Locale

object CompressionCodec {
def getCompressionCodec(codec: Int): CompressionCodec = {
codec match {
@@ -28,7 +30,7 @@ object CompressionCodec {
}
}
def getCompressionCodec(name: String): CompressionCodec = {
-name.toLowerCase match {
+name.toLowerCase(Locale.ROOT) match {
case NoCompressionCodec.name => NoCompressionCodec
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
@@ -43,10 +45,10 @@ object BrokerCompressionCodec {
val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)

-def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase())
+def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))

def getCompressionCodec(compressionType: String): CompressionCodec = {
-compressionType.toLowerCase match {
+compressionType.toLowerCase(Locale.ROOT) match {
case UncompressedCodec.name => NoCompressionCodec
case _ => CompressionCodec.getCompressionCodec(compressionType)
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -349,9 +349,9 @@ class DefaultMessageFormatter extends MessageFormatter {

override def init(props: Properties) {
if (props.containsKey("print.timestamp"))
-printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
+printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
-printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes
if (props.containsKey("line.separator"))
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -295,11 +295,11 @@ object ConsoleProducer {
override def init(inputStream: InputStream, props: Properties) {
topic = props.getProperty("topic")
if (props.containsKey("parse.key"))
-parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
if (props.containsKey("ignore.error"))
-ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
+ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}

3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/utils/Log4jController.scala
@@ -20,6 +20,7 @@ package kafka.utils

import org.apache.log4j.{Logger, Level, LogManager}
import java.util
+import java.util.Locale


object Log4jController {
@@ -81,7 +82,7 @@ private class Log4jController extends Log4jControllerMBean {
def setLogLevel(loggerName: String, level: String) = {
val log = newLogger(loggerName)
if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
-log.setLevel(Level.toLevel(level.toUpperCase))
+log.setLevel(Level.toLevel(level.toUpperCase(Locale.ROOT)))
true
}
else false
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/utils/Os.scala
@@ -17,7 +17,9 @@

package kafka.utils

+import java.util.Locale

object Os {
-val name = System.getProperty("os.name").toLowerCase
+val name = System.getProperty("os.name").toLowerCase(Locale.ROOT)
val isWindows = name.startsWith("windows")
}
@@ -35,6 +35,7 @@ import org.junit.Test

import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
+import java.util.Locale

/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest {
@@ -606,7 +607,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
for (i <- 0 until numRecords) {
val record = records.get(i)
assertEquals(s"key $i", new String(record.key()))
-assertEquals(s"value $i$appendStr".toUpperCase, new String(record.value()))
+assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new String(record.value()))
}

// commit sync and verify onCommit is called
@@ -18,7 +18,7 @@

public class KafkaConsumerProducerDemo {
public static void main(String[] args) {
-boolean isAsync = args.length == 0 || !args[0].trim().toLowerCase().equals("sync");
+boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
producerThread.start();

@@ -29,6 +29,7 @@
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
+import java.util.Locale;
import java.util.Properties;

/**
@@ -63,7 +64,7 @@ public static void main(String[] args) throws Exception {
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
-return Arrays.asList(value.toLowerCase().split(" "));
+return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
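Note that the streams examples here pass Locale.getDefault() rather than Locale.ROOT, presumably because they lowercase arbitrary user text, where the user's locale is the appropriate casing context. The explicit argument is what matters for the new checkstyle rule, which flags only the no-argument form, so the behaviour is unchanged while the build check is satisfied.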
@@ -30,6 +30,7 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

+import java.util.Locale;
import java.util.Properties;

/**
@@ -63,7 +64,7 @@ public void init(ProcessorContext context) {

@Override
public void process(String dummy, String line) {
-String[] words = line.toLowerCase().split(" ");
+String[] words = line.toLowerCase(Locale.getDefault()).split(" ");

for (String word : words) {
Integer oldValue = this.kvStore.get(word);
@@ -26,6 +26,7 @@
import org.junit.Test;

import java.util.ArrayList;
+import java.util.Locale;

import static org.junit.Assert.assertEquals;

@@ -42,7 +43,7 @@ public void testFlatMapValues() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> result = new ArrayList<String>();
-result.add(value.toLowerCase());
+result.add(value.toLowerCase(Locale.ROOT));
result.add(value);
return result;
}
@@ -26,6 +26,7 @@
import org.apache.kafka.test.KStreamTestDriver;
import org.junit.Test;
import java.util.List;
+import java.util.Locale;
import java.util.ArrayList;
import java.util.Arrays;

@@ -60,7 +61,7 @@ public void testForeach() {
new ForeachAction<Integer, String>() {
@Override
public void apply(Integer key, String value) {
-actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
}
};

