This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] A topic created by the pulsar-flink connector is always created as a partitioned topic with 1 partition #398

Open
mathieudruart opened this issue Aug 23, 2021 · 6 comments

Comments

@mathieudruart

Describe the bug
Since #369, when the flink-connector creates a topic at stream start-up (because the topic doesn't already exist), the topic is always created as a partitioned topic with 1 partition.

To Reproduce

  1. create a Flink Stream with a Pulsar source/sink linked to a non-existent topic
  2. start the stream
  3. the topic is created, but always as a partitioned topic with 1 partition

Expected behavior
Ideally the topic created should respect Pulsar configuration for auto topic creation (as defined by the Pulsar parameters : allowAutoTopicCreationType and defaultNumPartitions).
If that's not possible, the source/sink should be configurable on both points: whether the topic is partitioned and, if so, the default number of partitions.

Additional context
Tested with:

  • pulsar 2.8.0
  • pulsar-flink 1.13.1.2
  • flink 1.13.1
@mathieudruart mathieudruart changed the title [BUG] A topic created by the pulsar-flink connector il always created as a partitioned topic with 1 partition [BUG] A topic created by the pulsar-flink connector is always created as a partitioned topic with 1 partition Aug 24, 2021
@shibd
Member

shibd commented Sep 3, 2021

@mathieudruart

Ideally the topic created should respect Pulsar configuration for auto topic creation (as defined by the Pulsar parameters : allowAutoTopicCreationType and defaultNumPartitions).

When the Pulsar broker is configured to create topics automatically, the connector does not create the topic with 1 partition.

If the switch is on (allowAutoTopicCreationType = partitioned/non-partitioned), the following call triggers Pulsar to create the topic automatically:

admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;

However, Flink needs to ensure that all configured topics exist, so it creates the topic with a default of 1 partition when auto topic creation is not configured in Pulsar.

@mathieudruart
Author

@shibd

Even if allowAutoTopicCreation is set to true, this call will not trigger Pulsar to create the topic automatically:

admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;

The admin API only returns "Topic not exist", without creating the topic.

You can easily reproduce this with a Pulsar instance deployed with the default configuration:

# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true

# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

Try this with pulsar-admin:

pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/test-topic-creation

It only answers "Topic not exist", and no topic is created.

So this code will always create a partitioned topic with 1 partition if the topic doesn't exist (ignoring the Pulsar broker configuration):

    public Set<TopicRange> getTopicPartitionsAll() throws PulsarAdminException {
        List<TopicRange> topics = getTopics();
        HashSet<TopicRange> allTopics = new HashSet<>();
        for (TopicRange topic : topics) {
            int partNum = 1;
            try {
                partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
            } catch (PulsarAdminException.NotFoundException e) {
                log.info("topic<{}> is not exit, auto create <{}> partition to <{}>", topic.getTopic(), partNum, topic.getTopic());
                try {
                    createTopic(topic.getTopic(), partNum);
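
To make the alternative concrete, here is a minimal sketch of a lookup-then-create flow that takes the creation type from an explicit policy instead of a hard-coded `partNum = 1`. Everything in it is an assumption for illustration: `TopicAdminStub`, `CreationPolicy`, `ensureTopic`, and `InMemoryAdmin` are hypothetical names, not part of Pulsar or pulsar-flink, and the in-memory admin only imitates the "lookup does not create" behavior described above. The sketch uses 0 partitions to mean a non-partitioned topic.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: every type here is hypothetical, not a Pulsar or pulsar-flink API. */
final class TopicEnsurer {

    /** Hypothetical stand-in for the admin calls the flow needs. */
    interface TopicAdminStub {
        /** Partition count of an existing topic (0 = non-partitioned), or null if missing. */
        Integer lookupPartitions(String topic);

        void createPartitioned(String topic, int partitions);

        void createNonPartitioned(String topic);
    }

    /** Hypothetical connector-side policy, applied only to topics that do not exist yet. */
    static final class CreationPolicy {
        final boolean partitioned;
        final int defaultPartitions;

        CreationPolicy(boolean partitioned, int defaultPartitions) {
            if (partitioned && defaultPartitions < 1) {
                throw new IllegalArgumentException("a partitioned topic needs at least 1 partition");
            }
            this.partitioned = partitioned;
            this.defaultPartitions = defaultPartitions;
        }
    }

    /** Returns the topic's partition count, creating the topic per policy if it is missing. */
    static int ensureTopic(TopicAdminStub admin, String topic, CreationPolicy policy) {
        Integer existing = admin.lookupPartitions(topic);
        if (existing != null) {
            return existing; // existing topics are never altered
        }
        if (policy.partitioned) {
            admin.createPartitioned(topic, policy.defaultPartitions);
            return policy.defaultPartitions;
        }
        admin.createNonPartitioned(topic);
        return 0;
    }

    /** In-memory admin so the flow can be tried without a broker. */
    static final class InMemoryAdmin implements TopicAdminStub {
        private final Map<String, Integer> topics = new HashMap<>();

        @Override
        public Integer lookupPartitions(String topic) {
            return topics.get(topic); // a pure lookup: never creates the topic
        }

        @Override
        public void createPartitioned(String topic, int partitions) {
            topics.put(topic, partitions);
        }

        @Override
        public void createNonPartitioned(String topic) {
            topics.put(topic, 0);
        }
    }
}
```

With a policy of `new TopicEnsurer.CreationPolicy(false, 0)`, a missing topic would be created as non-partitioned, which is what the default broker configuration quoted above asks for.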

@shibd
Member

shibd commented Sep 4, 2021

Sorry, I checked the Pulsar implementation: getPartitionedTopicMetadata does not read the broker configuration. We need Pulsar to support this.

The switch for automatic topic creation is exposed through the Pulsar interface but not through the admin API. We can open an issue with Pulsar.

@shibd
Member

shibd commented Sep 4, 2021

@jianyun8023 @syhily Can you help me review the way to solve the problem?

@syhily
Contributor

syhily commented Nov 4, 2021

@shibd @jianyun8023 Since the MR for Pulsar is taking too long to be merged, I think we may need a workaround on the connector side.

@shibd
Member

shibd commented Nov 5, 2021

@shibd @jianyun8023 Since the MR for Pulsar is taking too long to be merged, I think we may need a workaround on the connector side.

I think the connector needs to support configuring the default number of partitions (as an interim measure).
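
As a rough illustration of that interim measure, the sketch below reads a default partition count from a `java.util.Properties` object. The option key `topic.auto-create.default-partitions` and the class `DefaultPartitionsOption` are made up for this example and are not existing pulsar-flink settings. It assumes that 0 means "create a non-partitioned topic" and that an unset option falls back to 1, the behavior reported in this issue.

```java
import java.util.Properties;

/** Sketch only: the option key and this class are hypothetical, not pulsar-flink settings. */
final class DefaultPartitionsOption {

    /** Hypothetical option key. */
    static final String KEY = "topic.auto-create.default-partitions";

    /** Fallback that mirrors the behavior reported in this issue. */
    static final int FALLBACK = 1;

    /**
     * Resolves the partition count for topics the connector has to create.
     * 0 is taken to mean "non-partitioned"; any value >= 1 means a partitioned topic.
     */
    static int resolve(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return FALLBACK; // option not set: keep the current behavior
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException(KEY + " must be >= 0, got: " + value);
        }
        return value;
    }
}
```

Keeping 1 as the fallback would leave existing deployments unchanged, while users who want non-partitioned topics could opt in by setting the option to 0.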
