Skip to content

Commit

Permalink
[FLINK-6107] Enable LeftCurly check in streaming checkstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Apr 26, 2017
1 parent dceb913 commit b24c917
Show file tree
Hide file tree
Showing 15 changed files with 25 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
@SuppressWarnings({"unchecked", "rawtypes"})
public DirectedOutput(
List<OutputSelector<OUT>> outputSelectors,
List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs)
{
List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
this.outputSelectors = outputSelectors.toArray(new OutputSelector[outputSelectors.size()]);

this.allOutputs = new Output[outputs.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1709,8 +1709,7 @@ public static StreamExecutionEnvironment createRemoteEnvironment(
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(
String host, int port, int parallelism, String... jarFiles)
{
String host, int port, int parallelism, String... jarFiles) {
RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
env.setParallelism(parallelism);
return env;
Expand Down Expand Up @@ -1738,8 +1737,7 @@ public static StreamExecutionEnvironment createRemoteEnvironment(
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(
String host, int port, Configuration clientConfig, String... jarFiles)
{
String host, int port, Configuration clientConfig, String... jarFiles) {
return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public SocketClientSink(String hostName, int port, SerializationSchema<IN> schem
* @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
*/
public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
int maxNumRetries, boolean autoflush)
{
int maxNumRetries, boolean autoflush) {
checkArgument(port > 0 && port < 65536, "port is out of range");
checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public List<String> getSelectedNames() {
return selectedNames;
}

public OutputTag getOutputTag() { return this.outputTag; }
public OutputTag getOutputTag() {
return this.outputTag;
}

public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public void open() throws Exception {
}

try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais))
{
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
initialValue = outTypeSerializer.deserialize(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ protected AbstractAlignedProcessingTimeWindowOperator(
TypeSerializer<KEY> keySerializer,
TypeSerializer<STATE> stateTypeSerializer,
long windowLength,
long windowSlide)
{
long windowSlide) {
super(function);

if (windowLength < MIN_SLIDE_TIME) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traver
public void writeToOutput(
final DataOutputView output,
final TypeSerializer<Key> keySerializer,
final TypeSerializer<Aggregate> aggSerializer) throws IOException
{
final TypeSerializer<Aggregate> aggSerializer) throws IOException {
output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);

int numPanes = getNumPanes();
Expand All @@ -125,8 +124,7 @@ public void writeToOutput(
public void readFromInput(
final DataInputView input,
final TypeSerializer<Key> keySerializer,
final TypeSerializer<Aggregate> aggSerializer) throws IOException
{
final TypeSerializer<Aggregate> aggSerializer) throws IOException {
validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
int numPanes = input.readInt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void addElementToLatestPane(Type element) throws Exception {

@Override
public void evaluateWindow(Collector<Result> out, final TimeWindow window,
AbstractStreamOperator<Result> operator) throws Exception
{
AbstractStreamOperator<Result> operator) throws Exception {
if (previousPanes.isEmpty()) {
// optimized path for single pane case (tumbling window)
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public AccumulatingProcessingTimeWindowOperator(
TypeSerializer<KEY> keySerializer,
TypeSerializer<IN> valueSerializer,
long windowLength,
long windowSlide)
{
long windowSlide) {
super(function, keySelector, keySerializer,
new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public AggregatingProcessingTimeWindowOperator(
TypeSerializer<KEY> keySerializer,
TypeSerializer<IN> aggregateSerializer,
long windowLength,
long windowSlide)
{
long windowSlide) {
super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,7 @@ public static <K, V> void traverseMaps(
final KeyMap<K, V>[] maps,
final TraversalEvaluator<K, V> visitor,
final long touchedTag)
throws Exception
{
throws Exception {
// we need to work on the maps in descending size
Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);

Expand All @@ -524,8 +523,7 @@ public static <K, V> void traverseMaps(
int mask;
for (int rootTable = 0;
rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask;
rootTable++)
{
rootTable++) {
// use that table to gather keys and start collecting keys from the following tables
// go over all entries of that slot in the table
Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]];
Expand Down Expand Up @@ -563,8 +561,7 @@ private static <K, V> void addEntriesFromChain(
TraversalEvaluator<K, V> visitor,
K key,
long touchedTag,
int hashCode) throws Exception
{
int hashCode) throws Exception {
while (entry != null) {
if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) {
entry.touchedTag = touchedTag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ private <T> Output<StreamRecord<T>> createOutputCollector(
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators)
{
List<StreamOperator<?>> allOperators) {
List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

// create collectors for the network outputs
Expand Down Expand Up @@ -326,8 +325,7 @@ private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag)
{
OutputTag<IN> outputTag) {
// create the output that the operator writes to first. this may recursively create more operators
Output<StreamRecord<OUT>> output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ public void quiesceAndAwaitPending() throws InterruptedException {
@Override
public void shutdownService() {
if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
{
status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
timerService.shutdownNow();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ else if (typeInfo instanceof PrimitiveArrayTypeInfo) {


public static <X, K> KeySelector<X, K> getSelectorForOneKey(
Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
{
Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
if (!(typeInfo instanceof CompositeType)) {
throw new InvalidTypesException(
"This key operation requires a composite type such as Tuples, POJOs, case classes, etc");
Expand Down
8 changes: 4 additions & 4 deletions tools/maven/strict-checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ This file is based on the checkstyle file of Apache Beam.
<!--value="^(package .*;\s*)|(import .*;\s*)|( *\* .*https?://.*)$"/>-->
<!--</module>-->

<!--<module name="LeftCurly">-->
<!--&lt;!&ndash; Checks for placement of the left curly brace ('{'). &ndash;&gt;-->
<!--<property name="severity" value="error"/>-->
<!--</module>-->
<module name="LeftCurly">
<!-- Checks for placement of the left curly brace ('{'). -->
<property name="severity" value="error"/>
</module>

<!--<module name="RightCurly">-->
<!--&lt;!&ndash; Checks right curlies on CATCH, ELSE, and TRY blocks are on-->
Expand Down

0 comments on commit b24c917

Please sign in to comment.