This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] FlinkPulsarSource parallelism change #248

Open
aryemazouz opened this issue Jan 26, 2021 · 1 comment

Comments

@aryemazouz

It looks like FlinkPulsarSource doesn't handle a change in parallelism correctly.

I looked through the code but didn't find anything that handles a change in parallelism.

I am using the key hash feature (Pulsar client and broker version 2.7.0).

Solution Suggestion
Our current use case is a single topic per job (partitioned or non-partitioned).

I came up with a simple, fast way to keep track of the current Pulsar cursor; my solution is in the attached file.
I would like to hear comments on the solution (it uses the current Pulsar API).

The algorithm is explained in the PartitionedTopicCursor class documentation (in the attached file).

  • Currently the solution supports only a single topic (partitioned or non-partitioned) and does not support all Flink features, but it can be extended.
  • I used SourceSinkUtils.distributeRange to split the range by task index (it is copied into the example code).
  • There are some unit tests that demonstrate the usage.
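To illustrate why a parallelism change matters for range-based assignment, here is a minimal sketch of splitting a key-hash range by subtask index. It assumes a hash space of 65536 slots (hashes 0..65535) split into contiguous, near-equal pieces; `distributeRange` and `HashRange` below are hypothetical stand-ins, not the connector's actual `SourceSinkUtils.distributeRange`.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeDistribution {

    // Assumption: the key-hash space has 65536 slots, i.e. hashes 0..65535.
    static final int HASH_RANGE_SIZE = 65536;

    // Inclusive [start, end] hash range (hypothetical helper type).
    record HashRange(int start, int end) {
        @Override
        public String toString() {
            return "[" + start + "," + end + "]";
        }
    }

    // Hypothetical stand-in for SourceSinkUtils.distributeRange:
    // splits [0, HASH_RANGE_SIZE - 1] into `parallelism` contiguous ranges
    // and returns the one owned by subtask `taskIndex`.
    static HashRange distributeRange(int parallelism, int taskIndex) {
        if (parallelism <= 0 || taskIndex < 0 || taskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid parallelism or taskIndex");
        }
        int base = HASH_RANGE_SIZE / parallelism;
        int remainder = HASH_RANGE_SIZE % parallelism;
        // The first `remainder` subtasks each get one extra slot.
        int start = taskIndex * base + Math.min(taskIndex, remainder);
        int size = base + (taskIndex < remainder ? 1 : 0);
        return new HashRange(start, start + size - 1);
    }

    public static void main(String[] args) {
        // Print the assignment for two different parallelism values.
        for (int p : new int[] {2, 3}) {
            List<HashRange> ranges = new ArrayList<>();
            for (int i = 0; i < p; i++) {
                ranges.add(distributeRange(p, i));
            }
            System.out.println("parallelism " + p + ": " + ranges);
        }
    }
}
```

Going from parallelism 2 to 3 moves every range boundary (subtask 0 shrinks from [0,32767] to [0,21845]), so progress saved per subtask index no longer lines up with the range that index owns after rescaling.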

new-flink-pulsar-connector.zip

Thanks!

@jianyun8023
Contributor

When consuming in key-shared mode, each range is persisted by creating a subscription whose name carries the range, e.g. test[0,1000]. At startup, the original progress is restored by reading this subscription.

I will implement persistence of the connector's key-shared subscriptions based on this approach.
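A minimal sketch of the naming idea described above, assuming a hypothetical `<prefix>[<start>,<end>]` format such as `test[0,1000]`. The helper names, the parsing, and the rule that a subtask restores from every persisted subscription whose range overlaps its new range are illustrative assumptions, not the connector's or Pulsar's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RangeSubscriptionNames {

    // Inclusive [start, end] key-hash range (hypothetical helper type).
    record HashRange(int start, int end) {
        boolean overlaps(HashRange other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Hypothetical naming scheme: "<prefix>[<start>,<end>]", e.g. "test[0,1000]".
    private static final Pattern NAME = Pattern.compile("^(.*)\\[(\\d+),(\\d+)\\]$");

    static String subscriptionName(String prefix, HashRange range) {
        return prefix + "[" + range.start() + "," + range.end() + "]";
    }

    // Returns null if the name does not follow the scheme.
    static HashRange parseRange(String subscriptionName) {
        Matcher m = NAME.matcher(subscriptionName);
        if (!m.matches()) {
            return null;
        }
        try {
            return new HashRange(Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3)));
        } catch (NumberFormatException e) {
            return null; // digits too large for an int
        }
    }

    // Assumed restore rule: on startup, a subtask looks at every persisted
    // subscription whose range overlaps the range it now owns.
    static List<String> subscriptionsToRestore(List<String> existing, HashRange owned) {
        List<String> result = new ArrayList<>();
        for (String name : existing) {
            HashRange r = parseRange(name);
            if (r != null && r.overlaps(owned)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(subscriptionName("test", new HashRange(0, 1000)));
        System.out.println(parseRange("test[0,1000]"));
        // Subscriptions left behind by a job that ran with parallelism 2.
        List<String> persisted = List.of("test[0,32767]", "test[32768,65535]");
        // Middle subtask after rescaling to parallelism 3.
        System.out.println(subscriptionsToRestore(persisted, new HashRange(21846, 43690)));
    }
}
```

In this sketch the middle subtask of a 3-way split overlaps both halves of the earlier 2-way split, so it has to consult two old subscriptions; how their positions are merged is left open.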
