Skip to content

Commit

Permalink
KAFKA-5362: Add EOS system tests for Streams API
Browse files Browse the repository at this point in the history
Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>

Closes apache#3201 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
  • Loading branch information
mjsax authored and guozhangwang committed Jun 8, 2017
1 parent 21194a6 commit ba07d82
Show file tree
Hide file tree
Showing 7 changed files with 813 additions and 2 deletions.
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@
<allow pkg="org.I0Itec.zkclient" />
</subpackage>

<subpackage name="test">
<allow pkg="kafka.admin" />
</subpackage>

<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ private void runLoop() {
timerStartedMs = time.milliseconds();

// try to fetch some records if necessary
final ConsumerRecords<byte[], byte[]> records = pollRequests(pollTimeMs);
final ConsumerRecords<byte[], byte[]> records = pollRequests();
if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
addRecordsToTasks(records);
Expand All @@ -573,7 +573,7 @@ private void runLoop() {
* Get the next batch of records by polling.
* @return Next batch of records or null if no records available.
*/
private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
private ConsumerRecords<byte[], byte[]> pollRequests() {
ConsumerRecords<byte[], byte[]> records = null;

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class EosTestClient extends SmokeTestUtil {

static final String APP_ID = "EosTest";
private final String kafka;
private final File stateDir;
private KafkaStreams streams;
private boolean uncaughtException;

EosTestClient(final File stateDir, final String kafka) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
}

private boolean isRunning = true;

public void start() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
isRunning = false;
streams.close(5, TimeUnit.SECONDS);
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
System.out.println("EOS-TEST-CLIENT-CLOSED");
}
}
}));

while (isRunning) {
if (streams == null) {
uncaughtException = false;

streams = createKafkaStreams(stateDir, kafka);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("EOS-TEST-CLIENT-EXCEPTION");
e.printStackTrace();
uncaughtException = true;
}
});
streams.start();
}
if (uncaughtException) {
streams.close(5, TimeUnit.SECONDS);
streams = null;
}
sleep(1000);
}
}

private static KafkaStreams createKafkaStreams(final File stateDir,
final String kafka) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());


final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Integer> data = builder.stream("data");

data.to("echo");
data.process(SmokeTestUtil.printProcessorSupplier("data"));

final KGroupedStream<String, Integer> groupedData = data.groupByKey();
// min
groupedData
.aggregate(
new Initializer<Integer>() {
@Override
public Integer apply() {
return Integer.MAX_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(final String aggKey,
final Integer value,
final Integer aggregate) {
return (value < aggregate) ? value : aggregate;
}
},
intSerde,
"min")
.to(stringSerde, intSerde, "min");

// sum
groupedData.aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
},
new Aggregator<String, Integer, Long>() {
@Override
public Long apply(final String aggKey,
final Integer value,
final Long aggregate) {
return (long) value + aggregate;
}
},
longSerde,
"sum")
.to(stringSerde, longSerde, "sum");

return new KafkaStreams(builder, props);
}

}
Loading

0 comments on commit ba07d82

Please sign in to comment.