Skip to content

Commit

Permalink
[hotfix] [kafka consumer] Increase Kafka test stability by validating…
Browse files Browse the repository at this point in the history
… written data before consuming
  • Loading branch information
StephanEwen committed Apr 12, 2016
1 parent af79988 commit d20eda1
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -33,7 +31,6 @@
import org.junit.Test;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -91,16 +88,11 @@ public void testFailOnDeploy() throws Exception {

@Test(timeout = 60000)
public void testInvalidOffset() throws Exception {
final String topic = "invalidOffsetTopic";

final int parallelism = 1;

// create topic
createTestTopic(topic, parallelism, 1);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);


// write 20 messages into topic:
writeSequence(env, topic, 20, parallelism);
final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);

// set invalid offset:
CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
Expand All @@ -110,6 +102,10 @@ public void testInvalidOffset() throws Exception {
// read from topic
final int valuesCount = 20;
final int startFrom = 0;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();

readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);

deleteTestTopic(topic);
Expand Down Expand Up @@ -193,10 +189,10 @@ public void runOffsetManipulationInZooKeeperTest() {
*/
@Test(timeout = 60000)
public void testOffsetInZookeeper() throws Exception {
final String topicName = "testOffsetInZK";
final int parallelism = 3;

createTestTopic(topicName, parallelism, 1);
// write a sequence from 0 to 99 to each of the 3 partitions.
final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1);

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env1.getConfig().disableSysoutLogging();
Expand All @@ -210,16 +206,7 @@ public void testOffsetInZookeeper() throws Exception {
env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env2.setParallelism(parallelism);

StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env3.getConfig().disableSysoutLogging();
env3.enableCheckpointing(50);
env3.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env3.setParallelism(parallelism);

// write a sequence from 0 to 99 to each of the 3 partitions.
writeSequence(env1, topicName, 100, parallelism);

readSequence(env2, standardProps, parallelism, topicName, 100, 0);
readSequence(env1, standardProps, parallelism, topicName, 100, 0);

CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

Expand All @@ -243,33 +230,23 @@ public void testOffsetInZookeeper() throws Exception {
curatorClient.close();

// create new env
readSequence(env3, standardProps, parallelism, topicName, 50, 50);
readSequence(env2, standardProps, parallelism, topicName, 50, 50);

deleteTestTopic(topicName);
}

@Test(timeout = 60000)
public void testOffsetAutocommitTest() throws Exception {
final String topicName = "testOffsetAutocommit";
final int parallelism = 3;

createTestTopic(topicName, parallelism, 1);

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env1.getConfig().disableSysoutLogging();
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env1.setParallelism(parallelism);

StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
// NOTE: We are not enabling the checkpointing!
env2.getConfig().disableSysoutLogging();
env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env2.setParallelism(parallelism);


// write a sequence from 0 to 99 to each of the 3 partitions.
writeSequence(env1, topicName, 100, parallelism);
final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
// NOTE: We are not enabling the checkpointing!
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(parallelism);

// the readSequence operation sleeps for 20 ms between each record.
// setting a delay of 25*20 = 500 for the commit interval makes
Expand All @@ -280,7 +257,7 @@ public void testOffsetAutocommitTest() throws Exception {
readProps.setProperty("auto.commit.interval.ms", "500");

// read so that the offset can be committed to ZK
readSequence(env2, readProps, parallelism, topicName, 100, 0);
readSequence(env, readProps, parallelism, topicName, 100, 0);

// get the offset
CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
Expand Down Expand Up @@ -314,19 +291,10 @@ public void testOffsetAutocommitTest() throws Exception {
*/
@Test(timeout = 60000)
public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
final String topicName = "testKafkaOffsetToZk";
final int parallelism = 3;

createTestTopic(topicName, parallelism, 1);

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env1.getConfig().disableSysoutLogging();
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env1.setParallelism(parallelism);

// write a sequence from 0 to 49 to each of the 3 partitions.
writeSequence(env1, topicName, 50, parallelism);

final String topicName = writeSequence("testKafkaOffsetToZk", 50, parallelism, 1);

final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env2.getConfig().disableSysoutLogging();
Expand Down
Loading

0 comments on commit d20eda1

Please sign in to comment.