Skip to content

Commit

Permalink
fixed issue alibaba#2258 , optimizer kafka send perf
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Sep 30, 2019
1 parent c634f98 commit 0fb717a
Show file tree
Hide file tree
Showing 10 changed files with 602 additions and 364 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.alibaba.otter.canal.common.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 多线程执行器模板代码,otter中好多地方都写多线程,比较多的都是重复的逻辑代码,抽象一下做个模板把
*
* <pre>
* 示例代码:
* ExecutorTemplate template = new ExecutorTemplate(executor);
* ...
* try {
* for ( ....) {
* template.submit(new Runnable() {})
* }
*
* List<?> result = template.waitForResult();
* // do result
* } finally {
* template.clear();
* }
*
* 注意:该模板工程,不支持多业务并发调用,会出现数据混乱
* </pre>
*/
public class ExecutorTemplate {

private volatile ThreadPoolExecutor executor = null;
private volatile List<Future> futures = null;

public ExecutorTemplate(ThreadPoolExecutor executor){
this.futures = Collections.synchronizedList(new ArrayList<Future>());
this.executor = executor;
}

public void submit(Runnable task) {
Future future = executor.submit(task, null);
futures.add(future);
check(future);
}

public void submit(Callable<Exception> task) {
Future future = executor.submit(task);
futures.add(future);
check(future);
}

private void check(Future future) {
if (future.isDone()) {
// 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对有异常时快速响应,而不是等跑完所有的才抛异常
try {
future.get();
} catch (Throwable e) {
// 取消完之后立马退出
cacelAllFutures();
throw new RuntimeException(e);
}
}
}

public synchronized List<?> waitForResult() {
List result = new ArrayList();
RuntimeException exception = null;

for (Future future : futures) {
try {
result.add(future.get());
} catch (Throwable e) {
exception = new RuntimeException(e);
// 如何一个future出现了异常,就退出
break;
}
}

if (exception != null) {
cacelAllFutures();
throw exception;
} else {
return result;
}
}

public void cacelAllFutures() {
for (Future future : futures) {
if (!future.isDone() && !future.isCancelled()) {
future.cancel(true);
}
}
}

public void clear() {
futures.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.deployer.admin.CanalAdminController;
import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
import com.alibaba.otter.canal.rabbitmq.CanalRabbitMQProducer;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
import com.alibaba.otter.canal.server.CanalMQStarter;
import com.alibaba.otter.canal.spi.CanalMQProducer;

Expand Down Expand Up @@ -69,11 +69,15 @@ public synchronized void start() throws Throwable {
canalMQProducer = new CanalRabbitMQProducer();
}

MQProperties mqProperties = null;
if (canalMQProducer != null) {
mqProperties = buildMQProperties(properties);
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
if (mqProperties.getFlatMessage()) {
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
}

logger.info("## start the canal server.");
Expand All @@ -99,7 +103,6 @@ public void run() {

if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
MQProperties mqProperties = buildMQProperties(properties);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(mqProperties, destinations);
controller.setCanalMQStarter(canalMQStarter);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.alibaba.otter.canal.common;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.spi.CanalMQProducer;

/**
* @author agapple 2019年9月29日 上午11:17:11
* @since 1.1.5
*/
public abstract class AbstractMQProducer implements CanalMQProducer {

protected ThreadPoolExecutor executor;

@Override
public void init(MQProperties mqProperties) {
int parallelThreadSize = mqProperties.getParallelThreadSize();
executor = new ThreadPoolExecutor(parallelThreadSize,
parallelThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(parallelThreadSize * 2),
new ThreadPoolExecutor.CallerRunsPolicy());

}

@Override
public void stop() {
executor.shutdownNow();
executor = null;
}

}
Loading

0 comments on commit 0fb717a

Please sign in to comment.