Skip to content

Commit

Permalink
fixed code format & merge
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Apr 19, 2021
1 parent 75174a6 commit 1369418
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 61 deletions.
10 changes: 9 additions & 1 deletion admin/admin-web/src/main/resources/canal-template.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
Expand Down Expand Up @@ -99,6 +102,9 @@ canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
Expand Down Expand Up @@ -155,6 +161,7 @@ rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
Expand All @@ -163,4 +170,5 @@ rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.password =
rabbitmq.deliveryMode =
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import java.util.NoSuchElementException;

public class URLClassExtensionLoader extends URLClassLoader {
public URLClassExtensionLoader(URL[] urls) {

public URLClassExtensionLoader(URL[] urls){
super(urls);
}

Expand All @@ -18,9 +19,9 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
return c;
}

if (name.startsWith("java.") || name.startsWith("org.slf4j.")
|| name.startsWith("org.apache.logging")
|| name.startsWith("org.apache.commons.logging.")) {
if (name.startsWith("java.") || name.startsWith("org.slf4j.") || name.startsWith("org.apache.logging")
|| name.startsWith("org.apache.zookeeper.") || name.startsWith("org.I0Itec.zkclient.")
|| name.startsWith("org.apache.commons.logging.")) {
// || name.startsWith("org.apache.hadoop."))
// {
c = super.loadClass(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class RabbitMQConstants {
public static final String RABBITMQ_VIRTUAL_HOST = ROOT + "." + "virtual.host";
public static final String RABBITMQ_USERNAME = ROOT + "." + "username";
public static final String RABBITMQ_PASSWORD = ROOT + "." + "password";
public static final String RABBITMQ_DELIVERY_NODE = ROOT + "." + "deliveryMode";

public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ public class RabbitMQProducerConfig extends MQProperties {
private String exchange;
private String username;
private String password;
// 1:transient, 2:"persistent"
private String deliveryMode;

public String getHost() {
return host;
Expand Down Expand Up @@ -57,12 +55,4 @@ public String getPassword() {
public void setPassword(String password) {
this.password = password;
}

public String getDeliveryMode() {
return deliveryMode;
}

public void setDeliveryMode(String deliveryMode) {
this.deliveryMode = deliveryMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,6 +26,10 @@
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* RabbitMQ Producer SPI 实现
Expand Down Expand Up @@ -103,10 +106,6 @@ private void loadRabbitMQProperties(Properties properties) {
if (!StringUtils.isEmpty(password)) {
rabbitMQProperties.setPassword(password);
}
String deliveryMode = properties.getProperty(RabbitMQConstants.RABBITMQ_DELIVERY_NODE);
if (!StringUtils.isEmpty(deliveryMode)) {
rabbitMQProperties.setDeliveryMode(deliveryMode);
}
}

@Override
Expand Down Expand Up @@ -166,9 +165,7 @@ private void sendMessage(String queueName, byte[] message) {
// tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
try {
RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
AMQP.BasicProperties properties = rabbitMQProperties != null && rabbitMQProperties.getDeliveryMode() != null
&& "2".equals(rabbitMQProperties.getDeliveryMode()) ? MessageProperties.MINIMAL_PERSISTENT_BASIC : null;
channel.basicPublish(rabbitMQProperties.getExchange(), queueName, properties, message);
channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void loadRocketMQProperties(Properties properties) {
if (!StringUtils.isEmpty(vipChannelEnabled)) {
rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
}
String tag = properties.getProperty(RocketMQConstants.ROCKETMQ_TAG);
String tag = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_TAG);
if (!StringUtils.isEmpty(tag)) {
rocketMQProperties.setTag(tag);
}
Expand Down Expand Up @@ -207,8 +207,10 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
if (dataPartition != null) {
final int index = i;
template.submit(() -> {
Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(dataPartition,
mqProperties.isFilterTransactionEntry()));
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
CanalMessageSerializerUtil.serializer(dataPartition,
mqProperties.isFilterTransactionEntry()));
sendMessage(data, index);
});
}
Expand All @@ -217,8 +219,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
template.waitForResult();
} else {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(message,
mqProperties.isFilterTransactionEntry()));
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
sendMessage(data, partition);
}
} else {
Expand Down Expand Up @@ -254,8 +257,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
final int index = i;
template.submit(() -> {
List<Message> messages = flatMessagePart.stream()
.map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
SerializerFeature.WriteMapNullValue)))
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
.collect(Collectors.toList());
// 批量发送
sendMessage(messages, index);
Expand All @@ -268,8 +272,9 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
} else {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
List<Message> messages = flatMessages.stream()
.map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
SerializerFeature.WriteMapNullValue)))
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
.collect(Collectors.toList());
// 批量发送
sendMessage(messages, partition);
Expand Down
14 changes: 9 additions & 5 deletions deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
Expand All @@ -16,6 +16,10 @@ canal.metrics.pull.port = 11112
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
Expand Down Expand Up @@ -60,9 +64,9 @@ canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
Expand Down Expand Up @@ -157,7 +161,7 @@ rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = testTag
rocketmq.tag =

##################################################
######### RabbitMQ #############
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
*/
public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {

private static final int maxFullTimes = 10;
private static final int maxFullTimes = 10;
private LogEventConvert logEventConvert;
private EventTransactionBuffer transactionBuffer;
private ErosaConnection connection;
Expand All @@ -71,13 +71,13 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
private BatchEventProcessor<MessageEvent> simpleParserStage;
private BatchEventProcessor<MessageEvent> sinkStoreStage;
private LogContext logContext;

protected boolean filterDmlInsert = false;
protected boolean filterDmlUpdate = false;
protected boolean filterDmlDelete = false;
protected boolean filterDmlInsert = false;
protected boolean filterDmlUpdate = false;
protected boolean filterDmlDelete = false;

public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
EventTransactionBuffer transactionBuffer, String destination, boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
EventTransactionBuffer transactionBuffer, String destination,
boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
this.ringBufferSize = ringBufferSize;
this.parserThreadCount = parserThreadCount;
this.logEventConvert = logEventConvert;
Expand Down Expand Up @@ -106,8 +106,8 @@ public void start() {
// stage 2
this.logContext = new LogContext();
simpleParserStage = new BatchEventProcessor<>(disruptorMsgBuffer,
sequenceBarrier,
new SimpleParserStage(logContext));
sequenceBarrier,
new SimpleParserStage(logContext));
simpleParserStage.setExceptionHandler(exceptionHandler);
disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());

Expand All @@ -126,9 +126,7 @@ public void start() {

// stage 4
SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer,
sinkSequenceBarrier,
new SinkStoreStage());
sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer, sinkSequenceBarrier, new SinkStoreStage());
sinkStoreStage.setExceptionHandler(exceptionHandler);
disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());

Expand Down Expand Up @@ -278,19 +276,25 @@ public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throw
switch (eventType) {
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
needDmlParse = !filterDmlInsert;//true;
if (!filterDmlInsert) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
needDmlParse = true;
}
break;
case LogEvent.UPDATE_ROWS_EVENT_V1:
case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
case LogEvent.UPDATE_ROWS_EVENT:
tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
needDmlParse = !filterDmlUpdate;//true;
if (!filterDmlUpdate) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
needDmlParse = true;
}
break;
case LogEvent.DELETE_ROWS_EVENT_V1:
case LogEvent.DELETE_ROWS_EVENT:
tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
needDmlParse = !filterDmlDelete;//true;
if (!filterDmlDelete) {
tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
needDmlParse = true;
}
break;
case LogEvent.ROWS_QUERY_LOG_EVENT:
needDmlParse = true;
Expand Down Expand Up @@ -448,7 +452,7 @@ static class SimpleFatalExceptionHandler implements ExceptionHandler {

@Override
public void handleEventException(final Throwable ex, final long sequence, final Object event) {
//异常上抛,否则processEvents的逻辑会默认会mark为成功执行,有丢数据风险
// 异常上抛,否则processEvents的逻辑会默认会mark为成功执行,有丢数据风险
throw new CanalParseException(ex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.List;
import java.util.Map;

import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,6 +58,8 @@
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;

/**
Expand Down Expand Up @@ -195,9 +195,9 @@ private Entry parseMariaGTIDLogEvent(LogEvent logEvent) {
Pair.Builder builder = Pair.newBuilder();
builder.setKey("gtid");
if (logEvent instanceof MariaGtidLogEvent) {
builder.setValue(((MariaGtidLogEvent)logEvent).getGtidStr());
builder.setValue(((MariaGtidLogEvent) logEvent).getGtidStr());
} else if (logEvent instanceof MariaGtidListLogEvent) {
builder.setValue(((MariaGtidListLogEvent)logEvent).getGtidStr());
builder.setValue(((MariaGtidListLogEvent) logEvent).getGtidStr());
}
Header header = createHeader(logHeader, "", "", EventType.GTID);
return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
Expand Down Expand Up @@ -294,7 +294,6 @@ private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {

Header header = createHeader(event.getHeader(), schemaName, tableName, type);
RowChange.Builder rowChangeBuilder = RowChange.newBuilder();

rowChangeBuilder.setIsDdl(!isDml);
rowChangeBuilder.setSql(queryString);
if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
Expand Down

0 comments on commit 1369418

Please sign in to comment.