- Install PostgreSQL 11+
- Set up PostgreSQL to allow Debezium to capture changes (CDC) using the pgoutput plugin. Reference here
- Set up Apache Kafka (with Kafka Connect) on your machine/cluster
- Install the Debezium PostgreSQL connector from here
- Run Apache Kafka & Kafka Connect
- Create the `transactions` and `customers` tables in PostgreSQL (SQL file in here)
- Send a POST request to your Kafka Connect REST interface with the request body below:
{
  "name": "postgres_cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<host>",
    "database.port": "<port>",
    "database.user": "<username>",
    "database.password": "****",
    "database.dbname": "<dbname>",
    "database.server.name": "<servername>",
    "table.whitelist": "<schema>.customers,<schema>.transactions",
    "plugin.name": "pgoutput"
  }
}
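The pgoutput setup in the second step above boils down to enabling logical replication on the PostgreSQL server. A minimal sketch, assuming a superuser session and that a restart is acceptable (the service name `postgresql` may differ on your system):

```shell
# pgoutput ships with PostgreSQL 10+; the main requirement is wal_level = logical.
psql -U postgres -c "ALTER SYSTEM SET wal_level = 'logical';"

# wal_level only takes effect after a server restart (service name is an assumption).
sudo systemctl restart postgresql

# Verify the setting took effect; should print "logical".
psql -U postgres -c "SHOW wal_level;"
```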
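The registration request above can be sent with curl. A sketch, assuming Kafka Connect's REST interface listens on localhost:8083 (its default port); the database values below are placeholders to replace with your own:

```shell
# Write the connector config to a file so it can be inspected and reused.
# All database.* values here are illustrative placeholders.
cat > register-postgres.json <<'EOF'
{
  "name": "postgres_cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "****",
    "database.dbname": "mydb",
    "database.server.name": "pgserver",
    "table.whitelist": "public.customers,public.transactions",
    "plugin.name": "pgoutput"
  }
}
EOF

# Fail fast on malformed JSON before hitting the REST API.
python3 -m json.tool register-postgres.json > /dev/null && echo "config OK"

# Submit to Kafka Connect (uncomment once the cluster is running):
# curl -i -X POST -H "Content-Type: application/json" \
#      --data @register-postgres.json http://localhost:8083/connectors
```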
- Run the jar with the following parameters:
  - `--checkpoint-path`: path to save Flink's checkpoints
  - `--debug-result-stream`: whether to print the result stream to the console or not
  - `--environment`: environment to run the app
  - `--auto-offset-reset`: Kafka `auto.offset.reset` parameter
  - `--boostrap-server`: Kafka bootstrap servers
  - `--consumer-group-id`: Kafka consumer group ID
  - `--offset-strategy`: whether to get the earliest or latest offset from Kafka
  - `--source-topic-1`: Kafka transactions stream name
  - `--source-topic-2`: Kafka customers stream name
  - `--target-topic`: target topic name to publish enriched data
  - `--properties-file`: properties file to load parameters from
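Putting the flags above together, a sketch of the launch command. The jar name, topic names, and broker address are placeholders (Debezium publishes to topics named `<servername>.<schema>.<table>` by default):

```shell
# flink-cdc-app.jar is a hypothetical name; substitute the artifact you built.
# Assumes a Kafka broker on localhost:9092 and Debezium's default topic naming.
java -jar flink-cdc-app.jar \
  --checkpoint-path file:///tmp/flink-checkpoints \
  --debug-result-stream true \
  --environment local \
  --auto-offset-reset earliest \
  --boostrap-server localhost:9092 \
  --consumer-group-id flink-enricher \
  --offset-strategy earliest \
  --source-topic-1 <servername>.<schema>.transactions \
  --source-topic-2 <servername>.<schema>.customers \
  --target-topic enriched_transactions
```

Alternatively, the same parameters can be collected in a file and passed via `--properties-file` instead of individual flags.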