Skip to content

Commit

Permalink
support otter use tableMetaTSDB
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Aug 8, 2018
1 parent 377a72e commit 9e816bc
Show file tree
Hide file tree
Showing 21 changed files with 149 additions and 611 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ final Serializable fetchValue(int type, final int meta, boolean isBinary) {
// t % 100);

StringBuilder builder = new StringBuilder();
builder.append(26);
appendNumber4(builder, d / 10000);
builder.append('-');
appendNumber2(builder, (d % 10000) / 100);
Expand Down Expand Up @@ -615,7 +614,13 @@ final Serializable fetchValue(int type, final int meta, boolean isBinary) {
if (i32 < 0) {
builder.append('-');
}
appendNumber2(builder, u32 / 10000);

int d = u32 / 10000;
if (d > 100) {
builder.append(String.valueOf(d));
} else {
appendNumber2(builder, d);
}
builder.append(':');
appendNumber2(builder, (u32 % 10000) / 100);
builder.append(':');
Expand Down Expand Up @@ -724,7 +729,13 @@ final Serializable fetchValue(int type, final int meta, boolean isBinary) {
if (ltime < 0) {
builder.append('-');
}
appendNumber2(builder, (int) ((intpart >> 12) % (1 << 10)));

int d = (int) ((intpart >> 12) % (1 << 10));
if (d > 100) {
builder.append(String.valueOf(d));
} else {
appendNumber2(builder, d);
}
builder.append(':');
appendNumber2(builder, (int) ((intpart >> 6) % (1 << 6)));
builder.append(':');
Expand Down Expand Up @@ -1134,17 +1145,15 @@ private void appendNumber4(StringBuilder builder, int d) {
.append(digits[(d / 100) % 10])
.append(digits[(d / 10) % 10])
.append(digits[d % 10]);
} else if (d >= 100) {
} else {
builder.append('0');
appendNumber3(builder, d);
}
}

private void appendNumber3(StringBuilder builder, int d) {
if (d >= 100) {
builder.append(digits[d / 100])
.append(digits[(d / 10) % 10])
.append(digits[d % 10]);
builder.append(digits[d / 100]).append(digits[(d / 10) % 10]).append(digits[d % 10]);
} else {
builder.append('0');
appendNumber2(builder, d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
import java.util.Collections;
import java.util.List;


import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.HistoryTableMetaCache;

import com.alibaba.otter.canal.meta.FileMixedMetaManager;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,7 +20,14 @@
import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageScavengeMode;
import com.alibaba.otter.canal.meta.FileMixedMetaManager;
import com.alibaba.otter.canal.meta.MemoryMetaManager;
import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
Expand All @@ -36,7 +38,12 @@
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.*;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
Expand Down Expand Up @@ -114,7 +121,7 @@ protected void initMetaManager() {
ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
zooKeeperMetaManager.setZkClientx(getZkclientx());
((PeriodMixedMetaManager) metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
} else if (mode.isLocalFile()){
} else if (mode.isLocalFile()) {
FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
fileMixedMetaManager.setDataDir(parameters.getDataDir());
fileMixedMetaManager.setPeriod(parameters.getMetaFileFlushPeriod());
Expand Down Expand Up @@ -241,13 +248,6 @@ private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAdd
mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
// 数据库信息参数
mysqlEventParser.setSlaveId(parameters.getSlaveId());
mysqlEventParser.setTableMetaStorageFactory(parameters.getTableMetaStorageFactory());
// Ctrip callback
// mysqlEventParser.setCallback(parameters.getCallback());
// HistoryTableMetaCache cache = new HistoryTableMetaCache();
// cache.init(parameters.getEntries());
// mysqlEventParser.setHistoryTableMetaCache(cache);

if (!CollectionUtils.isEmpty(dbAddresses)) {
mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
parameters.getDbUsername(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.ArrayList;
import java.util.List;

import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;

Expand Down Expand Up @@ -94,6 +93,10 @@ public class CanalParameter implements Serializable {
private Boolean filterTableError = Boolean.FALSE; // 是否忽略表解析异常
private String blackFilter = null; // 匹配黑名单,忽略解析

private Boolean tsdbEnable = Boolean.FALSE; // 是否开启tableMetaTSDB
private String tsdbJdbcUrl;
private String tsdbJdbcUserName;
private String tsdbJdbcPassword;
// ================================== 兼容字段处理
private InetSocketAddress masterAddress; // 主库信息
private String masterUsername; // 帐号
Expand All @@ -109,9 +112,6 @@ public class CanalParameter implements Serializable {
private Long standbyLogfileOffest = null;
private Long standbyTimestamp = null;

// Ctrip Table Meta
TableMetaStorageFactory tableMetaStorageFactory;

public static enum RunMode {

/** 嵌入式 */
Expand Down Expand Up @@ -250,7 +250,7 @@ public static enum MetaMode {
ZOOKEEPER,
/** 混合模式,内存+文件 */
MIXED,
/** 本地文件存储模式*/
/** 本地文件存储模式 */
LOCAL_FILE;

public boolean isMemory() {
Expand All @@ -265,7 +265,7 @@ public boolean isMixed() {
return this.equals(MetaMode.MIXED);
}

public boolean isLocalFile(){
public boolean isLocalFile() {
return this.equals(MetaMode.LOCAL_FILE);
}
}
Expand Down Expand Up @@ -887,12 +887,36 @@ public void setBlackFilter(String blackFilter) {
this.blackFilter = blackFilter;
}

public TableMetaStorageFactory getTableMetaStorageFactory() {
return tableMetaStorageFactory;
public Boolean getTsdbEnable() {
return tsdbEnable;
}

public void setTsdbEnable(Boolean tsdbEnable) {
this.tsdbEnable = tsdbEnable;
}

public String getTsdbJdbcUrl() {
return tsdbJdbcUrl;
}

public void setTsdbJdbcUrl(String tsdbJdbcUrl) {
this.tsdbJdbcUrl = tsdbJdbcUrl;
}

public String getTsdbJdbcUserName() {
return tsdbJdbcUserName;
}

public void setTsdbJdbcUserName(String tsdbJdbcUserName) {
this.tsdbJdbcUserName = tsdbJdbcUserName;
}

public String getTsdbJdbcPassword() {
return tsdbJdbcPassword;
}

public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
this.tableMetaStorageFactory = tableMetaStorageFactory;
public void setTsdbJdbcPassword(String tsdbJdbcPassword) {
this.tsdbJdbcPassword = tsdbJdbcPassword;
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,30 @@
import com.alibaba.otter.canal.parse.inbound.BinlogParser;
import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
import com.alibaba.otter.canal.protocol.position.EntryPosition;

public abstract class AbstractMysqlEventParser extends AbstractEventParser {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final long BINLOG_START_OFFEST = 4L;
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final long BINLOG_START_OFFEST = 4L;

protected TableMetaTSDBFactory tableMetaTSDBFactory = new DefaultTableMetaTSDBFactory();
protected boolean enableTsdb = false;
protected String tsdbSpringXml;
protected TableMetaTSDB tableMetaTSDB;

protected boolean enableTsdb = false;
protected String tsdbSpringXml;
protected TableMetaTSDB tableMetaTSDB;
// 编码信息
protected byte connectionCharsetNumber = (byte) 33;
protected Charset connectionCharset = Charset.forName("UTF-8");
protected boolean filterQueryDcl = false;
protected boolean filterQueryDml = false;
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;
protected byte connectionCharsetNumber = (byte) 33;
protected Charset connectionCharset = Charset.forName("UTF-8");
protected boolean filterQueryDcl = false;
protected boolean filterQueryDml = false;
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;

protected BinlogParser buildParser() {
LogEventConvert convert = new LogEventConvert();
Expand Down Expand Up @@ -97,7 +100,7 @@ public void start() throws CanalParseException {
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty("canal.instance.destination", destination);
// 初始化
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
} finally {
System.setProperty("canal.instance.destination", "");
}
Expand All @@ -110,7 +113,7 @@ public void start() throws CanalParseException {

public void stop() throws CanalParseException {
if (enableTsdb) {
TableMetaTSDBBuilder.destory(destination);
tableMetaTSDBFactory.destory(destination);
tableMetaTSDB = null;
}

Expand Down Expand Up @@ -182,7 +185,7 @@ public void setEnableTsdb(boolean enableTsdb) {
if (this.enableTsdb) {
if (tableMetaTSDB == null) {
// 初始化
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
}
}
}
Expand All @@ -192,9 +195,13 @@ public void setTsdbSpringXml(String tsdbSpringXml) {
if (this.enableTsdb) {
if (tableMetaTSDB == null) {
// 初始化
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
}
}
}

public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
this.tableMetaTSDBFactory = tableMetaTSDBFactory;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.inbound.*;
import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheWithStorage;
import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
import org.apache.commons.lang.StringUtils;
import org.springframework.util.CollectionUtils;

Expand All @@ -25,6 +20,9 @@
import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
import com.alibaba.otter.canal.parse.inbound.SinkFunction;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
Expand Down Expand Up @@ -64,8 +62,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
// 心跳检查信息
private String detectingSQL; // 心跳sql
private MysqlConnection metaConnection; // 查询meta信息的链接
private TableMetaCacheInterface tableMetaCache; // 对应meta
// cache
private TableMetaCache tableMetaCache; // 对应meta
private int fallbackIntervalInSeconds = 60; // 切换回退时间
private BinlogFormat[] supportBinlogFormats; // 支持的binlogFormat,如果设置会执行强校验
private BinlogImage[] supportBinlogImages; // 支持的binlogImage,如果设置会执行强校验
Expand All @@ -74,8 +71,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值

private TableMetaStorageFactory tableMetaStorageFactory;

protected ErosaConnection buildErosaConnection() {
return buildMysqlConnection(this.runningInfo);
}
Expand Down Expand Up @@ -129,13 +124,7 @@ protected void preDump(ErosaConnection connection) {
((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
}


TableMetaStorage storage = null;
if (tableMetaStorageFactory != null) {
storage = tableMetaStorageFactory.getTableMetaStorage();
}

tableMetaCache = new TableMetaCacheWithStorage(metaConnection, storage);
tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
}
}
Expand Down Expand Up @@ -921,11 +910,4 @@ public void setDumpErrorCountThreshold(int dumpErrorCountThreshold) {
this.dumpErrorCountThreshold = dumpErrorCountThreshold;
}

public TableMetaStorageFactory getTableMetaStorageFactory() {
return tableMetaStorageFactory;
}

public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
this.tableMetaStorageFactory = tableMetaStorageFactory;
}
}
Loading

0 comments on commit 9e816bc

Please sign in to comment.