[FLINK-3058] Add support for Kafka 0.9.0.0
To add Kafka 0.9.0.0 support, this commit makes the following changes:
- Splits the Kafka connector into flink-connector-kafka-(base|0.9|0.8) modules with different dependencies
- The base package contains common test cases, classes, and implementations (the producers for 0.9 and 0.8 rely on exactly the same code)
- The 0.8 package contains a Kafka connector implementation against the SimpleConsumer (low-level) API of Kafka 0.8, with some additional tests for ZK offset committing
- The 0.9 package relies on the new Consumer API of Kafka 0.9.0.0
- Adds metrics support for all producers and the 0.9 consumer through Flink's accumulators

This closes apache#1489
rmetzger committed Jan 20, 2016
1 parent 83fb2fa commit 81320c1
Showing 81 changed files with 3,894 additions and 1,291 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -23,3 +23,4 @@ flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/gen
flink-runtime-web/web-dashboard/node_modules/
flink-runtime-web/web-dashboard/bower_components/
atlassian-ide-plugin.xml
out/
67 changes: 47 additions & 20 deletions docs/apis/streaming/connectors/kafka.md
@@ -34,14 +34,15 @@ exactly-once processing semantics. To achieve that, Flink does not purely rely o
offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (maven artifact id) and class name for your use-case and environment.
For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate.
For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is appropriate.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left">Maven Dependency</th>
<th class="text-left">Supported since</th>
<th class="text-left">Class name</th>
<th class="text-left">Consumer and <br>
Producer Class name</th>
<th class="text-left">Kafka version</th>
<th class="text-left">Notes</th>
</tr>
@@ -50,17 +51,27 @@ For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is
<tr>
<td>flink-connector-kafka</td>
<td>0.9.1, 0.10</td>
<td>FlinkKafkaConsumer081</td>
<td>0.8.1</td>
<td>FlinkKafkaConsumer082<br>
FlinkKafkaProducer</td>
<td>0.8.x</td>
<td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
<tr>
<td>flink-connector-kafka</td>
<td>0.9.1, 0.10</td>
<td>FlinkKafkaConsumer082</td>
<td>0.8.2</td>
<tr>
<td>flink-connector-kafka-0.8</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer08<br>
FlinkKafkaProducer08</td>
<td>0.8.x</td>
<td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
<tr>
<td>flink-connector-kafka-0.9</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer09<br>
FlinkKafkaProducer09</td>
<td>0.9.x</td>
<td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
</tr>
</tbody>
</table>

@@ -69,7 +80,7 @@ Then, import the connector in your maven project:
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<artifactId>flink-connector-kafka-0.8</artifactId>
<version>{{site.version }}</version>
</dependency>
{% endhighlight %}
@@ -84,14 +95,16 @@ Note that the streaming connectors are currently not part of the binary distribu

#### Kafka Consumer

The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. It takes the following parameters to the constructor:
Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09`). It provides access to one or more Kafka topics.

The constructor accepts the following arguments:

1. The topic name
2. A DeserializationSchema
1. The topic name / list of topic names
2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the data from Kafka
3. Properties for the Kafka consumer.
The following properties are required:
- "bootstrap.servers" (comma separated list of Kafka brokers)
- "zookeeper.connect" (comma separated list of Zookeeper servers)
- "zookeeper.connect" (comma separated list of Zookeeper servers) (**only required for Kafka 0.8**)
- "group.id" the id of the consumer group

Example:
@@ -101,26 +114,40 @@ Example:
{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties))
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties))
.print();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
.addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print
{% endhighlight %}
</div>
</div>
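
The required properties differ between the two connector versions: `zookeeper.connect` is needed only for Kafka 0.8. As an illustration, here is a minimal sketch of a helper that assembles them. The class `KafkaProps` and the method `consumerProperties` are hypothetical names and not part of Flink; only `java.util.Properties` is used.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink: assembles the consumer properties
// listed in the documentation using only java.util.Properties.
class KafkaProps {

    static Properties consumerProperties(String brokers, String groupId, String zookeeper) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        // zookeeper.connect is only required for the Kafka 0.8 consumer;
        // pass null when targeting Kafka 0.9.
        if (zookeeper != null) {
            props.setProperty("zookeeper.connect", zookeeper);
        }
        return props;
    }

    public static void main(String[] args) {
        // Kafka 0.8 style: includes the ZooKeeper address.
        System.out.println(consumerProperties("localhost:9092", "test", "localhost:2181"));
        // Kafka 0.9 style: no ZooKeeper address.
        System.out.println(consumerProperties("localhost:9092", "test", null));
    }
}
```

The resulting `Properties` object would then be passed as the third constructor argument of the consumer.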


##### The `DeserializationSchema`

The `FlinkKafkaConsumer08` needs to know how to turn the data in Kafka into Java objects. The
`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
method gets called for each Kafka message, passing the value from Kafka.
For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has
the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.

For convenience, Flink provides a `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`)
which creates a schema based on a Flink `TypeInformation`.

#### Kafka Consumers and Fault Tolerance

With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
@@ -155,20 +182,20 @@ If checkpointing is not enabled, the Kafka consumer will periodically commit the

#### Kafka Producer

The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
recors to partitions.
The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
records to partitions.

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
{% endhighlight %}
</div>
</div>
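
As a rough illustration of what a custom partitioner does, the sketch below assigns records to partitions by hashing the key. `PartitionerSketch` is a stand-in interface and `KeyHashPartitioner` a hypothetical implementation; neither is Flink's partitioner API, whose exact signature is not shown in this document.

```java
import java.util.Arrays;

// Stand-in interface, NOT Flink's partitioner API: maps a record key to a partition index.
interface PartitionerSketch {
    int partition(byte[] key, int numPartitions);
}

// Hypothetical partitioner: records with equal keys always land in the same partition.
class KeyHashPartitioner implements PartitionerSketch {
    @Override
    public int partition(byte[] key, int numPartitions) {
        // Records without a key go to partition 0 in this sketch.
        if (key == null) {
            return 0;
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}
```

Hashing the key content (rather than the array reference) is what makes the assignment stable across records and restarts.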
4 changes: 2 additions & 2 deletions docs/apis/streaming/index.md
@@ -1630,7 +1630,7 @@ Collection-based:
Custom:

- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
`addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.
`addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.

</div>

@@ -1682,7 +1682,7 @@ Collection-based:
Custom:

- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use
`addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.
`addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.

</div>
</div>
@@ -820,7 +820,7 @@ private static void validateInputType(Type t, TypeInformation<?> inType) {
validateInfo(typeHierarchy, t, inType);
}
catch(InvalidTypesException e) {
throw new InvalidTypesException("Input mismatch: " + e.getMessage());
throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
}
}

@@ -840,7 +840,7 @@ private static void validateInputType(Class<?> baseClass, Class<?> clazz, int in
validateInfo(typeHierarchy, inType, inTypeInfo);
}
catch(InvalidTypesException e) {
throw new InvalidTypesException("Input mismatch: " + e.getMessage());
throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
}
}
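
Both hunks in this file pass the caught exception as the cause when rethrowing. The sketch below shows the effect with a stand-in exception class; `MismatchException` and `CauseChainingDemo.wrap` are hypothetical names, and Flink's `InvalidTypesException` is not used here.

```java
// Hypothetical stand-in for an exception type with a (message, cause) constructor.
class MismatchException extends RuntimeException {
    MismatchException(String message) { super(message); }
    MismatchException(String message, Throwable cause) { super(message, cause); }
}

class CauseChainingDemo {

    // Builds the rethrown exception with a prefixed message; keepCause toggles
    // between dropping the cause (before) and preserving it (after).
    static MismatchException wrap(MismatchException e, boolean keepCause) {
        String msg = "Input mismatch: " + e.getMessage();
        return keepCause ? new MismatchException(msg, e) : new MismatchException(msg);
    }

    public static void main(String[] args) {
        MismatchException original = new MismatchException("Tuple arity mismatch");
        // Without the cause, the original stack trace is no longer reachable.
        System.out.println(wrap(original, false).getCause());
        // With the cause, it stays reachable via getCause().
        System.out.println(wrap(original, true).getCause() == original);
    }
}
```

Keeping the cause means a logged stack trace includes a "Caused by" section pointing at the location where validation actually failed.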

@@ -36,6 +36,8 @@
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
@@ -58,6 +60,8 @@
@ChannelHandler.Sharable
public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {

private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);

private static final Charset ENCODING = Charset.forName("UTF-8");

public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
@@ -143,12 +147,14 @@ private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGate
: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
LOG.warn("Error while handling request", e);
}
catch (Exception e) {
byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
LOG.warn("Error while handling request", e);
}

response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
@@ -30,8 +30,8 @@ under the License.
<relativePath>..</relativePath>
</parent>

<artifactId>flink-connector-kafka</artifactId>
<name>flink-connector-kafka</name>
<artifactId>flink-connector-kafka-0.8</artifactId>
<name>flink-connector-kafka-0.8</name>

<packaging>jar</packaging>

@@ -42,6 +42,21 @@ under the License.

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -92,9 +107,6 @@ under the License.
</exclusions>
</dependency>

<!-- force using the latest zkclient -->


<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -121,9 +133,16 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>



<build>
<plugins>