Skip to content

Commit

Permalink
format clickhouse adapter code
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Dec 4, 2023
1 parent 9baeff5 commit 93dac2d
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package com.alibaba.otter.canal.client.adapter.clickhouse;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.util.JdbcUtils;
Expand All @@ -13,15 +23,6 @@
import com.alibaba.otter.canal.client.adapter.clickhouse.service.ClickHouseMirrorDbBatchSyncService;
import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
import com.alibaba.otter.canal.client.adapter.support.*;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* ClickHouse Adapter implementation
Expand All @@ -35,11 +36,15 @@ public class ClickHouseAdapter implements OuterAdapter {

private static final Logger logger = LoggerFactory.getLogger(ClickHouseAdapter.class);

private Map<String, MappingConfig> clickHouseMapping = new ConcurrentHashMap<>(); // Store the mapping of filename and configuration, load yml files below resource path
// Store the mapping of filename and configuration, load yml files below
// resource path
private Map<String, MappingConfig> clickHouseMapping = new ConcurrentHashMap<>();

private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>(); // Schema -> Table -> MappingConfig
// Schema -> Table -> MappingConfig
private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();

private Map<String, MirrorDbConfig> mirrorDbConfigCache = new ConcurrentHashMap<>(); // Mirror DB Configuration, don't need to load column mapping
// Mirror DB Configuration, don't need to load column mapping
private Map<String, MirrorDbConfig> mirrorDbConfigCache = new ConcurrentHashMap<>();

private DruidDataSource dataSource;

Expand All @@ -49,7 +54,8 @@ public class ClickHouseAdapter implements OuterAdapter {

private Properties envProperties;

private OuterAdapterConfig configuration; // Launch configuration
// Launch configuration
private OuterAdapterConfig configuration;

private ClickHouseConfigMonitor clickHouseConfigMonitor;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package com.alibaba.otter.canal.client.adapter.clickhouse.config;

import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.YamlUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.YamlUtils;

/**
 * CLICKHOUSE表映射配置加载器 (ClickHouse table mapping configuration loader)
 *
 * @author rewerma 2018-11-07 下午02:41:34
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */
public class ConfigLoader {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.alibaba.otter.canal.client.adapter.clickhouse.config;

import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import org.apache.commons.lang.StringUtils;

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;

/**
 * CLICKHOUSE表映射配置 (ClickHouse table mapping configuration)
 *
 * @author rewerma 2018-11-07 下午02:41:34
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */
public class MappingConfig implements AdapterConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Mirror DB configuration holder.
 *
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */

public class MirrorDbConfig {

private String fileName;
private MappingConfig mappingConfig;
private String fileName;
private MappingConfig mappingConfig;
private Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();

public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package com.alibaba.otter.canal.client.adapter.clickhouse.monitor;

import com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter;
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.client.adapter.support.YamlUtils;
import java.io.File;
import java.util.Properties;

import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Properties;
import com.alibaba.otter.canal.client.adapter.clickhouse.ClickHouseAdapter;
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.client.adapter.support.YamlUtils;

/**
 * File-system monitor that hot-reloads ClickHouse mapping configuration.
 *
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */

public class ClickHouseConfigMonitor {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
package com.alibaba.otter.canal.client.adapter.clickhouse.service;

import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONWriter.Feature;
Expand All @@ -10,25 +23,14 @@
import com.alibaba.otter.canal.client.adapter.clickhouse.support.SyncUtil;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.Util;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * ClickHouse batch synchronize
 *
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */
public class ClickHouseBatchSyncService {

Expand All @@ -45,7 +47,7 @@ public class ClickHouseBatchSyncService {
private BatchExecutor alterExecutors; // Alter Single Executor(update/delete/truncate)

private ExecutorService[] executorThreads; // Be initialized once

private ScheduledExecutorService[] scheduledExecutors;

private int threads = 3; // Default parallel thread count
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package com.alibaba.otter.canal.client.adapter.clickhouse.service;

import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import javax.sql.DataSource;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.clickhouse.config.MappingConfig.DbMapping;
Expand All @@ -9,19 +18,13 @@
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.Util;

import javax.sql.DataSource;
import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * ClickHouse ETL 操作业务类 (ClickHouse ETL service)
 *
 * @author rewerma @ 2018-11-7
 * @author Xander ([email protected]) created 2023/11/10 22:23
 * @version 1.1.8
 */
public class ClickHouseEtlService extends AbstractEtlService {

Expand Down Expand Up @@ -58,23 +61,23 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
Util.sqlRS(targetDS,
"SELECT * FROM " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " LIMIT 1 ",
rs -> {
try {
try {

ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
columns.add(rsd.getColumnName(i));
}

ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
columns.add(rsd.getColumnName(i));
columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}

columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
});
});

Util.sqlRS(srcDS, sql, values, rs -> {
int idx = 1;
Expand All @@ -86,8 +89,10 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
insertSql.append("INSERT INTO ")
.append(SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()))
.append(" (");
columnsMap
.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick).append(targetColumnName).append(backtick).append(","));
columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(backtick)
.append(targetColumnName)
.append(backtick)
.append(","));

int len = insertSql.length();
insertSql.delete(len - 1, len).append(") VALUES (");
Expand All @@ -97,7 +102,7 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
}
len = insertSql.length();
insertSql.delete(len - 1, len).append(")");
logger.info("executeSqlImport sql:{}",insertSql.toString());
logger.info("executeSqlImport sql:{}", insertSql.toString());
try (Connection connTarget = targetDS.getConnection();
PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
connTarget.setAutoCommit(false);
Expand All @@ -110,7 +115,8 @@ protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> va
// 删除数据
Map<String, Object> pkVal = new LinkedHashMap<>();
StringBuilder deleteSql = new StringBuilder(
"ALTER TABLE " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType()) + " DELETE WHERE ");
"ALTER TABLE " + SyncUtil.getDbTableName(dbMapping, dataSource.getDbType())
+ " DELETE WHERE ");
appendCondition(dbMapping, deleteSql, pkVal, rs, backtick);
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
int k = 1;
Expand Down
Loading

0 comments on commit 93dac2d

Please sign in to comment.