Enriching Kafka Stream with Another Stream Using Flink
Install PostgreSQL 11+
Set up PostgreSQL to allow Debezium to perform change data capture (CDC) using the pgoutput plugin. Reference here
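For reference, pgoutput ships with PostgreSQL 10+, so no extra plugin install is needed; the server only has to run with logical WAL enabled. A minimal sketch of the relevant settings (the values are illustrative, the parameter names are standard PostgreSQL ones; a restart is required after changing wal_level):

```
# postgresql.conf
wal_level = logical            # required for pgoutput logical decoding
max_wal_senders = 4            # headroom for the Debezium replication connection
max_replication_slots = 4      # Debezium creates one replication slot
```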
Set up Apache Kafka (with Kafka Connect) on your machine/cluster
Install the Debezium PostgreSQL connector from here
Run Apache Kafka & Kafka Connect
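Assuming a stock Apache Kafka tarball installation (the script paths below are the ones shipped with the distribution), the broker and Connect can be started along these lines:

```
# Start ZooKeeper, the Kafka broker, then Kafka Connect in distributed mode
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/connect-distributed.sh -daemon config/connect-distributed.properties
```

Connect in distributed mode exposes its REST interface on port 8083 by default, which is what the POST request below targets.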
Create tables transactions and customers in PostgreSQL (SQL file here)
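The real definitions live in the repository's SQL file; as a purely hypothetical illustration of the shape involved (column names and types are invented here), the two tables might look like:

```
-- Hypothetical schemas; see the repository's SQL file for the real definitions
CREATE TABLE customers (
    id   SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL
);

CREATE TABLE transactions (
    id          SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers (id),
    amount      NUMERIC(12, 2) NOT NULL,
    created_at  TIMESTAMP DEFAULT now()
);
```

With a primary key on each table, the default replica identity is sufficient for pgoutput to emit row-level change events.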
Send a POST request to your Kafka Connect REST interface with the request body below
{
  "name": "postgres_cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<host>",
    "database.port": "<port>",
    "database.user": "<username>",
    "database.password": "****",
    "database.dbname": "<dbname>",
    "database.server.name": "<servername>",
    "table.whitelist": "<schema>.customers,<schema>.transactions",
    "plugin.name": "pgoutput"
  }
}
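One way to send that request (assuming Connect's REST interface on its default port 8083, and the JSON above saved as register-postgres.json, a filename chosen here for illustration):

```
# Register the Debezium connector via the Kafka Connect REST API
curl -X POST -H "Content-Type: application/json" \
     --data @register-postgres.json \
     http://localhost:8083/connectors
```

A successful registration returns HTTP 201 with the connector config echoed back, and GET http://localhost:8083/connectors should then list postgres_cdc.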
Run the jar
Available Streaming Job Parameters
--checkpoint-path : path to save Flink's checkpoints
--debug-result-stream : whether to print the result stream to the console
--environment : environment to run the app in
--auto-offset-reset : Kafka auto.offset.reset parameter
--boostrap-server : Kafka bootstrap servers
--consumer-group-id : Kafka consumer group ID
--offset-strategy : whether to read from the earliest or latest Kafka offset
--source-topic-1 : Kafka transactions stream name
--source-topic-2 : Kafka customers stream name
--target-topic : target topic name to publish enriched data to
--properties-file : properties file to load parameters from
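Putting the parameters together, a sketch of launching the job, assuming it is submitted to a Flink cluster with flink run (the jar name and all values are placeholders; Debezium names CDC topics as <servername>.<schema>.<table>):

```
flink run flink-stream-enrichment.jar \
  --environment production \
  --boostrap-server localhost:9092 \
  --consumer-group-id enrichment-app \
  --auto-offset-reset earliest \
  --source-topic-1 <servername>.<schema>.transactions \
  --source-topic-2 <servername>.<schema>.customers \
  --target-topic enriched-transactions \
  --checkpoint-path file:///tmp/flink-checkpoints \
  --debug-result-stream true
```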