Skip to content

Commit

Permalink
Add elasticsearch 7.x version adapter support (alibaba#2246)
Browse files Browse the repository at this point in the history
* init es data sync upgrade

* Add elasticsearch 7.x version adapter support
  • Loading branch information
rewerma authored and agapple committed Sep 25, 2019
1 parent 61726c7 commit f45c020
Show file tree
Hide file tree
Showing 54 changed files with 7,172 additions and 6 deletions.
49 changes: 49 additions & 0 deletions client-adapter/es-core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>canal.client-adapter</artifactId>
<groupId>com.alibaba.otter</groupId>
<version>1.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.es-core</artifactId>
<packaging>jar</packaging>
<name>canal client adapter es-core module for otter ${project.version}</name>

<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastsql</groupId>
<artifactId>fastsql</artifactId>
<version>2.0.0_preview_855</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.192</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package com.alibaba.otter.canal.client.adapter.es.core;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
import org.apache.commons.lang.StringUtils;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfigLoader;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;
import com.alibaba.otter.canal.client.adapter.es.core.monitor.ESConfigMonitor;
import com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;

/**
* ES外部适配器
*
* @author rewerma 2018-10-20
* @version 1.0.0
*/
public abstract class ESAdapter implements OuterAdapter {

protected Map<String, ESSyncConfig> esSyncConfig = new ConcurrentHashMap<>(); // 文件名对应配置
protected Map<String, Map<String, ESSyncConfig>> dbTableEsSyncConfig = new ConcurrentHashMap<>(); // schema-table对应配置

protected ESTemplate esTemplate;

protected ESSyncService esSyncService;

protected ESConfigMonitor esConfigMonitor;

protected Properties envProperties;

public ESSyncService getEsSyncService() {
return esSyncService;
}

public Map<String, ESSyncConfig> getEsSyncConfig() {
return esSyncConfig;
}

public Map<String, Map<String, ESSyncConfig>> getDbTableEsSyncConfig() {
return dbTableEsSyncConfig;
}

@Override
public void init(OuterAdapterConfig configuration, Properties envProperties) {
try {
this.envProperties = envProperties;
Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
// 过滤不匹配的key的配置
esSyncConfigTmp.forEach((key, config) -> {
if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
|| (config.getOuterAdapterKey() != null
&& config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
esSyncConfig.put(key, config);
}
});

for (Map.Entry<String, ESSyncConfig> entry : esSyncConfig.entrySet()) {
String configName = entry.getKey();
ESSyncConfig config = entry.getValue();

addSyncConfigToCache(configName, config);
}

esSyncService = new ESSyncService(esTemplate);

esConfigMonitor = new ESConfigMonitor();
esConfigMonitor.init(this, envProperties);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

@Override
public void sync(List<Dml> dmls) {
if (dmls == null || dmls.isEmpty()) {
return;
}
for (Dml dml : dmls) {
if (!dml.getIsDdl()) {
sync(dml);
}
}
esSyncService.commit(); // 批次统一提交

}

private void sync(Dml dml) {
String database = dml.getDatabase();
String table = dml.getTable();
Map<String, ESSyncConfig> configMap;
if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
configMap = dbTableEsSyncConfig
.get(StringUtils.trimToEmpty(dml.getDestination()) + "-" + StringUtils.trimToEmpty(dml.getGroupId())
+ "_" + database + "-" + table);
} else {
configMap = dbTableEsSyncConfig
.get(StringUtils.trimToEmpty(dml.getDestination()) + "_" + database + "-" + table);
}

if (configMap != null && !configMap.values().isEmpty()) {
esSyncService.sync(configMap.values(), dml);
}
}

@Override
public abstract EtlResult etl(String task, List<String> params);

@Override
public abstract Map<String, Object> count(String task);

@Override
public void destroy() {
if (esConfigMonitor != null) {
esConfigMonitor.destroy();
}
}

@Override
public String getDestination(String task) {
ESSyncConfig config = esSyncConfig.get(task);
if (config != null) {
return config.getDestination();
}
return null;
}

public void addSyncConfigToCache(String configName, ESSyncConfig config) {
Properties envProperties = this.envProperties;
SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
config.getEsMapping().setSchemaItem(schemaItem);

DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
if (dataSource == null || dataSource.getUrl() == null) {
throw new RuntimeException("No data source found: " + config.getDataSourceKey());
}
Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
Matcher matcher = pattern.matcher(dataSource.getUrl());
if (!matcher.find()) {
throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
}
String schema = matcher.group(2);

schemaItem.getAliasTableItems().values().forEach(tableItem -> {
Map<String, ESSyncConfig> esSyncConfigMap;
if (envProperties != null
&& !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
esSyncConfigMap = dbTableEsSyncConfig
.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "-"
+ StringUtils.trimToEmpty(config.getGroupId()) + "_" + schema + "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
} else {
esSyncConfigMap = dbTableEsSyncConfig
.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "_" + schema + "-"
+ tableItem.getTableName(),
k -> new ConcurrentHashMap<>());
}

esSyncConfigMap.put(configName, config);
});
}
}
Loading

0 comments on commit f45c020

Please sign in to comment.