[FLINK-25795][python][connector/pulsar] Support pulsar sink connector in PyFlink DataStream API

This closes apache#19682.
a49a authored and dianfu committed May 19, 2022
1 parent d38b7d1 commit a70e289
Showing 6 changed files with 613 additions and 19 deletions.
78 changes: 75 additions & 3 deletions docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -344,6 +344,9 @@ The Pulsar Sink uses a builder class to create a `PulsarSink` instance.

The following example shows how to use the Pulsar Sink to send String records to topic1 with at-least-once delivery semantics.

{{< tabs "46e225b1-1e34-4ff3-890c-aa06a2b99c0a" >}}
{{< tab "Java" >}}

```java
DataStream<String> stream = ...

PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
stream.sinkTo(sink);
```

{{< /tab >}}
{{< tab "Python" >}}

```python
stream = ...

pulsar_sink = PulsarSink.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics("topic1") \
    .set_serialization_schema(PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

stream.sink_to(pulsar_sink)
```

{{< /tab >}}
{{< /tabs >}}

The following properties are required for building a `PulsarSink` instance:

- The Pulsar service URL, configured by `setServiceUrl(String)`.
@@ -371,17 +394,37 @@

`PulsarSink` specifies the topics to write to in much the same way as the Pulsar Source [specifies the topics or topic partitions to consume](#指定消费的-topic-或者-topic-分区). `PulsarSink` supports a mix-in style of topic setting, so you can provide a list of topics, a list of partitions, or both.

{{< tabs "3d452e6b-bffd-42f7-bb91-974b306262ca" >}}
{{< tab "Java" >}}

```java
// Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().setTopics("some-topic1", "some-topic2")

// Topic "topic-a" 的分区 0 和 2
// Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")

// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
```

{{< /tab >}}
{{< tab "Python" >}}

```python
# Topic "some-topic1" 和 "some-topic2"
PulsarSink.builder().set_topics(["some-topic1", "some-topic2"])

# Topic "topic-a" 的分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])

# Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "some-topic2"])
```

{{< /tab >}}
{{< /tabs >}}

Dynamic partition discovery is enabled by default: the `PulsarSink` periodically queries the topic metadata from the Pulsar cluster to pick up changes in the number of partitions. Use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to configure the query interval.
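As a minimal sketch, lowering the discovery interval could look like this; the string option key and the millisecond unit are assumptions inferred from the option name, not something this commit confirms:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors import PulsarSerializationSchema, PulsarSink

# Assumed string key for PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
# the value is taken to be the refresh interval in milliseconds (10 minutes here).
pulsar_sink = PulsarSink.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics("topic1") \
    .set_serialization_schema(PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
    .set_config("pulsar.sink.topicMetadataRefreshInterval", "600000") \
    .build()
```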

You can optionally implement the `TopicRouter` interface to customize the [message routing strategy](#消息路由策略). Reading [Topic name shorthands](#topic-名称简写) also helps in understanding how Pulsar partitions are configured in this connector.

@@ -410,10 +453,24 @@
Expand Down Expand Up @@ -410,10 +453,24 @@ PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "so
```java
PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
```
- Use Flink's `SerializationSchema` to serialize the data.

{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d8" >}}
{{< tab "Java" >}}

```java
PulsarSerializationSchema.flinkSchema(SerializationSchema)
```

{{< /tab >}}
{{< tab "Python" >}}

```python
PulsarSerializationSchema.flink_schema(SimpleStringSchema())
```

{{< /tab >}}
{{< /tabs >}}

Using `PulsarSerializationSchema.pulsarSchema()` together with `PulsarSinkBuilder.enableSchemaEvolution()` enables the [Schema evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/#schema-evolution) feature. This feature relies on the schema compatibility checks and schema version evolution provided by the Pulsar broker. The following example shows how to enable it.

```java
Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);

PulsarSink<SomePojo> sink = PulsarSink.builder()
    ...
    .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class))
    .enableSchemaEvolution()
    .build();
```

@@ -619,7 +676,22 @@ The Pulsar Sink uses the producer API to send messages. Pulsar's `ProducerConfiguration`

By default, the Pulsar producer refreshes its stats only every 60 seconds, while the Pulsar Sink fetches the latest stats from the producer every 500 milliseconds. As a result, the `numRecordsOut`, `numBytesOut`, `numAcksReceived`, and `numRecordsOutErrors` metrics are effectively updated only once every 60 seconds.

To increase the refresh frequency, you can change the Pulsar producer's stats refresh interval to a smaller value (minimum 1 second) as shown below:
{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d1" >}}

{{< tab "Java" >}}
```java
builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
```
{{< /tab >}}

{{< tab "Python" >}}
```python
builder.set_config("pulsar.client.statsIntervalSeconds", "1")
```
{{< /tab >}}

{{< /tabs >}}

The `numBytesOutRate` and `numRecordsOutRate` metrics are computed by Flink internally from the `numBytesOut` and `numRecordsOut` counters over a fixed 60-second window.

73 changes: 71 additions & 2 deletions docs/content/docs/connectors/datastream/pulsar.md
@@ -423,6 +423,9 @@ If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous
The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
This example writes String records to a Pulsar topic with an at-least-once delivery guarantee.

{{< tabs "46e225b1-1e34-4ff3-890c-aa06a2b99c0a" >}}
{{< tab "Java" >}}

```java
DataStream<String> stream = ...

PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

stream.sinkTo(sink);
```

{{< /tab >}}
{{< tab "Python" >}}

```python
stream = ...

pulsar_sink = PulsarSink.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics("topic1") \
    .set_serialization_schema(PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

stream.sink_to(pulsar_sink)
```

{{< /tab >}}
{{< /tabs >}}

The following properties are **required** for building a PulsarSink:

- Pulsar service url, configured by `setServiceUrl(String)`
@@ -454,6 +477,9 @@ Defining the topics for producing is similar to the topic-partition subscription
in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics,
partitions, or both of them.

{{< tabs "3d452e6b-bffd-42f7-bb91-974b306262ca" >}}
{{< tab "Java" >}}

```java
// Topic "some-topic1" and "some-topic2"
PulsarSink.builder().setTopics("some-topic1", "some-topic2")
// Partition 0 and 2 of topic "topic-a"
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")

// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
```

{{< /tab >}}
{{< tab "Python" >}}

```python
# Topic "some-topic1" and "some-topic2"
PulsarSink.builder().set_topics(["some-topic1", "some-topic2"])

# Partition 0 and 2 of topic "topic-a"
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])

# Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "some-topic2"])
```

{{< /tab >}}
{{< /tabs >}}

The topics you provide support automatic partition discovery: the sink queries the topic metadata from Pulsar at a fixed interval.
You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
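For instance, a minimal sketch of lowering the discovery interval; the string option key and the millisecond unit are assumptions inferred from the option name, not confirmed by this commit:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors import PulsarSerializationSchema, PulsarSink

# Assumed string key for PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
# the value is taken to be the refresh interval in milliseconds (10 minutes here).
pulsar_sink = PulsarSink.builder() \
    .set_service_url('pulsar://localhost:6650') \
    .set_admin_url('http://localhost:8080') \
    .set_topics("topic1") \
    .set_serialization_schema(PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
    .set_config("pulsar.sink.topicMetadataRefreshInterval", "600000") \
    .build()
```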

@@ -500,10 +543,24 @@ you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides
```java
PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
```
- Encode the message by using Flink's `SerializationSchema`

{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d8" >}}
{{< tab "Java" >}}

```java
PulsarSerializationSchema.flinkSchema(SerializationSchema)
```

{{< /tab >}}
{{< tab "Python" >}}

```python
PulsarSerializationSchema.flink_schema(SimpleStringSchema())
```

{{< /tab >}}
{{< /tabs >}}

[Schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
can be enabled by using `PulsarSerializationSchema.pulsarSchema()` and
`PulsarSinkBuilder.enableSchemaEvolution()`. When schema evolution is enabled, the broker-side schema compatibility checks and schema version evolution are in place.
@@ -741,9 +798,21 @@ stats every 500ms. That means that `numRecordsOut`, `numBytesOut`, `numAcksReceived`, and `numRecordsOutErrors`
are updated every 60 seconds. To increase the metrics refresh frequency, you can change
the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.

{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d1" >}}

{{< tab "Java" >}}
```java
builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
```
{{< /tab >}}

{{< tab "Python" >}}
```python
builder.set_config("pulsar.client.statsIntervalSeconds", "1")
```
{{< /tab >}}

{{< /tabs >}}

`numBytesOutRate` and `numRecordsOutRate` are calculated based on the `numBytesOut` and `numRecordsOut`
counters respectively. Flink internally uses a fixed 60-second window to calculate the rates.
11 changes: 9 additions & 2 deletions flink-python/pyflink/datastream/connectors/__init__.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.datastream.connectors.base import Sink, Source, DeliveryGuarantee
from pyflink.datastream.connectors.file_system import (FileEnumeratorProvider, FileSink, FileSource,
BucketAssigner, FileSourceBuilder,
FileSplitAssignerProvider, OutputFileConfig,
@@ -25,13 +25,15 @@
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer, Semantic
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.connectors.pulsar import PulsarDeserializationSchema, PulsarSource, \
    PulsarSourceBuilder, SubscriptionType, StartCursor, StopCursor, PulsarSerializationSchema, \
    PulsarSink, PulsarSinkBuilder, MessageDelayer, TopicRoutingMode
from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource, RMQSink


__all__ = [
'Sink',
'Source',
'DeliveryGuarantee',
'FileEnumeratorProvider',
'FileSink',
'FileSource',
@@ -50,6 +52,11 @@
'PulsarSource',
'PulsarSourceBuilder',
'SubscriptionType',
'PulsarSerializationSchema',
'PulsarSink',
'PulsarSinkBuilder',
'MessageDelayer',
'TopicRoutingMode',
'RMQConnectionConfig',
'RMQSource',
'RMQSink',
34 changes: 34 additions & 0 deletions flink-python/pyflink/datastream/connectors/base.py
@@ -15,11 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from enum import Enum
from typing import Union

from py4j.java_gateway import JavaObject

from pyflink.datastream.functions import JavaFunctionWrapper
from pyflink.java_gateway import get_gateway


class Source(JavaFunctionWrapper):
@@ -48,3 +50,35 @@ def __init__(self, sink: Union[str, JavaObject]):
        :param sink: The java Sink object.
        """
        super(Sink, self).__init__(sink)


class DeliveryGuarantee(Enum):
    """
    DeliveryGuarantees that can be chosen. In general, your pipeline can only offer the lowest
    delivery guarantee that is supported by both its sources and its sinks.

    :data: `EXACTLY_ONCE`:
    Records are delivered exactly once, also under failover scenarios. Building a complete
    exactly-once pipeline requires that the source and sink support exactly-once and are
    properly configured.

    :data: `AT_LEAST_ONCE`:
    Records are ensured to be delivered, but the same record may be delivered multiple times.
    Usually, this guarantee is faster than exactly-once delivery.

    :data: `NONE`:
    Records are delivered on a best-effort basis. This is often the fastest way to process
    records, but records may be lost or duplicated.
    """

    EXACTLY_ONCE = 0
    AT_LEAST_ONCE = 1
    NONE = 2

    def _to_j_delivery_guarantee(self):
        # Look up the Java enum constant of the same name through the Py4J gateway.
        JDeliveryGuarantee = get_gateway().jvm \
            .org.apache.flink.connector.base.DeliveryGuarantee
        return getattr(JDeliveryGuarantee, self.name)