Skip to content

Commit

Permalink
KAFKA-3020; Ensure CheckStyle runs on all Java code
Browse files Browse the repository at this point in the history
- Adds CheckStyle to core and examples modules
- Fixes any existing CheckStyle issues

Author: Grant Henke <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes apache#703 from granthenke/checkstyle-core
  • Loading branch information
granthenke authored and ewencp committed Dec 22, 2015
1 parent a0d2140 commit 64b746b
Show file tree
Hide file tree
Showing 9 changed files with 720 additions and 648 deletions.
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ project(':core') {
println "Building project 'core' with Scala version $resolvedScalaVersion"

apply plugin: 'scala'
apply plugin: 'checkstyle'
archivesBaseName = "kafka_${baseScalaVersion}"

dependencies {
Expand Down Expand Up @@ -334,8 +335,6 @@ project(':core') {
into 'site-docs'
}



tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
into "kafka_${baseScalaVersion}-${version}"
compression = Compression.GZIP
Expand Down Expand Up @@ -390,15 +389,27 @@ project(':core') {
}
into "$buildDir/dependant-testlibs"
}

checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':examples') {
// Run CheckStyle on the examples module as part of the build.
apply plugin: 'checkstyle'
archivesBaseName = "kafka-examples"

dependencies {
compile project(':core')
}

// Reuse the shared CheckStyle config; the examples module follows the
// core import-control rules (it depends on ':core' APIs).
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
}
// Fail fast: style violations surface before tests run.
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':clients') {
Expand Down
69 changes: 69 additions & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->

<!-- CheckStyle ImportControl rules for Java code under the kafka package
     (core and examples modules). Imports not explicitly allowed here are
     rejected by the checkstyleMain/checkstyleTest tasks. -->
<import-control pkg="kafka">

<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->

<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="scala" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.easymock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />

<!-- packages every kafka.* subpackage may use -->
<allow pkg="kafka.common" />
<allow pkg="kafka.utils" />
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />

<!-- the Java API wrappers may reach into their Scala counterparts -->
<subpackage name="javaapi">
<subpackage name="consumer">
<allow pkg="kafka.consumer" />
</subpackage>

<subpackage name="message">
<allow pkg="kafka.message" />
</subpackage>

<subpackage name="producer">
<allow pkg="kafka.producer" />
</subpackage>
</subpackage>

<subpackage name="tools">
<allow pkg="kafka.javaapi" />
<allow pkg="kafka.producer" />
<allow pkg="kafka.consumer" />
<allow pkg="joptsimple" />
</subpackage>

<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="kafka.api" />
<allow pkg="kafka.javaapi" />
<allow pkg="kafka.message" />
</subpackage>

</import-control>
112 changes: 56 additions & 56 deletions core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,72 +17,72 @@

package kafka.javaapi.consumer;

import java.util.List;
import java.util.Map;

import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.serializer.Decoder;

/**
 * Java API for a high-level Kafka consumer connection. Implementations hand
 * out {@link KafkaStream}s over subscribed topics and manage offset commits.
 *
 * NOTE(review): this text was reconstructed from a flattened diff view that
 * had duplicated every import and method declaration; the duplicates (which
 * would not compile) have been removed. The method set below matches the
 * post-commit version of the interface.
 */
public interface ConsumerConnector {
    /**
     * Create a list of MessageStreams of type T for each topic.
     *
     * @param topicCountMap a map of (topic, #streams) pair
     * @param keyDecoder a decoder that decodes the message key
     * @param valueDecoder a decoder that decodes the message itself
     * @return a map of (topic, list of KafkaStream) pairs.
     *         The number of items in the list is #streams. Each stream supports
     *         an iterator over message/metadata pairs.
     */
    public <K, V> Map<String, List<KafkaStream<K, V>>>
        createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    /** Convenience overload: streams carry raw byte[] keys and values. */
    public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

    /**
     * Create a list of MessageAndTopicStreams containing messages of type T.
     *
     * @param topicFilter a TopicFilter that specifies which topics to
     *        subscribe to (encapsulates a whitelist or a blacklist).
     * @param numStreams the number of message streams to return.
     * @param keyDecoder a decoder that decodes the message key
     * @param valueDecoder a decoder that decodes the message itself
     * @return a list of KafkaStream. Each stream supports an
     *         iterator over its MessageAndMetadata elements.
     */
    public <K, V> List<KafkaStream<K, V>>
        createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

    /** Convenience overload: streams carry raw byte[] keys and values. */
    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

    /** Convenience overload: single stream of raw byte[] keys and values. */
    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

    /**
     * Commit the offsets of all broker partitions connected by this connector.
     */
    public void commitOffsets();

    /**
     * Commit the offsets of all broker partitions connected by this connector.
     *
     * @param retryOnFailure enable retries on the offset commit if it fails.
     */
    public void commitOffsets(boolean retryOnFailure);

    /**
     * Commit offsets using the provided offsets map
     *
     * @param offsetsToCommit a map containing the offset to commit for each partition.
     * @param retryOnFailure enable retries on the offset commit if it fails.
     */
    public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);

    /**
     * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
     * @param listener The consumer rebalance listener to wire in
     */
    public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);

    /**
     * Shut down the connector
     */
    public void shutdown();
}
Loading

0 comments on commit 64b746b

Please sign in to comment.