[FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource.

This closes apache#603
rmetzger committed Apr 16, 2015
1 parent 4754a97 commit 354922b
Showing 12 changed files with 296 additions and 81 deletions.
@@ -27,28 +27,28 @@

public class FunctionUtils {

public static void openFunction (Function function, Configuration parameters) throws Exception{
public static void openFunction(Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open (parameters);
richFunction.open(parameters);
}
}

public static void closeFunction (Function function) throws Exception{
public static void closeFunction(Function function) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.close ();
richFunction.close();
}
}

public static void setFunctionRuntimeContext (Function function, RuntimeContext context){
public static void setFunctionRuntimeContext(Function function, RuntimeContext context){
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.setRuntimeContext(context);
}
}

public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
public static RuntimeContext getFunctionRuntimeContext(Function function, RuntimeContext defaultContext){
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
return richFunction.getRuntimeContext();
@@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception {

// get input data
DataSet<String> text = getTextDataSet(env);

DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
@@ -24,9 +24,9 @@
import java.io.DataInputStream;
import java.io.IOException;

class TestInputView extends DataInputStream implements DataInputView {
public class ByteArrayInputView extends DataInputStream implements DataInputView {

public TestInputView(byte[] data) {
public ByteArrayInputView(byte[] data) {
super(new ByteArrayInputStream(data));
}

@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -35,6 +36,9 @@ public ConnectorSource(DeserializationSchema<OUT> schema) {

@Override
public TypeInformation<OUT> getType() {
if(schema instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) schema).getProducedType();
}
return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
null, null);
}
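getType() now asks the deserialization schema for its produced type first and only falls back to TypeExtractor reflection when the schema does not implement ResultTypeQueryable. The TypeInformationSerializationSchema introduced in the new Utils class below takes exactly this path. As a minimal sketch of the same idea, here is a hypothetical String schema; the class name and element type are illustrative assumptions, not part of this commit:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Hypothetical schema that reports its exact type via ResultTypeQueryable,
// so ConnectorSource.getType() never needs to reflect over the class.
public class ExampleStringSchema implements DeserializationSchema<String>, ResultTypeQueryable<String> {

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Exact type information, handed straight back by ConnectorSource.getType().
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}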
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import java.io.IOException;

public class Utils {
public static class TypeInformationSerializationSchema<T>
implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
private final TypeSerializer<T> serializer;
private final TypeInformation<T> ti;

public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
this.serializer = ti.createSerializer(ec);
}
@Override
public T deserialize(byte[] message) {
try {
return serializer.deserialize(new ByteArrayInputView(message));
} catch (IOException e) {
throw new RuntimeException("Unable to deserialize message", e);
}
}

@Override
public boolean isEndOfStream(T nextElement) {
return false;
}

@Override
public byte[] serialize(T element) {
DataOutputSerializer dos = new DataOutputSerializer(16);
try {
serializer.serialize(element, dos);
} catch (IOException e) {
throw new RuntimeException("Unable to serialize record", e);
}
return dos.getByteArray();
}

@Override
public TypeInformation<T> getProducedType() {
return ti;
}
}
}
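The schema above turns any record that Flink's own TypeSerializer can handle into raw bytes and back, which is the kind of round trip the new large-message test exercises. A minimal usage sketch; the record type and the roughly 1 MB payload size are assumptions for illustration, not values taken from this commit:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.Utils;

public class LargeMessageSchemaExample {
    public static void main(String[] args) {
        ExecutionConfig ec = new ExecutionConfig();
        // Arbitrary sample record with a ~1 MB payload.
        Tuple2<Long, byte[]> sample = new Tuple2<Long, byte[]>(1L, new byte[1024 * 1024]);

        Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> schema =
                new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(sample, ec);

        byte[] onTheWire = schema.serialize(sample);                   // producer side
        Tuple2<Long, byte[]> restored = schema.deserialize(onTheWire); // consumer side

        System.out.println("payload survived: " + (restored.f1.length == sample.f1.length));
    }
}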
@@ -17,13 +17,18 @@

package org.apache.flink.streaming.connectors.kafka.api.simple;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.google.common.base.Preconditions;

import kafka.consumer.ConsumerConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
@@ -33,7 +38,6 @@
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -50,25 +54,19 @@
public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);

public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";

private final String topicId;
private final String zookeeperServerAddress;
private final int zookeeperSyncTimeMillis;
private final int waitOnEmptyFetchMillis;
private final KafkaOffset startingOffset;

private int connectTimeoutMs = 100000;
private int bufferSize = 64 * 1024;
private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.

private transient KafkaConsumerIterator iterator;
private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;

private transient Map<Integer, KafkaOffset> partitions;

private volatile boolean isRunning = false;

/**
* Creates a persistent Kafka source that consumes a topic.
* If there are no new messages on the topic, this consumer will wait
@@ -109,11 +107,14 @@ public PersistentKafkaSource(String zookeeperAddress, String topicId,
this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
}


/**
* Creates a persistent Kafka source that consumes a topic.
* If there are no new messages on the topic, this consumer will wait
* waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
*
* THIS CONSTRUCTOR IS DEPRECATED: USE the constructor with the ConsumerConfig.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
@@ -127,18 +128,44 @@ public PersistentKafkaSource(String zookeeperAddress, String topicId,
* @param startOffsetType
* The offset to start from (beginning or current).
*/
public PersistentKafkaSource(String zookeeperAddress, String topicId,
DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
int waitOnEmptyFetchMillis, Offset startOffsetType) {
@Deprecated
public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset startOffsetType) {
this(topicId, deserializationSchema, startOffsetType, legacyParametersToConsumerConfig(zookeeperAddress, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis));
}

private static ConsumerConfig legacyParametersToConsumerConfig(String zookeeperAddress, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
Properties props = new Properties();
props.setProperty("zookeeper.sync.time.ms", Integer.toString(zookeeperSyncTimeMillis));
props.setProperty(WAIT_ON_EMPTY_FETCH_KEY, Integer.toString(waitOnEmptyFetchMillis));
props.setProperty("zookeeper.connect", zookeeperAddress);
props.setProperty("group.id", "flink-persistent-kafka-source");
return new ConsumerConfig(props);
}

/**
* Creates a persistent Kafka source that consumes a topic.
* If there are no new messages on the topic, this consumer will wait
* waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
*
* @param topicId
* ID of the Kafka topic.
* @param deserializationSchema
* User defined deserialization schema.
* @param startOffsetType
* The offset to start from (beginning or current).
* @param consumerConfig
* Additional configuration for the PersistentKafkaSource.
* NOTE: This source will only respect certain configuration values from the config!
*/
public PersistentKafkaSource(String topicId, DeserializationSchema<OUT> deserializationSchema, Offset startOffsetType, ConsumerConfig consumerConfig) {
super(deserializationSchema);
Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address can not be null");
Preconditions.checkNotNull(topicId, "The topic id can not be null");
Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
Preconditions.checkNotNull(consumerConfig, "ConsumerConfig can not be null");

this.consumerConfig = consumerConfig;

this.topicId = topicId;
this.zookeeperServerAddress = zookeeperAddress;

switch (startOffsetType) {
case FROM_BEGINNING:
@@ -151,20 +178,17 @@ public PersistentKafkaSource(String zookeeperAddress, String topicId,
this.startingOffset = new CurrentOffset();
break;
}

this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
}

@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws InterruptedException {
LOG.info("Starting PersistentKafkaSource");
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
int indexOfSubtask = context.getIndexOfThisSubtask();
int numberOfSubtasks = context.getNumberOfParallelSubtasks();

KafkaTopicUtils kafkaTopicUtils =
new KafkaTopicUtils(zookeeperServerAddress, zookeeperSyncTimeMillis, zookeeperSyncTimeMillis);
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(consumerConfig.zkConnect(), consumerConfig.zkSyncTimeMs(), consumerConfig.zkConnectionTimeoutMs());

int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);

@@ -187,10 +211,10 @@ public void open(Configuration parameters) throws InterruptedException {
context.registerState("kafka", kafkaOffSet);
}

iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize);
iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);

if (LOG.isInfoEnabled()) {
LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
}
}
@@ -200,9 +224,8 @@ public void open(Configuration parameters) throws InterruptedException {

@Override
public void run(Collector<OUT> collector) throws Exception {
isRunning = true;
MessageWithMetadata msg;
while (isRunning && iterator.hasNext()) {
while (iterator.hasNext()) {
msg = iterator.nextWithOffset();
OUT out = schema.deserialize(msg.getMessage());

@@ -218,17 +241,21 @@ public void run(Collector<OUT> collector) throws Exception {
}
}

public void setConnectTimeoutMs(int connectTimeoutMs) {
Preconditions.checkArgument(connectTimeoutMs > 0, "The timeout must be positive");
this.connectTimeoutMs = connectTimeoutMs;
@Override
public void cancel() {
LOG.info("PersistentKafkaSource has been cancelled");
}

public void setBufferSize(int bufferSize) {
Preconditions.checkArgument(connectTimeoutMs > 0, "The buffer size must be positive");
this.bufferSize = bufferSize;
private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
out.writeObject(consumerConfig.props().props());
}

@Override
public void cancel() {
private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
Properties props = (Properties) in.readObject();
consumerConfig = new ConsumerConfig(props);
}
}
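After the rework, all Kafka and ZooKeeper settings travel inside a single ConsumerConfig (rebuilt from its Properties on deserialization, since ConsumerConfig itself is not serializable), and the source keeps only the topic, schema and starting offset. A sketch of the new construction path, assuming placeholder addresses, group id and topic that are not taken from this commit:

import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.connectors.kafka.Utils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;

public class PersistentKafkaSourceExample {
    public static void main(String[] args) {
        // "zookeeper.sync.time.ms" and the WAIT_ON_EMPTY_FETCH_KEY property replace the old
        // zookeeperSyncTimeMillis / waitOnEmptyFetchMillis constructor arguments; all values
        // here are placeholders.
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "example-group");
        props.setProperty("zookeeper.sync.time.ms", "200");
        props.setProperty(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, "500");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        // Any DeserializationSchema works; the TypeInformationSerializationSchema from
        // this commit is used here for convenience.
        Utils.TypeInformationSerializationSchema<String> schema =
                new Utils.TypeInformationSerializationSchema<String>("", new ExecutionConfig());

        PersistentKafkaSource<String> source = new PersistentKafkaSource<String>(
                "example-topic", schema, Offset.FROM_CURRENT, consumerConfig);
    }
}

The source would then be handed to StreamExecutionEnvironment#addSource(...) as before.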
@@ -21,8 +21,10 @@
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,24 +34,22 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);

protected List<KafkaSinglePartitionIterator> partitions;
protected final int waitOnEmptyFetch;
protected final ConsumerConfig consumerConfig;

public KafkaMultiplePartitionsIterator(String topic,
Map<Integer, KafkaOffset> partitionsWithOffset,
KafkaTopicUtils kafkaTopicUtils,
int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());

this.waitOnEmptyFetch = waitOnEmptyFetch;
this.consumerConfig = consumerConfig;

for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
partitions.add(new KafkaSinglePartitionIterator(
topic,
partitionWithOffset.getKey(),
partitionWithOffset.getValue(),
kafkaTopicUtils,
connectTimeoutMs,
bufferSize));
this.consumerConfig));
}
}

@@ -91,7 +91,7 @@ public MessageWithMetadata nextWithOffset() throws InterruptedException {
// do not wait if a new message has been fetched
if (!gotNewMessage) {
try {
Thread.sleep(waitOnEmptyFetch);
Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for new messages", e);
}
