
Wildcard topic #92

Open
Kenji-H opened this issue Jun 20, 2017 · 16 comments

Comments

@Kenji-H
Contributor

Kenji-H commented Jun 20, 2017

For now we provide topic names in a properties file as follows:

topics=kcbq-quickstart,app.click

If there are only a few topics to deal with, it's fine to list them all in a comma-separated way.
But when you have to deal with hundreds or thousands of topics, that doesn't scale. In those situations, we would like to use a wildcard expression to specify topics.

How about implementing a method in the SchemaRegistrySchemaRetriever class to retrieve all the topic names matching a given wildcard expression?
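The matching being asked for can be sketched in a few lines. This is illustrative only: the connector itself is Java, and shell-style wildcards (as in Python's `fnmatch`) are just one possible syntax for the expression.

```python
import fnmatch

def match_topics(all_topics, pattern):
    """Return the topic names that match a shell-style wildcard pattern."""
    return sorted(t for t in all_topics if fnmatch.fnmatch(t, pattern))

topics = ["kcbq-quickstart", "app.click", "app.view", "orders"]
print(match_topics(topics, "app.*"))  # ['app.click', 'app.view']
```

A hypothetical retriever method would apply the same filter to the topic names it fetches, instead of reading them from a static `topics` list.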

@criccomini
Contributor

@mtagle @C0urante any thoughts on this?

For one, I'd prefer not to tie this to the schema registry. Getting topics directly from Kafka seems preferable. Thus far, we've tried to keep the dependency on Confluent Schema Registry as pluggable as we can.

Also, to be clear, a regex would only work on startup. The Kafka Connect framework doesn't yet provide consumer refreshing--once it starts, the topic list is static.

@C0urante
Collaborator

@criccomini Think you nailed it with

> a regex would only work on startup. The Kafka Connect framework doesn't yet provide consumer refreshing

Seems like the best fix would be with the Kafka Connect framework itself, as opposed to this specific connector. I just talked to @ewencp about it and he pointed out that there's an existing JIRA for adding that support; he'd be glad to give you some info on how to add this functionality to the framework if you're interested.

@Kenji-H let us know if that would address your needs; if not I'm sure adding a bit of regex-fiddling to the connector shouldn't be too much work.

@criccomini
Contributor

@C0urante yea, that's kind of what I was thinking. We don't have resourcing to dedicate to the KC JIRA right now. @Kenji-H, if you want to move the discussion over to the JIRA and do the work there, that'd be super useful!

@Kenji-H
Contributor Author

Kenji-H commented Jun 21, 2017

@criccomini @C0urante

Thanks for your quick replies.
Like you said, we need to update Kafka Connect framework itself. I'll bring the discussion to the JIRA. I'd be happy if I can get some help from @ewencp.

As far as I can understand from the sorce code, we still need to update this library after the fix of Kafka Connect framework. When creating the mapping from schema to table, namely BigQuerySinkTask::topicsToBaseTableIds, the library reads the "topics" parameter in a properties file. It looks this part needs some updates.

Anyway, we should start from Kafka Connect framework. I think I have some time to make contribution to these changes.

@Kenji-H
Contributor Author

Kenji-H commented Jun 21, 2017

@ewencp

KAFKA-3037 has a duplicate, KAFKA-3074, which is marked as resolved. So is regex sink support already implemented?

@ewencp

ewencp commented Jun 21, 2017

@Kenji-H No, that's resolved as a duplicate because it was accidentally re-registered. Doing this in the framework is definitely the right choice -- there's really no reason it shouldn't be handled as a generic option for sink connectors.

This will likely be a very simple JIRA to implement, and there's really only one design decision to be addressed: whether to do this with the existing option and just allow regexes, or to add another option (e.g. `topics.regex`) that is specific to these subscriptions. I think detecting regexes would be messy and error-prone at best, since `.` is a valid character in both topic names and regexes but has a different meaning in each. So just adding another config option for sink connectors would probably be the way to go. From there, the code changes are probably quite minimal: add the option to `SinkConnectorConfig`, then make `WorkerSinkTask` use the appropriate version of `Consumer.subscribe()` depending on which of the two is set. There might be a little more work wherever we validate configs, since presumably only one of `topics` or `topics.regex` should be set for any given connector.
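Concretely, under that design a sink connector config would set exactly one of the two options. For example (`topics.regex` is the option name proposed above, not something that existed at the time of this comment):

```properties
# existing behaviour: an explicit, static list
topics=kcbq-quickstart,app.click

# proposed alternative: subscribe by pattern (mutually exclusive with topics)
topics.regex=app\..*
```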

To propose this change, you'd write up a Kafka Improvement Proposal. These are used to let the community vet changes to public interfaces since adding to public interfaces is a commitment to support them moving forward. This might sound like a lot of effort, but for something simple like this the KIP will just be a couple of sentences in each section of the KIP template and since it's a well known feature that people want, discussion will probably be pretty minimal.

If you're interested in taking this on, we can help guide you through the rest of the process and get the code committed.

@Kenji-H
Contributor Author

Kenji-H commented Jun 21, 2017

@ewencp

Thank you for your kind help.
I'd like to take it on. I'll write up the Kafka Improvement Proposal first.

As for the design, I was considering a similar approach. Like you said, some regex special characters, such as `.` and `-`, can also appear in topic names, so they need some consideration. Just adding another option looks better.

@criccomini
Contributor

Fantastic. @Kenji-H we'll leave this issue open until everything gets resolved.

@criccomini
Contributor

Looks like this was released in 1.1.0. Once we upgrade Kafka dependencies, I believe we can support this feature. That said, we don't have plans to upgrade to 1.1.0 yet.

@kmillanr

Hello, any update on the solution?

@salihoto-zz

> Also, to be clear, a regex would only work on startup. The Kafka Connect framework doesn't yet provide consumer refreshing--once it starts, the topic list is static.

Is this feature implemented? Are we able to consume from newly added topics?

@kmillanr

kmillanr commented Oct 22, 2019 via email

@salihoto-zz

> I was thinking of creating some Python script mechanism that would reload the connector whenever the topic list in the Kafka cluster changes (creates/deletes go through to the Kafka cluster but not through the sink connector). But I haven't gotten around to building the mechanism yet.

Actually I was thinking about the same solution. By the way, you mentioned that altering operations work well, but dropping or renaming a column doesn't. I'm not sure, but this might be due to BigQuery limitations. Another thing: in Avro mode the delete operation isn't reflected in Kafka, but in JSON mode it's available, since the "after" part is null.
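For anyone attempting the workaround described above, here is a minimal stdlib-only sketch. The endpoint, connector name, topic pattern, and polling interval are all placeholders; `PUT /connectors/{name}/config` is the standard Kafka Connect REST call for updating a running connector's config, which is one way to "reload" it with a regenerated `topics` list.

```python
import json
import re
import time
import urllib.request

CONNECT_URL = "http://localhost:8083"   # hypothetical Connect REST endpoint
CONNECTOR = "kcbq-sink"                 # hypothetical connector name
PATTERN = re.compile(r"app\..*")        # topics the connector should cover

def matching_topics(all_topics, pattern):
    """The subset of topic names the connector should be consuming."""
    return {t for t in all_topics if pattern.fullmatch(t)}

def reload_connector(topics, base_config):
    """PUT an updated `topics` list so the connector re-subscribes."""
    config = dict(base_config, topics=",".join(sorted(topics)))
    req = urllib.request.Request(
        f"{CONNECT_URL}/connectors/{CONNECTOR}/config",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT")
    urllib.request.urlopen(req)

def watch(list_topics, base_config, poll_seconds=60):
    """Poll the cluster; reload the connector when the topic set changes.

    `list_topics` is any callable returning the cluster's topic names,
    e.g. a wrapper around kafka-python's KafkaAdminClient.list_topics().
    """
    known = matching_topics(list_topics(), PATTERN)
    while True:
        time.sleep(poll_seconds)
        current = matching_topics(list_topics(), PATTERN)
        if current != known:
            reload_connector(current, base_config)
            known = current
```

Note the race inherent in any polling approach: messages produced to a new topic before the next poll and reload are not picked up until the reloaded task starts consuming.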

@kmillanr
Copy link

kmillanr commented Oct 22, 2019 via email
