diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/OtterDownStreamHandler.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/OtterDownStreamHandler.java index 22739092..a1772c0e 100644 --- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/OtterDownStreamHandler.java +++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/canal/OtterDownStreamHandler.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,22 +62,51 @@ public class OtterDownStreamHandler extends AbstractCanalEventDownStreamHandler< private int detectingThresoldCount = 10; private int detectingExpCount = 1; // 增常趋势 private AtomicLong detectingFailedCount = new AtomicLong(0); // 检测失败的次数 - private AtomicLong detectingSuccessedCount = new AtomicLong(0); // 检测成功的次数 + private AtomicLong detectingSuccessedCount = new AtomicLong(0); + private ReentrantLock lock = new ReentrantLock(); // 检测成功的次数 public void stop() { - super.stop(); + try { + lock.lock(); + super.stop(); - if (working.compareAndSet(true, false)) { - stopDetecting(); + if (working.compareAndSet(true, false)) { + stopDetecting(); + } + } finally { + lock.unlock(); } } public List before(List events) { + /** + * 1)故障现象channel重启后,pipeline处于定位中,后台日志报超时异常。 + * 2)故障原因:OtterDownStreamHandler里面添加打印日志发现有2个OtterDownStreamHandler.scheduler在运行 + * 分析发现旧的scheduler没有被停止掉导致。原因是OtterDownStreamHandler.scheduler关闭后被AbstractEventParser.startHeartBeat拉起 + */ lastEventExecuteTime = System.currentTimeMillis();// 记录最后一条数据时间 - if (working.compareAndSet(false, true)) {// 第一次有数据时 - startDetecting(); + if (super.isStart()) { + if (working.compareAndSet(false, true)) {// 第一次有数据时 + try { + // 和stop()方法竞争锁,防止stop停止调度任务,又被本方法开启调度 + lock.lock(); + if (super.isStart()) { + // 再次判断是否为isStart(),因为并发的stop()随时可以使状态为停止态 + startDetecting(); + } else { + // 已经被上面的并发的stop()关闭掉 + working.set(false); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + + } + } return super.before(events);