[Feature-469][client] Add MysqlCDCSource sync extended configuration
aiwenmo committed May 3, 2022
1 parent 2da5c90 commit de28dc1
Showing 5 changed files with 130 additions and 12 deletions.
@@ -4,6 +4,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -52,32 +53,45 @@ public CDCBuilder create(FlinkCDCConfig config) {

@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
String serverId = config.getSource().get("server-id");
String serverTimeZone = config.getSource().get("server-time-zone");
String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
String connectTimeout = config.getSource().get("connect.timeout");
String connectMaxRetries = config.getSource().get("connect.max-retries");
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}

MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
String database = config.getDatabase();

if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
sourceBuilder.databaseList(new String[0]);
}

List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
@@ -90,6 +104,35 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}

if (Asserts.isNotNullString(serverId)) {
sourceBuilder.serverId(serverId);
}

if (Asserts.isNotNullString(serverTimeZone)) {
sourceBuilder.serverTimeZone(serverTimeZone);
}

if (Asserts.isNotNullString(fetchSize)) {
sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
}

if (Asserts.isNotNullString(connectTimeout)) {
sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
}

if (Asserts.isNotNullString(connectMaxRetries)) {
sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
}

if (Asserts.isNotNullString(connectionPoolSize)) {
sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
}

if (Asserts.isNotNullString(heartbeatInterval)) {
sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
}

return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}

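For reference, a minimal standalone sketch (not part of this commit) of the source-option map that drives the new builder calls above. The keys are exactly the ones read from config.getSource() in this diff; the values are purely illustrative.

import java.util.HashMap;
import java.util.Map;

public class MysqlSourceOptionsSketch {
    public static void main(String[] args) {
        // Illustrative values only; the keys mirror those read in MysqlCDCBuilder.build above.
        Map<String, String> source = new HashMap<>();
        source.put("server-id", "5400-5404");            // forwarded to sourceBuilder.serverId(...)
        source.put("server-time-zone", "UTC");           // sourceBuilder.serverTimeZone(...)
        source.put("scan.snapshot.fetch.size", "1024");  // parsed with Integer.valueOf, then fetchSize(...)
        source.put("connect.timeout", "30000");          // milliseconds, wrapped in Duration.ofMillis(...)
        source.put("connect.max-retries", "3");          // connectMaxRetries(...)
        source.put("connection.pool.size", "20");        // connectionPoolSize(...)
        source.put("heartbeat.interval", "30000");       // milliseconds, wrapped in Duration.ofMillis(...)
        source.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}

Each option is applied only when present, so omitting a key simply falls back to the connector defaults.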
@@ -4,6 +4,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -52,32 +53,45 @@ public CDCBuilder create(FlinkCDCConfig config) {

@Override
public DataStreamSource<String> build(StreamExecutionEnvironment env) {
String database = config.getDatabase();
String serverId = config.getSource().get("server-id");
String serverTimeZone = config.getSource().get("server-time-zone");
String fetchSize = config.getSource().get("scan.snapshot.fetch.size");
String connectTimeout = config.getSource().get("connect.timeout");
String connectMaxRetries = config.getSource().get("connect.max-retries");
String connectionPoolSize = config.getSource().get("connection.pool.size");
String heartbeatInterval = config.getSource().get("heartbeat.interval");

Properties properties = new Properties();
for (Map.Entry<String, String> entry : config.getDebezium().entrySet()) {
if (Asserts.isNotNullString(entry.getKey()) && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey(), entry.getValue());
}
}

MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
.username(config.getUsername())
.password(config.getPassword());
String database = config.getDatabase();

if (Asserts.isNotNullString(database)) {
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
sourceBuilder.databaseList(new String[0]);
}

List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[schemaTableNameList.size()]));
} else {
sourceBuilder.tableList(new String[0]);
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
sourceBuilder.debeziumProperties(properties);

if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
@@ -90,6 +104,35 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
} else {
sourceBuilder.startupOptions(StartupOptions.latest());
}

if (Asserts.isNotNullString(serverId)) {
sourceBuilder.serverId(serverId);
}

if (Asserts.isNotNullString(serverTimeZone)) {
sourceBuilder.serverTimeZone(serverTimeZone);
}

if (Asserts.isNotNullString(fetchSize)) {
sourceBuilder.fetchSize(Integer.valueOf(fetchSize));
}

if (Asserts.isNotNullString(connectTimeout)) {
sourceBuilder.connectTimeout(Duration.ofMillis(Long.valueOf(connectTimeout)));
}

if (Asserts.isNotNullString(connectMaxRetries)) {
sourceBuilder.connectMaxRetries(Integer.valueOf(connectMaxRetries));
}

if (Asserts.isNotNullString(connectionPoolSize)) {
sourceBuilder.connectionPoolSize(Integer.valueOf(connectionPoolSize));
}

if (Asserts.isNotNullString(heartbeatInterval)) {
sourceBuilder.heartbeatInterval(Duration.ofMillis(Long.valueOf(heartbeatInterval)));
}

return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}

@@ -24,15 +24,17 @@ public class FlinkCDCConfig {
private List<String> schemaTableNameList;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> sink;
private List<Schema> schemaList;
private String schemaFieldName;

public FlinkCDCConfig() {
}

public FlinkCDCConfig(String type, String hostname, int port, String username, String password, int checkpoint, int parallelism, String database, String schema, String table, String startupMode,
Map<String, String> debezium, Map<String, String> sink) {
public FlinkCDCConfig(String type, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String database, String schema, String table,
String startupMode,
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
this.type = type;
this.hostname = hostname;
this.port = port;
@@ -45,6 +47,7 @@ public FlinkCDCConfig(String type, String hostname, int port, String username, S
this.table = table;
this.startupMode = startupMode;
this.debezium = debezium;
this.source = source;
this.sink = sink;
}

@@ -124,6 +127,14 @@ public String getTable() {
return table;
}

public Map<String, String> getSource() {
return source;
}

public void setSource(Map<String, String> source) {
this.source = source;
}

public void setTable(String table) {
this.table = table;
}
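A rough usage sketch of the widened FlinkCDCConfig constructor, matching the new argument order (..., startupMode, debezium, source, sink). The class's package is not shown in this excerpt, so the snippet assumes it is visible on the classpath; every literal value is hypothetical.

import java.util.HashMap;
import java.util.Map;

public class FlinkCDCConfigSketch {
    public static void main(String[] args) {
        Map<String, String> debezium = new HashMap<>();
        Map<String, String> source = new HashMap<>();
        source.put("server-time-zone", "UTC");  // hypothetical extended source option
        Map<String, String> sink = new HashMap<>();

        // Argument order follows the new signature shown above.
        FlinkCDCConfig config = new FlinkCDCConfig(
                "mysql-cdc", "127.0.0.1", 3306, "root", "secret",
                60000, 1, "test_db", null, "orders", "initial",
                debezium, source, sink);
        System.out.println(config.getSource());
    }
}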
35 changes: 28 additions & 7 deletions dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
@@ -31,10 +31,11 @@ public class CDCSource {
private String table;
private String startupMode;
private Map<String, String> debezium;
private Map<String, String> source;
private Map<String, String> sink;

public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> debezium, Map<String, String> sink) {
Map<String, String> debezium, Map<String, String> source, Map<String, String> sink) {
this.connector = connector;
this.statement = statement;
this.name = name;
@@ -46,6 +47,7 @@ public CDCSource(String connector, String statement, String name, String hostnam
this.parallelism = parallelism;
this.startupMode = startupMode;
this.debezium = debezium;
this.source = source;
this.sink = sink;
}

@@ -62,6 +64,16 @@ public static CDCSource build(String statement) {
}
}
}
Map<String, String> source = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("source.")) {
String key = entry.getKey();
key = key.replaceFirst("source.", "");
if (!source.containsKey(key)) {
source.put(key, entry.getValue());
}
}
}
Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) {
@@ -84,16 +96,17 @@ public static CDCSource build(String statement) {
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
debezium,
source,
sink
);
if (Asserts.isNotNullString(config.get("database"))) {
cdcSource.setDatabase(config.get("database"));
if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name"));
}
if (Asserts.isNotNullString(config.get("schema"))) {
cdcSource.setSchema(config.get("schema"));
if (Asserts.isNotNullString(config.get("schema-name"))) {
cdcSource.setSchema(config.get("schema-name"));
}
if (Asserts.isNotNullString(config.get("table"))) {
cdcSource.setTable(config.get("table"));
if (Asserts.isNotNullString(config.get("table-name"))) {
cdcSource.setTable(config.get("table-name"));
}
return cdcSource;
}
@@ -229,4 +242,12 @@ public Map<String, String> getDebezium() {
public void setDebezium(Map<String, String> debezium) {
this.debezium = debezium;
}

public Map<String, String> getSource() {
return source;
}

public void setSource(Map<String, String> source) {
this.source = source;
}
}
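Read in isolation, the loop added to CDCSource.build is a simple projection: every entry whose key starts with "source." is copied into the new source map with the prefix removed, and the first value seen wins. A self-contained sketch of that behaviour with hypothetical keys and values (substring is used here instead of replaceFirst purely for clarity):

import java.util.HashMap;
import java.util.Map;

public class SourcePrefixSketch {
    public static void main(String[] args) {
        // Hypothetical raw configuration, as it might arrive from a parsed CDCSOURCE statement.
        Map<String, String> config = new HashMap<>();
        config.put("hostname", "127.0.0.1");
        config.put("source.server-id", "5401");
        config.put("source.server-time-zone", "UTC");

        // Keep "source."-prefixed entries, strip the prefix, and keep the first value seen.
        Map<String, String> source = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith("source.")) {
                String key = entry.getKey().substring("source.".length());
                source.putIfAbsent(key, entry.getValue());
            }
        }
        System.out.println(source);  // prints the two stripped keys with their values
    }
}

The plain "hostname" entry is left untouched, which is why the MySQL connection settings and the extended source options can share one flat configuration map.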
@@ -53,7 +53,7 @@ public TableResult build(Executor executor) {
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername()
, cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema()
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSink());
, cdcSource.getTable(), cdcSource.getStartupMode(), cdcSource.getDebezium(), cdcSource.getSource(), cdcSource.getSink());
try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
