Skip to content

Commit

Permalink
Merge pull request alibaba#291 from MarkLinHz/master
Browse files Browse the repository at this point in the history
重构LogPositionManager并适配XML配置
  • Loading branch information
agapple authored Apr 1, 2017
2 parents 4d624cf + a78622a commit a660497
Show file tree
Hide file tree
Showing 25 changed files with 404 additions and 410 deletions.
10 changes: 5 additions & 5 deletions deployer/src/main/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@
<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
<constructor-arg ref="metaManager"/>
</bean>
</property>
</constructor-arg>
</bean>
</property>

Expand Down
10 changes: 5 additions & 5 deletions deployer/src/main/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@
<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
<constructor-arg ref="metaManager"/>
</bean>
</property>
</constructor-arg>
</bean>
</property>

Expand Down
10 changes: 5 additions & 5 deletions deployer/src/main/resources/spring/local-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@
<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
<constructor-arg ref="metaManager"/>
</bean>
</property>
</constructor-arg>
</bean>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;

import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,7 +17,6 @@
import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.sink.CanalEventSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Collections;
import java.util.List;

import com.alibaba.otter.canal.parse.index.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -37,12 +38,6 @@
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
Expand Down Expand Up @@ -346,29 +341,22 @@ protected CanalHAController initHaController() {
protected CanalLogPositionManager initLogPositionManager() {
logger.info("init logPositionPersistManager begin...");
IndexMode indexMode = parameters.getIndexMode();
CanalLogPositionManager logPositionManager = null;
CanalLogPositionManager logPositionManager;
if (indexMode.isMemory()) {
logPositionManager = new MemoryLogPositionManager();
} else if (indexMode.isZookeeper()) {
logPositionManager = new ZooKeeperLogPositionManager();
((ZooKeeperLogPositionManager) logPositionManager).setZkClientx(getZkclientx());
logPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
} else if (indexMode.isMixed()) {
logPositionManager = new PeriodMixedLogPositionManager();

ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager();
zooKeeperLogPositionManager.setZkClientx(getZkclientx());
((PeriodMixedLogPositionManager) logPositionManager).setZooKeeperLogPositionManager(zooKeeperLogPositionManager);
MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager, zooKeeperLogPositionManager, 1000L);
} else if (indexMode.isMeta()) {
logPositionManager = new MetaLogPositionManager();
((MetaLogPositionManager) logPositionManager).setMetaManager(metaManager);
logPositionManager = new MetaLogPositionManager(metaManager);
} else if (indexMode.isMemoryMetaFailback()) {
MemoryLogPositionManager primaryLogPositionManager = new MemoryLogPositionManager();
MetaLogPositionManager failbackLogPositionManager = new MetaLogPositionManager();
failbackLogPositionManager.setMetaManager(metaManager);
MemoryLogPositionManager primary = new MemoryLogPositionManager();
MetaLogPositionManager secondary = new MetaLogPositionManager(metaManager);

logPositionManager = new FailbackLogPositionManager();
((FailbackLogPositionManager) logPositionManager).setPrimary(primaryLogPositionManager);
((FailbackLogPositionManager) logPositionManager).setFailback(failbackLogPositionManager);
logPositionManager = new FailbackLogPositionManager(primary, secondary);
} else {
throw new CanalException("unsupport indexMode for " + indexMode);
}
Expand Down
10 changes: 5 additions & 5 deletions instance/spring/src/test/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@
<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
<constructor-arg ref="metaManager"/>
</bean>
</property>
</constructor-arg>
</bean>
</property>

Expand Down
14 changes: 8 additions & 6 deletions instance/spring/src/test/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,20 @@
<!-- 解析编码 -->
<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />

<!-- 解析位点记录 -->
<property name="logPositionManager">
<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
<property name="primary">
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
</property>
<property name="failback">
</constructor-arg>
<constructor-arg>
<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
<property name="metaManager" ref="metaManager" />
<constructor-arg>
<ref bean="metaManager"/>
</constructor-arg>
</bean>
</property>
</constructor-arg>
</bean>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
Expand All @@ -22,7 +23,6 @@
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
Expand All @@ -44,7 +44,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

protected CanalLogPositionManager logPositionManager = null;
protected CanalLogPositionManager logPositionManager = null;
protected CanalEventSink<List<CanalEntry.Entry>> eventSink = null;
protected CanalEventFilter eventFilter = null;
protected CanalEventFilter eventBlackFilter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import java.io.IOException;

import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import org.apache.commons.lang.StringUtils;

import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogPosition;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.alibaba.otter.canal.parse.index;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;

/**
* Created by yinxiu on 17/3/17.
* Email: [email protected]
*/
public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
import com.alibaba.otter.canal.protocol.position.LogPosition;

/**
* 接口组合
*
* @author jianghang 2012-7-7 上午10:02:02
* @version 1.0.0
* Created by yinxiu on 17/3/17.
* Email: [email protected]
*/
public interface CanalLogPositionManager extends CanalLifeCycle {

LogPosition getLatestIndexBy(String destination);

void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException;

}
Original file line number Diff line number Diff line change
@@ -1,74 +1,81 @@
package com.alibaba.otter.canal.parse.index;

import org.springframework.util.Assert;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by yinxiu on 17/3/18.
* Email: [email protected]
*
* 实现基于failover查找的机制完成meta的操作
*
*
* <pre>
* 应用场景:比如针对内存buffer,出现HA切换,先尝试从内存buffer区中找到lastest position,如果不存在才尝试找一下meta里消费的信息
* </pre>
*
* @author jianghang 2012-7-20 下午02:33:20
*/
public class FailbackLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
public class FailbackLogPositionManager extends AbstractLogPositionManager {

private final static Logger logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);

private final CanalLogPositionManager primary;
private final CanalLogPositionManager secondary;

public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary) {
if (primary == null) {
throw new NullPointerException("nul primary LogPositionManager");
}
if (secondary == null) {
throw new NullPointerException("nul secondary LogPositionManager");
}

private CanalLogPositionManager primary;
private CanalLogPositionManager failback;
this.primary = primary;
this.secondary = secondary;
}

@Override
public void start() {
super.start();
Assert.notNull(primary);
Assert.notNull(failback);

if (!primary.isStart()) {
primary.start();
}

if (!failback.isStart()) {
failback.start();
if (!secondary.isStart()) {
primary.start();
}
}

@Override
public void stop() {
super.stop();

if (primary.isStart()) {
primary.stop();
if (secondary.isStart()) {
secondary.stop();
}

if (failback.isStart()) {
failback.stop();
if (primary.isStart()) {
primary.stop();
}
}

@Override
public LogPosition getLatestIndexBy(String destination) {
LogPosition logPosition = primary.getLatestIndexBy(destination);
if (logPosition == null) {
return failback.getLatestIndexBy(destination);
} else {
if (logPosition != null) {
return logPosition;
}
return secondary.getLatestIndexBy(destination);
}

@Override
public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
try {
primary.persistLogPosition(destination, logPosition);
} catch (CanalParseException e) {
failback.persistLogPosition(destination, logPosition);
logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}", destination, logPosition, e);
secondary.persistLogPosition(destination, logPosition);
}
}

public void setPrimary(CanalLogPositionManager primary) {
this.primary = primary;
}

public void setFailback(CanalLogPositionManager failback) {
this.failback = failback;
}

}
Loading

0 comments on commit a660497

Please sign in to comment.