From 5d474972002474b0cbe79ab327530b69293c07e8 Mon Sep 17 00:00:00 2001 From: block Date: Wed, 8 Apr 2020 11:10:11 +0800 Subject: [PATCH] add shared raft timer, default false (#414) * add shared raft timer, default false * more robust * add shared scheduler * clear necessary code * minor change * fix unit test * add channel init low/high write buf water mark for bolt impl * minor fix * minor fix for CR --- .../jraft/ThreadPoolMetricsSignalHandler.java | 8 +- .../com/alipay/sofa/jraft/core/NodeImpl.java | 39 +-- .../alipay/sofa/jraft/core/Replicator.java | 2 +- .../com/alipay/sofa/jraft/core/Scheduler.java | 88 +++++++ .../alipay/sofa/jraft/core/TimerManager.java | 44 ++-- .../alipay/sofa/jraft/option/NodeOptions.java | 89 ++++++- .../jraft/option/ReplicatorGroupOptions.java | 8 +- .../sofa/jraft/option/ReplicatorOptions.java | 7 +- .../jraft/option/SnapshotCopierOptions.java | 12 +- .../sofa/jraft/rpc/RaftClientService.java | 52 ++-- .../sofa/jraft/rpc/impl/BoltRpcClient.java | 20 +- ...ice.java => DefaultRaftClientService.java} | 4 +- .../jraft/storage/impl/LogManagerImpl.java | 7 +- .../storage/snapshot/remote/CopySession.java | 6 +- .../snapshot/remote/RemoteFileCopier.java | 4 +- .../util/LogScheduledThreadPoolExecutor.java | 94 +++++++ .../jraft/util/LogThreadPoolExecutor.java | 2 + .../MetricScheduledThreadPoolExecutor.java | 75 ++++++ .../jraft/util/MetricThreadPoolExecutor.java | 21 +- .../jraft/util/ThreadPoolMetricRegistry.java | 40 +++ .../sofa/jraft/util/ThreadPoolUtil.java | 109 +++++++- .../util/timer/DefaultRaftTimerFactory.java | 240 ++++++++++++++++++ .../sofa/jraft/util/timer/DefaultTimer.java | 123 +++++++++ .../jraft/util/timer/RaftTimerFactory.java | 35 +++ ...pay.sofa.jraft.util.timer.RaftTimerFactory | 1 + .../com/alipay/sofa/jraft/core/NodeTest.java | 29 ++- .../sofa/jraft/core/ReplicatorGroupTest.java | 3 +- .../sofa/jraft/core/ReplicatorTest.java | 3 +- ...java => DefaultRaftClientServiceTest.java} | 10 +- .../jraft/storage/SnapshotExecutorTest.java | 3 +- .../jraft/storage/impl/LogManagerTest.java | 16 +- .../local/LocalSnapshotCopierTest.java | 6 +- .../snapshot/remote/CopySessionTest.java | 3 +- .../snapshot/remote/RemoteFileCopierTest.java | 3 +- .../alipay/sofa/jraft/rhea/StoreEngine.java | 4 +- 35 files changed, 1044 insertions(+), 166 deletions(-) create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/core/Scheduler.java rename jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/{BoltRaftClientService.java => DefaultRaftClientService.java} (97%) create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogScheduledThreadPoolExecutor.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricScheduledThreadPoolExecutor.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolMetricRegistry.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultTimer.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java create mode 100644 jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.timer.RaftTimerFactory rename jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/{BoltRaftClientServiceTest.java => DefaultRaftClientServiceTest.java} (85%) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/ThreadPoolMetricsSignalHandler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/ThreadPoolMetricsSignalHandler.java index 77220fa5a..6e8966795 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/ThreadPoolMetricsSignalHandler.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/ThreadPoolMetricsSignalHandler.java @@ -26,8 +26,8 @@ import com.alipay.sofa.jraft.util.FileOutputSignalHandler; import com.alipay.sofa.jraft.util.MetricReporter; -import com.alipay.sofa.jraft.util.MetricThreadPoolExecutor; import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.ThreadPoolMetricRegistry; /** * @@ -48,10 +48,10 @@ public void handle(final String signalName) { LOG.info("Printing thread pools metrics with signal: {} to file: {}.", signalName, file); try (final PrintStream out = new PrintStream(new FileOutputStream(file, true))) { - final MetricReporter reporter = MetricReporter.forRegistry(MetricThreadPoolExecutor.metricRegistry()) // + MetricReporter.forRegistry(ThreadPoolMetricRegistry.metricRegistry()) // .outputTo(out) // - .build(); - reporter.report(); + .build() // + .report(); } } catch (final IOException e) { LOG.error("Fail to print thread pools metrics.", e); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 0fae93afd..95fdaeb4c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -94,7 +94,7 @@ import com.alipay.sofa.jraft.rpc.RpcResponseClosure; import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter; import com.alipay.sofa.jraft.rpc.RpcResponseFactory; -import com.alipay.sofa.jraft.rpc.impl.core.BoltRaftClientService; +import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService; import com.alipay.sofa.jraft.storage.LogManager; import com.alipay.sofa.jraft.storage.LogStorage; import com.alipay.sofa.jraft.storage.RaftMetaStorage; @@ -116,6 +116,7 @@ import com.alipay.sofa.jraft.util.ThreadHelper; import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.Utils; +import com.alipay.sofa.jraft.util.timer.RaftTimerFactory; import com.google.protobuf.Message; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; @@ -152,6 +153,11 @@ public class NodeImpl implements Node, RaftServerService { } } + private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader + .load( + RaftTimerFactory.class) // + .first(); + // Max retry times when applying tasks. private static final int MAX_APPLY_RETRY_TIMES = 3; @@ -194,7 +200,7 @@ public class NodeImpl implements Node, RaftServerService { private RaftClientService rpcService; private ReadOnlyService readOnlyService; /** Timers */ - private TimerManager timerManager; + private Scheduler timerManager; private RepeatedTimer electionTimer; private RepeatedTimer voteTimer; private RepeatedTimer stepDownTimer; @@ -865,15 +871,14 @@ public boolean init(final NodeOptions opts) { return false; } - this.timerManager = new TimerManager(); - if (!this.timerManager.init(this.options.getTimerPoolSize())) { - LOG.error("Fail to init timer manager."); - return false; - } + this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(), + this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool"); // Init timers final String suffix = getNodeId().toString(); - this.voteTimer = new RepeatedTimer("JRaft-VoteTimer-" + suffix, this.options.getElectionTimeoutMs()) { + String name = "JRaft-VoteTimer-" + suffix; + this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer( + this.options.isSharedVoteTimer(), name)) { @Override protected void onTrigger() { @@ -885,7 +890,9 @@ protected int adjustTimeout(final int timeoutMs) { return randomTimeout(timeoutMs); } }; - this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer-" + suffix, this.options.getElectionTimeoutMs()) { + name = "JRaft-ElectionTimer-" + suffix; + this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), + TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) { @Override protected void onTrigger() { @@ -897,16 +904,18 @@ protected int adjustTimeout(final int timeoutMs) { return randomTimeout(timeoutMs); } }; - this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer-" + suffix, - this.options.getElectionTimeoutMs() >> 1) { + name = "JRaft-StepDownTimer-" + suffix; + this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1, + TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) { @Override protected void onTrigger() { handleStepDownTimeout(); } }; - this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer-" + suffix, - this.options.getSnapshotIntervalSecs() * 1000) { + name = "JRaft-SnapshotTimer-" + suffix; + this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000, + TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) { private volatile boolean firstSchedule = true; @@ -1000,7 +1009,7 @@ protected int adjustTimeout(final int timeoutMs) { // TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it this.replicatorGroup = new ReplicatorGroupImpl(); - this.rpcService = new BoltRaftClientService(this.replicatorGroup); + this.rpcService = new DefaultRaftClientService(this.replicatorGroup); final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs())); rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs()); @@ -2300,7 +2309,7 @@ public NodeOptions getOptions() { return this.options; } - public TimerManager getTimerManager() { + public Scheduler getTimerManager() { return this.timerManager; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 0d5ee5068..0de843eae 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -109,7 +109,7 @@ public class Replicator implements ThreadId.OnError { private ScheduledFuture heartbeatTimer; private volatile SnapshotReader reader; private CatchUpClosure catchUpClosure; - private final TimerManager timerManager; + private final Scheduler timerManager; private final NodeMetrics nodeMetrics; private volatile State state; diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Scheduler.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Scheduler.java new file mode 100644 index 000000000..95c70d312 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Scheduler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.core; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * + * @author jiachun.fjc + */ +public interface Scheduler { + + /** + * Creates and executes a one-shot action that becomes enabled + * after the given delay. + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of + * the task and whose {@code get()} method will return + * {@code null} upon completion + * scheduled for execution + */ + ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the given + * period; that is executions will commence after + * {@code initialDelay} then {@code initialDelay+period}, then + * {@code initialDelay + 2 * period}, and so on. + * If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. If any execution of this task + * takes longer than its period, then subsequent executions + * may start late, but will not concurrently execute. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param period the period between successive executions + * @param unit the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose {@code get()} method will throw an + * exception upon cancellation + */ + ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, + final TimeUnit unit); + + /** + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose {@code get()} method will throw an + * exception upon cancellation + */ + ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, + final TimeUnit unit); + + void shutdown(); +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/TimerManager.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/TimerManager.java index 54cd8cceb..1939de650 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/TimerManager.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/TimerManager.java @@ -16,13 +16,12 @@ */ package com.alipay.sofa.jraft.core; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import com.alipay.sofa.jraft.Lifecycle; import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; /** * The global timer manager. @@ -31,45 +30,42 @@ * * 2018-Mar-30 3:24:34 PM */ -public class TimerManager implements Lifecycle { +public class TimerManager implements Scheduler { - private ScheduledExecutorService executor; + private final ScheduledExecutorService executor; - @Override - public boolean init(Integer coreSize) { - this.executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory( - "JRaft-Node-ScheduleThreadPool-", true)); - return true; - } - - @Override - public void shutdown() { - if (this.executor != null) { - this.executor.shutdownNow(); - this.executor = null; - } + public TimerManager(int workerNum) { + this(workerNum, "JRaft-Node-ScheduleThreadPool"); } - private void checkStarted() { - if (this.executor == null) { - throw new IllegalStateException("Please init timer manager."); - } + public TimerManager(int workerNum, String name) { + this.executor = ThreadPoolUtil.newScheduledBuilder() // + .poolName(name) // + .coreThreads(workerNum) // + .enableMetric(true) // + .threadFactory(new NamedThreadFactory(name, true)) // + .build(); } + @Override public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { - checkStarted(); return this.executor.schedule(command, delay, unit); } + @Override public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { - checkStarted(); return this.executor.scheduleAtFixedRate(command, initialDelay, period, unit); } + @Override public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { - checkStarted(); return this.executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); } + + @Override + public void shutdown() { + this.executor.shutdownNow(); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index fc04362da..0d1d756e5 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -118,6 +118,10 @@ public class NodeOptions extends RpcOptions implements Copiable { // Default: false private boolean disableCli = false; + /** + * Whether use global timer pool, if true, the {@code timerPoolSize} will be invalid. + */ + private boolean sharedTimerPool = false; /** * Timer manager thread pool size */ @@ -142,6 +146,23 @@ public class NodeOptions extends RpcOptions implements Copiable { */ private SnapshotThrottle snapshotThrottle; + /** + * Whether use global election timer + */ + private boolean sharedElectionTimer = false; + /** + * Whether use global vote timer + */ + private boolean sharedVoteTimer = false; + /** + * Whether use global step down timer + */ + private boolean sharedStepDownTimer = false; + /** + * Whether use global snapshot timer + */ + private boolean sharedSnapshotTimer = false; + /** * Custom service factory. */ @@ -192,6 +213,14 @@ public void setRaftRpcThreadPoolSize(final int raftRpcThreadPoolSize) { this.raftRpcThreadPoolSize = raftRpcThreadPoolSize; } + public boolean isSharedTimerPool() { + return sharedTimerPool; + } + + public void setSharedTimerPool(boolean sharedTimerPool) { + this.sharedTimerPool = sharedTimerPool; + } + public int getTimerPoolSize() { return this.timerPoolSize; } @@ -340,6 +369,38 @@ public void setDisableCli(final boolean disableCli) { this.disableCli = disableCli; } + public boolean isSharedElectionTimer() { + return sharedElectionTimer; + } + + public void setSharedElectionTimer(boolean sharedElectionTimer) { + this.sharedElectionTimer = sharedElectionTimer; + } + + public boolean isSharedVoteTimer() { + return sharedVoteTimer; + } + + public void setSharedVoteTimer(boolean sharedVoteTimer) { + this.sharedVoteTimer = sharedVoteTimer; + } + + public boolean isSharedStepDownTimer() { + return sharedStepDownTimer; + } + + public void setSharedStepDownTimer(boolean sharedStepDownTimer) { + this.sharedStepDownTimer = sharedStepDownTimer; + } + + public boolean isSharedSnapshotTimer() { + return sharedSnapshotTimer; + } + + public void setSharedSnapshotTimer(boolean sharedSnapshotTimer) { + this.sharedSnapshotTimer = sharedSnapshotTimer; + } + @Override public NodeOptions copy() { final NodeOptions nodeOptions = new NodeOptions(); @@ -351,26 +412,32 @@ public NodeOptions copy() { nodeOptions.setCatchupMargin(this.catchupMargin); nodeOptions.setFilterBeforeCopyRemote(this.filterBeforeCopyRemote); nodeOptions.setDisableCli(this.disableCli); + nodeOptions.setSharedTimerPool(this.sharedTimerPool); nodeOptions.setTimerPoolSize(this.timerPoolSize); nodeOptions.setCliRpcThreadPoolSize(this.cliRpcThreadPoolSize); nodeOptions.setRaftRpcThreadPoolSize(this.raftRpcThreadPoolSize); nodeOptions.setEnableMetrics(this.enableMetrics); nodeOptions.setRaftOptions(this.raftOptions == null ? new RaftOptions() : this.raftOptions.copy()); + nodeOptions.setSharedElectionTimer(this.sharedElectionTimer); + nodeOptions.setSharedVoteTimer(this.sharedVoteTimer); + nodeOptions.setSharedStepDownTimer(this.sharedStepDownTimer); + nodeOptions.setSharedSnapshotTimer(this.sharedSnapshotTimer); return nodeOptions; } @Override public String toString() { - return "NodeOptions [electionTimeoutMs=" + this.electionTimeoutMs + ", leaderLeaseTimeRatio=" - + this.leaderLeaseTimeRatio + ", snapshotIntervalSecs=" + this.snapshotIntervalSecs - + ", snapshotLogIndexMargin=" + this.snapshotLogIndexMargin + ", catchupMargin=" + this.catchupMargin - + ", initialConf=" + this.initialConf + ", fsm=" + this.fsm + ", logUri=" + this.logUri - + ", raftMetaUri=" + this.raftMetaUri + ", snapshotUri=" + this.snapshotUri - + ", filterBeforeCopyRemote=" + this.filterBeforeCopyRemote + ", disableCli=" + this.disableCli - + ", timerPoolSize=" + this.timerPoolSize + ", cliRpcThreadPoolSize=" + this.cliRpcThreadPoolSize - + ", raftRpcThreadPoolSize=" + this.raftRpcThreadPoolSize + ", enableMetrics=" + this.enableMetrics - + ", snapshotThrottle=" + this.snapshotThrottle + ", serviceFactory=" + this.serviceFactory - + ", electionPriority=" + this.electionPriority + ", decayPriorityGap=" + this.decayPriorityGap - + ", raftOptions=" + this.raftOptions + "]"; + return "NodeOptions{" + "electionTimeoutMs=" + electionTimeoutMs + ", electionPriority=" + electionPriority + + ", decayPriorityGap=" + decayPriorityGap + ", leaderLeaseTimeRatio=" + leaderLeaseTimeRatio + + ", snapshotIntervalSecs=" + snapshotIntervalSecs + ", snapshotLogIndexMargin=" + + snapshotLogIndexMargin + ", catchupMargin=" + catchupMargin + ", initialConf=" + initialConf + + ", fsm=" + fsm + ", logUri='" + logUri + '\'' + ", raftMetaUri='" + raftMetaUri + '\'' + + ", snapshotUri='" + snapshotUri + '\'' + ", filterBeforeCopyRemote=" + filterBeforeCopyRemote + + ", disableCli=" + disableCli + ", sharedTimerPool=" + sharedTimerPool + ", timerPoolSize=" + + timerPoolSize + ", cliRpcThreadPoolSize=" + cliRpcThreadPoolSize + ", raftRpcThreadPoolSize=" + + raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle + + ", sharedElectionTimer=" + sharedElectionTimer + ", sharedVoteTimer=" + sharedVoteTimer + + ", sharedStepDownTimer=" + sharedStepDownTimer + ", sharedSnapshotTimer=" + sharedSnapshotTimer + + ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString(); } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorGroupOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorGroupOptions.java index aba4eb036..25c81af0d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorGroupOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorGroupOptions.java @@ -18,7 +18,7 @@ import com.alipay.sofa.jraft.core.BallotBox; import com.alipay.sofa.jraft.core.NodeImpl; -import com.alipay.sofa.jraft.core.TimerManager; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.rpc.RaftClientService; import com.alipay.sofa.jraft.storage.LogManager; import com.alipay.sofa.jraft.storage.SnapshotStorage; @@ -40,13 +40,13 @@ public class ReplicatorGroupOptions { private SnapshotStorage snapshotStorage; private RaftClientService raftRpcClientService; private RaftOptions raftOptions; - private TimerManager timerManager; + private Scheduler timerManager; - public TimerManager getTimerManager() { + public Scheduler getTimerManager() { return this.timerManager; } - public void setTimerManager(TimerManager timerManager) { + public void setTimerManager(Scheduler timerManager) { this.timerManager = timerManager; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorOptions.java index 42ee9186f..c19fa301b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/ReplicatorOptions.java @@ -19,6 +19,7 @@ import com.alipay.sofa.jraft.core.BallotBox; import com.alipay.sofa.jraft.core.NodeImpl; import com.alipay.sofa.jraft.core.ReplicatorType; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.core.TimerManager; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RaftClientService; @@ -46,7 +47,7 @@ public class ReplicatorOptions implements Copiable { private long term; private SnapshotStorage snapshotStorage; private RaftClientService raftRpcService; - private TimerManager timerManager; + private Scheduler timerManager; private ReplicatorType replicatorType; public ReplicatorOptions() { @@ -113,11 +114,11 @@ public ReplicatorOptions copy() { return replicatorOptions; } - public TimerManager getTimerManager() { + public Scheduler getTimerManager() { return this.timerManager; } - public void setTimerManager(final TimerManager timerManager) { + public void setTimerManager(final Scheduler timerManager) { this.timerManager = timerManager; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java index 56fbd10fb..220ca9f8c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java @@ -16,7 +16,7 @@ */ package com.alipay.sofa.jraft.option; -import com.alipay.sofa.jraft.core.TimerManager; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.rpc.RaftClientService; /** @@ -29,7 +29,7 @@ public class SnapshotCopierOptions { private RaftClientService raftClientService; - private TimerManager timerManager; + private Scheduler timerManager; private RaftOptions raftOptions; private NodeOptions nodeOptions; @@ -37,8 +37,8 @@ public SnapshotCopierOptions() { super(); } - public SnapshotCopierOptions(RaftClientService raftClientService, TimerManager timerManager, - RaftOptions raftOptions, NodeOptions nodeOptions) { + public SnapshotCopierOptions(RaftClientService raftClientService, Scheduler timerManager, RaftOptions raftOptions, + NodeOptions nodeOptions) { super(); this.raftClientService = raftClientService; this.timerManager = timerManager; @@ -62,11 +62,11 @@ public void setRaftClientService(RaftClientService raftClientService) { this.raftClientService = raftClientService; } - public TimerManager getTimerManager() { + public Scheduler getTimerManager() { return this.timerManager; } - public void setTimerManager(TimerManager timerManager) { + public void setTimerManager(Scheduler timerManager) { this.timerManager = timerManager; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RaftClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RaftClientService.java index 7e411774e..44e4b813a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RaftClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RaftClientService.java @@ -33,46 +33,46 @@ public interface RaftClientService extends ClientService { /** * Sends a pre-vote request and handle the response with done. * - * @param endpoint destination address (ip, port) - * @param request request data - * @param done callback + * @param endpoint destination address (ip, port) + * @param request request data + * @param done callback * @return a future with result */ - Future preVote(Endpoint endpoint, RpcRequests.RequestVoteRequest request, - RpcResponseClosure done); + Future preVote(final Endpoint endpoint, final RpcRequests.RequestVoteRequest request, + final RpcResponseClosure done); /** * Sends a request-vote request and handle the response with done. * - * @param endpoint destination address (ip, port) - * @param request request data - * @param done callback + * @param endpoint destination address (ip, port) + * @param request request data + * @param done callback * @return a future with result */ - Future requestVote(Endpoint endpoint, RpcRequests.RequestVoteRequest request, - RpcResponseClosure done); + Future requestVote(final Endpoint endpoint, final RpcRequests.RequestVoteRequest request, + final RpcResponseClosure done); /** * Sends a append-entries request and handle the response with done. * - * @param endpoint destination address (ip, port) - * @param request request data - * @param done callback + * @param endpoint destination address (ip, port) + * @param request request data + * @param done callback * @return a future with result */ - Future appendEntries(Endpoint endpoint, RpcRequests.AppendEntriesRequest request, int timeoutMs, - RpcResponseClosure done); + Future appendEntries(final Endpoint endpoint, final RpcRequests.AppendEntriesRequest request, + final int timeoutMs, final RpcResponseClosure done); /** * Sends a install-snapshot request and handle the response with done. * - * @param endpoint destination address (ip, port) - * @param request request data - * @param done callback + * @param endpoint destination address (ip, port) + * @param request request data + * @param done callback * @return a future result */ - Future installSnapshot(Endpoint endpoint, RpcRequests.InstallSnapshotRequest request, - RpcResponseClosure done); + Future installSnapshot(final Endpoint endpoint, final RpcRequests.InstallSnapshotRequest request, + final RpcResponseClosure done); /** * Get a piece of file data by GetFileRequest, and handle the response with done. @@ -83,8 +83,8 @@ Future installSnapshot(Endpoint endpoint, RpcRequests.InstallSnapshotRe * @param done callback * @return a future result */ - Future getFile(Endpoint endpoint, RpcRequests.GetFileRequest request, int timeoutMs, - RpcResponseClosure done); + Future getFile(final Endpoint endpoint, final RpcRequests.GetFileRequest request, final int timeoutMs, + final RpcResponseClosure done); /** * Send a timeout-now request and handle the response with done. @@ -95,8 +95,8 @@ Future getFile(Endpoint endpoint, RpcRequests.GetFileRequest request, i * @param done callback * @return a future result */ - Future timeoutNow(Endpoint endpoint, RpcRequests.TimeoutNowRequest request, int timeoutMs, - RpcResponseClosure done); + Future timeoutNow(final Endpoint endpoint, final RpcRequests.TimeoutNowRequest request, + final int timeoutMs, final RpcResponseClosure done); /** * Send a read-index request and handle the response with done. @@ -107,6 +107,6 @@ Future timeoutNow(Endpoint endpoint, RpcRequests.TimeoutNowRequest requ * @param done callback * @return a future result */ - Future readIndex(Endpoint endpoint, RpcRequests.ReadIndexRequest request, int timeoutMs, - RpcResponseClosure done); + Future readIndex(final Endpoint endpoint, final RpcRequests.ReadIndexRequest request, final int timeoutMs, + final RpcResponseClosure done); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java index cbab1b373..68884df9c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/BoltRpcClient.java @@ -34,6 +34,7 @@ import com.alipay.sofa.jraft.rpc.impl.core.ClientServiceConnectionEventProcessor; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; /** * Bolt rpc client impl. @@ -42,14 +43,22 @@ */ public class BoltRpcClient implements RpcClient { - public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER"; - public static final String BOLT_CTX = "BOLT_CTX"; - public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY"; + public static final String BOLT_ADDRESS_PARSER = "BOLT_ADDRESS_PARSER"; + public static final String BOLT_CTX = "BOLT_CTX"; + public static final String BOLT_REJECTED_EXECUTION_POLICY = "BOLT_REJECTED_EXECUTION_POLICY"; - private final com.alipay.remoting.rpc.RpcClient rpcClient; + private static final int CHANNEL_WRITE_BUF_LOW_WATER_MARK = SystemPropertyUtil + .getInt( + "bolt.channel_write_buf_low_water_mark", + 256 * 1024); + private static final int CHANNEL_WRITE_BUF_HIGH_WATER_MARK = SystemPropertyUtil + .getInt( + "bolt.channel_write_buf_high_water_mark", + 512 * 1024); + private final com.alipay.remoting.rpc.RpcClient rpcClient; private com.alipay.remoting.InvokeContext defaultInvokeCtx; - private RpcAddressParser defaultAddressParser = new RpcAddressParser(); + private RpcAddressParser defaultAddressParser = new RpcAddressParser(); public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) { this.rpcClient = Requires.requireNonNull(rpcClient, "rpcClient"); @@ -58,6 +67,7 @@ public BoltRpcClient(com.alipay.remoting.rpc.RpcClient rpcClient) { @Override public boolean init(final RpcOptions opts) { this.rpcClient.switches().turnOn(GlobalSwitch.CODEC_FLUSH_CONSOLIDATION); + this.rpcClient.initWriteBufferWaterMark(CHANNEL_WRITE_BUF_LOW_WATER_MARK, CHANNEL_WRITE_BUF_HIGH_WATER_MARK); this.rpcClient.startup(); return true; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java similarity index 97% rename from jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java rename to jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java index 142b02316..5dd544f32 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java @@ -53,7 +53,7 @@ * @author boyan (boyan@alibaba-inc.com) * @author jiachun.fjc */ -public class BoltRaftClientService extends AbstractClientService implements RaftClientService { +public class DefaultRaftClientService extends AbstractClientService implements RaftClientService { private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = DefaultFixedThreadsExecutorGroupFactory.INSTANCE .newExecutorGroup( @@ -73,7 +73,7 @@ protected void configRpcClient(final RpcClient rpcClient) { rpcClient.registerConnectEventListener(this.rgGroup); } - public BoltRaftClientService(final ReplicatorGroup rgGroup) { + public DefaultRaftClientService(final ReplicatorGroup rgGroup) { this.rgGroup = rgGroup; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java index c05c538a2..87319898b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java @@ -253,7 +253,6 @@ public void shutdown() { private void clearMemoryLogs(final LogId id) { this.writeLock.lock(); try { - this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().compareTo(id) <= 0); } finally { this.writeLock.unlock(); @@ -524,7 +523,7 @@ public void onEvent(final StableClosureEvent event, final long sequence, final b long startMs = Utils.monotonicMs(); try { final TruncatePrefixClosure tpc = (TruncatePrefixClosure) done; - LOG.debug("Truncating storage to firstIndexKept={}", tpc.firstIndexKept); + LOG.debug("Truncating storage to firstIndexKept={}.", tpc.firstIndexKept); ret = LogManagerImpl.this.logStorage.truncatePrefix(tpc.firstIndexKept); } finally { LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-prefix", Utils.monotonicMs() @@ -535,7 +534,7 @@ public void onEvent(final StableClosureEvent event, final long sequence, final b startMs = Utils.monotonicMs(); try { final TruncateSuffixClosure tsc = (TruncateSuffixClosure) done; - LOG.warn("Truncating storage to lastIndexKept={}", tsc.lastIndexKept); + LOG.warn("Truncating storage to lastIndexKept={}.", tsc.lastIndexKept); ret = LogManagerImpl.this.logStorage.truncateSuffix(tsc.lastIndexKept); if (ret) { this.lastId.setIndex(tsc.lastIndexKept); @@ -549,7 +548,7 @@ public void onEvent(final StableClosureEvent event, final long sequence, final b break; case RESET: final ResetClosure rc = (ResetClosure) done; - LOG.info("Reseting storage to nextLogIndex={}", rc.nextLogIndex); + LOG.info("Resetting storage to nextLogIndex={}.", rc.nextLogIndex); ret = LogManagerImpl.this.logStorage.reset(rc.nextLogIndex); break; default: diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySession.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySession.java index 4cca27445..c9e4fe533 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySession.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySession.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.core.TimerManager; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.CopyOptions; import com.alipay.sofa.jraft.option.RaftOptions; @@ -67,7 +67,7 @@ public class CopySession implements Session { private final RaftClientService rpcService; private final GetFileRequest.Builder requestBuilder; private final Endpoint endpoint; - private final TimerManager timerManager; + private final Scheduler timerManager; private final SnapshotThrottle snapshotThrottle; private final RaftOptions raftOptions; private int retryTimes = 0; @@ -123,7 +123,7 @@ public void close() throws IOException { } } - public CopySession(final RaftClientService rpcService, final TimerManager timerManager, + public CopySession(final RaftClientService rpcService, final Scheduler timerManager, final SnapshotThrottle snapshotThrottle, final RaftOptions raftOptions, final GetFileRequest.Builder rb, final Endpoint ep) { super(); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java index 51a273d50..b8614e2e0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alipay.sofa.jraft.core.TimerManager; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.option.CopyOptions; import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.SnapshotCopierOptions; @@ -52,7 +52,7 @@ public class RemoteFileCopier { private RaftClientService rpcService; private Endpoint endpoint; private RaftOptions raftOptions; - private TimerManager timerManager; + private Scheduler timerManager; private SnapshotThrottle snapshotThrottle; @OnlyForTest diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogScheduledThreadPoolExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogScheduledThreadPoolExecutor.java new file mode 100644 index 000000000..4f6b5f13a --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogScheduledThreadPoolExecutor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link java.util.concurrent.ThreadPoolExecutor} that can additionally + * schedule commands to run after a given delay with a logger witch can print + * error message for failed execution. + * + * @author jiachun.fjc + */ +public class LogScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(LogScheduledThreadPoolExecutor.class); + + private final String name; + + public LogScheduledThreadPoolExecutor(int corePoolSize, String name) { + super(corePoolSize); + this.name = name; + } + + public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, String name) { + super(corePoolSize, threadFactory); + this.name = name; + } + + public LogScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, String name) { + super(corePoolSize, handler); + this.name = name; + } + + public LogScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler, String name) { + super(corePoolSize, threadFactory, handler); + this.name = name; + } + + public String getName() { + return name; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future) { + try { + final Future f = (Future) r; + if (f.isDone()) { + f.get(); + } + } catch (final CancellationException ce) { + // ignored + } catch (final ExecutionException ee) { + t = ee.getCause(); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); // ignore/reset + } + } + if (t != null) { + LOG.error("Uncaught exception in pool: {}, {}.", this.name, super.toString(), t); + } + } + + @Override + protected void terminated() { + super.terminated(); + LOG.info("ThreadPool is terminated: {}, {}.", this.name, super.toString()); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogThreadPoolExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogThreadPoolExecutor.java index aeda46d13..4b0164ed1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogThreadPoolExecutor.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/LogThreadPoolExecutor.java @@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory; /** + * A {@link java.util.concurrent.ExecutorService} that witch can print + * error message for failed execution. * * @author jiachun.fjc */ diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricScheduledThreadPoolExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricScheduledThreadPoolExecutor.java new file mode 100644 index 000000000..5dbc2897e --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricScheduledThreadPoolExecutor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; + +import com.codahale.metrics.Timer; + +/** + * A {@link java.util.concurrent.ThreadPoolExecutor} that can additionally + * schedule commands to run after a given delay with a timer metric + * which aggregates timing durations and provides duration statistics. + * + * @author jiachun.fjc + */ +public class MetricScheduledThreadPoolExecutor extends LogScheduledThreadPoolExecutor { + + public MetricScheduledThreadPoolExecutor(int corePoolSize, String name) { + super(corePoolSize, name); + } + + public MetricScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, String name) { + super(corePoolSize, threadFactory, name); + } + + public MetricScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler, String name) { + super(corePoolSize, handler, name); + } + + public MetricScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler, String name) { + super(corePoolSize, threadFactory, handler, name); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + try { + ThreadPoolMetricRegistry.timerThreadLocal() // + .set(ThreadPoolMetricRegistry.metricRegistry().timer("scheduledThreadPool." + getName()).time()); + } catch (final Throwable ignored) { + // ignored + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + try { + final ThreadLocal tl = ThreadPoolMetricRegistry.timerThreadLocal(); + final Timer.Context ctx = tl.get(); + if (ctx != null) { + ctx.stop(); + tl.remove(); + } + } catch (final Throwable ignored) { + // ignored + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricThreadPoolExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricThreadPoolExecutor.java index 8643b6337..87e1d39f7 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricThreadPoolExecutor.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/MetricThreadPoolExecutor.java @@ -21,18 +21,16 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; /** + * A {@link java.util.concurrent.ExecutorService} that with a timer metric + * which aggregates timing durations and provides duration statistics. * * @author jiachun.fjc */ public class MetricThreadPoolExecutor extends LogThreadPoolExecutor { - private static final MetricRegistry metricRegistry = new MetricRegistry(); - private static final ThreadLocal timerThreadLocal = new ThreadLocal<>(); - public MetricThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, String name) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, name); @@ -54,18 +52,12 @@ public MetricThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keep super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, name); } - /** - * Return the global registry of metric instances. - */ - public static MetricRegistry metricRegistry() { - return metricRegistry; - } - @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); try { - timerThreadLocal.set(metricRegistry().timer("threadPool." + getName()).time()); + ThreadPoolMetricRegistry.timerThreadLocal() // + .set(ThreadPoolMetricRegistry.metricRegistry().timer("threadPool." + getName()).time()); } catch (final Throwable ignored) { // ignored } @@ -75,10 +67,11 @@ protected void beforeExecute(Thread t, Runnable r) { protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); try { - final Timer.Context ctx = timerThreadLocal.get(); + final ThreadLocal tl = ThreadPoolMetricRegistry.timerThreadLocal(); + final Timer.Context ctx = tl.get(); if (ctx != null) { ctx.stop(); - timerThreadLocal.remove(); + tl.remove(); } } catch (final Throwable ignored) { // ignored diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolMetricRegistry.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolMetricRegistry.java new file mode 100644 index 000000000..f3c916b75 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolMetricRegistry.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; + +/** + * @author jiachun.fjc + */ +public class ThreadPoolMetricRegistry { + + private static final MetricRegistry metricRegistry = new MetricRegistry(); + private static final ThreadLocal timerThreadLocal = new ThreadLocal<>(); + + /** + * Return the global registry of metric instances. + */ + public static MetricRegistry metricRegistry() { + return metricRegistry; + } + + public static ThreadLocal timerThreadLocal() { + return timerThreadLocal; + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolUtil.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolUtil.java index 41e8a92c0..2124fed7d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolUtil.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolUtil.java @@ -18,6 +18,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,6 +38,10 @@ public static PoolBuilder newBuilder() { return new PoolBuilder(); } + public static ScheduledPoolBuilder newScheduledBuilder() { + return new ScheduledPoolBuilder(); + } + /** * Creates a new {@code MetricThreadPoolExecutor} or {@code LogThreadPoolExecutor} * with the given initial parameters and default rejected execution handler. @@ -86,7 +91,7 @@ public static ThreadPoolExecutor newThreadPool(final String poolName, final bool * This queue will hold only the {@code Runnable} tasks submitted * by the {@code execute} method. * @param threadFactory the factory to use when the executor creates a new thread - * @param handler the handler to use when execution is blocked because the + * @param rejectedHandler the handler to use when execution is blocked because the * thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
@@ -101,14 +106,65 @@ public static ThreadPoolExecutor newThreadPool(final String poolName, final bool final long keepAliveSeconds, final BlockingQueue workQueue, final ThreadFactory threadFactory, - final RejectedExecutionHandler handler) { + final RejectedExecutionHandler rejectedHandler) { final TimeUnit unit = TimeUnit.SECONDS; if (enableMetric) { return new MetricThreadPoolExecutor(coreThreads, maximumThreads, keepAliveSeconds, unit, workQueue, - threadFactory, handler, poolName); + threadFactory, rejectedHandler, poolName); } else { return new LogThreadPoolExecutor(coreThreads, maximumThreads, keepAliveSeconds, unit, workQueue, - threadFactory, handler, poolName); + threadFactory, rejectedHandler, poolName); + } + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param poolName the name of the thread pool + * @param enableMetric if metric is enabled + * @param coreThreads the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set. + * @param threadFactory the factory to use when the executor + * creates a new thread + * + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @throws NullPointerException if {@code threadFactory} or + * {@code handler} is null + * @return a new ScheduledThreadPoolExecutor + */ + public static ScheduledThreadPoolExecutor newScheduledThreadPool(final String poolName, final boolean enableMetric, + final int coreThreads, + final ThreadFactory threadFactory) { + return newScheduledThreadPool(poolName, enableMetric, coreThreads, threadFactory, defaultHandler); + } + + /** + * Creates a new ScheduledThreadPoolExecutor with the given + * initial parameters. + * + * @param poolName the name of the thread pool + * @param enableMetric if metric is enabled + * @param coreThreads the number of threads to keep in the pool, even if they are + * idle, unless {@code allowCoreThreadTimeOut} is set. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @param rejectedHandler the handler to use when execution is blocked because the + * thread bounds and queue capacities are reached + * + * @throws IllegalArgumentException if {@code corePoolSize < 0} + * @throws NullPointerException if {@code threadFactory} or + * {@code handler} is null + * @return a new ScheduledThreadPoolExecutor + */ + public static ScheduledThreadPoolExecutor newScheduledThreadPool(final String poolName, final boolean enableMetric, + final int coreThreads, + final ThreadFactory threadFactory, + final RejectedExecutionHandler rejectedHandler) { + if (enableMetric) { + return new MetricScheduledThreadPoolExecutor(coreThreads, threadFactory, rejectedHandler, poolName); + } else { + return new LogScheduledThreadPoolExecutor(coreThreads, threadFactory, rejectedHandler, poolName); } } @@ -179,4 +235,49 @@ public ThreadPoolExecutor build() { this.maximumThreads, this.keepAliveSeconds, this.workQueue, this.threadFactory, this.handler); } } + + public static class ScheduledPoolBuilder { + private String poolName; + private Boolean enableMetric; + private Integer coreThreads; + private ThreadFactory threadFactory; + private RejectedExecutionHandler handler = ThreadPoolUtil.defaultHandler; + + public ScheduledPoolBuilder poolName(final String poolName) { + this.poolName = poolName; + return this; + } + + public ScheduledPoolBuilder enableMetric(final Boolean enableMetric) { + this.enableMetric = enableMetric; + return this; + } + + public ScheduledPoolBuilder coreThreads(final Integer coreThreads) { + this.coreThreads = coreThreads; + return this; + } + + public ScheduledPoolBuilder threadFactory(final ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + + public ScheduledPoolBuilder rejectedHandler(final RejectedExecutionHandler handler) { + this.handler = handler; + return this; + } + + public ScheduledThreadPoolExecutor build() { + Requires.requireNonNull(this.poolName, "poolName"); + Requires.requireNonNull(this.enableMetric, "enableMetric"); + Requires.requireNonNull(this.coreThreads, "coreThreads"); + + Requires.requireNonNull(this.threadFactory, "threadFactory"); + Requires.requireNonNull(this.handler, "handler"); + + return ThreadPoolUtil.newScheduledThreadPool(this.poolName, this.enableMetric, this.coreThreads, + this.threadFactory, this.handler); + } + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java new file mode 100644 index 000000000..a4143744c --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.timer; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.alipay.sofa.jraft.core.Scheduler; +import com.alipay.sofa.jraft.core.TimerManager; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.SPI; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.Utils; + +/** + * + * @author jiachun.fjc + */ +@SPI +public class DefaultRaftTimerFactory implements RaftTimerFactory { + + private static final String GLOBAL_ELECTION_TIMER_WORKERS = "jraft.timer.global_election_timer_workers"; + private static final String GLOBAL_VOTE_TIMER_WORKERS = "jraft.timer.global_vote_timer_workers"; + private static final String GLOBAL_STEP_DOWN_TIMER_WORKERS = "jraft.timer.global_step_down_timer_workers"; + private static final String GLOBAL_SNAPSHOT_TIMER_WORKERS = "jraft.timer.global_snapshot_timer_workers"; + private static final String GLOBAL_SCHEDULER_WORKERS = "jraft.timer.global_scheduler_workers"; + + private static final TimerSharedRef ELECTION_TIMER_REF = new TimerSharedRef( + SystemPropertyUtil.getInt( + GLOBAL_ELECTION_TIMER_WORKERS, + Utils.cpus()), + "JRaft-Global-ElectionTimer"); + private static final TimerSharedRef VOTE_TIMER_REF = new TimerSharedRef( + SystemPropertyUtil.getInt( + GLOBAL_VOTE_TIMER_WORKERS, + Utils.cpus()), + "JRaft-Global-VoteTimer"); + private static final TimerSharedRef STEP_DOWN_TIMER_REF = new TimerSharedRef( + SystemPropertyUtil.getInt( + GLOBAL_STEP_DOWN_TIMER_WORKERS, + Utils.cpus()), + "JRaft-Global-StepDownTimer"); + private static final TimerSharedRef SNAPSHOT_TIMER_REF = new TimerSharedRef( + SystemPropertyUtil.getInt( + GLOBAL_SNAPSHOT_TIMER_WORKERS, + Utils.cpus()), + "JRaft-Global-SnapshotTimer"); + private static final SchedulerSharedRef SCHEDULER_REF = new SchedulerSharedRef( + SystemPropertyUtil.getInt( + GLOBAL_SCHEDULER_WORKERS, + Utils.cpus() * 3 > 20 ? 20 : Utils + .cpus() * 3), + "JRaft-Node-ScheduleThreadPool"); + + @Override + public Timer getElectionTimer(final boolean shared, final String name) { + return shared ? ELECTION_TIMER_REF.getRef() : createTimer(name); + } + + @Override + public Timer getVoteTimer(final boolean shared, final String name) { + return shared ? VOTE_TIMER_REF.getRef() : createTimer(name); + } + + @Override + public Timer getStepDownTimer(final boolean shared, final String name) { + return shared ? STEP_DOWN_TIMER_REF.getRef() : createTimer(name); + } + + @Override + public Timer getSnapshotTimer(final boolean shared, final String name) { + return shared ? SNAPSHOT_TIMER_REF.getRef() : createTimer(name); + } + + @Override + public Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name) { + return shared ? SCHEDULER_REF.getRef() : createScheduler(workerNum, name); + } + + private static Timer createTimer(final String name) { + return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048); + } + + private static Scheduler createScheduler(final int workerNum, final String name) { + return new TimerManager(workerNum, name); + } + + private static abstract class Shared { + + private AtomicInteger refCount = new AtomicInteger(0); + private AtomicBoolean started = new AtomicBoolean(true); + protected final T shared; + + protected Shared(T shared) { + this.shared = shared; + } + + public T getRef() { + if (this.started.get()) { + this.refCount.incrementAndGet(); + return current(); + } + throw new IllegalStateException("Shared shutdown"); + } + + public boolean isShutdown() { + return !this.started.get(); + } + + public abstract T current(); + + public boolean mayShutdown() { + return this.refCount.decrementAndGet() <= 0 && this.started.compareAndSet(true, false); + } + } + + private static abstract class SharedRef { + + private final int workerNum; + private final String name; + private Shared shared; + + public SharedRef(int workerNum, String name) { + this.workerNum = workerNum; + this.name = name; + } + + public synchronized T getRef() { + if (this.shared == null || this.shared.isShutdown()) { + this.shared = create(this.workerNum, this.name); + } + return this.shared.getRef(); + } + + public abstract Shared create(final int workerNum, final String name); + } + + private static class TimerSharedRef extends SharedRef { + + public TimerSharedRef(int workerNum, String name) { + super(workerNum, name); + } + + @Override + public Shared create(final int workerNum, final String name) { + return new SharedTimer(new DefaultTimer(workerNum, name)); + } + } + + private static class SharedTimer extends Shared implements Timer { + + protected SharedTimer(Timer shared) { + super(shared); + } + + @Override + public SharedTimer current() { + return this; + } + + @Override + public Timeout newTimeout(final TimerTask task, final long delay, final TimeUnit unit) { + return this.shared.newTimeout(task, delay, unit); + } + + @Override + public Set stop() { + if (mayShutdown()) { + return this.shared.stop(); + } + return Collections.emptySet(); + } + } + + private static class SchedulerSharedRef extends SharedRef { + + public SchedulerSharedRef(int workerNum, String name) { + super(workerNum, name); + } + + @Override + public Shared create(final int workerNum, final String name) { + return new SharedScheduler(new TimerManager(workerNum, name)); + } + } + + private static class SharedScheduler extends Shared implements Scheduler { + + protected SharedScheduler(Scheduler shared) { + super(shared); + } + + @Override + public Scheduler current() { + return this; + } + + @Override + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + return this.shared.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + return this.shared.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, + final long delay, final TimeUnit unit) { + return this.shared.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void shutdown() { + if (mayShutdown()) { + this.shared.shutdown(); + } + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultTimer.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultTimer.java new file mode 100644 index 000000000..d820519c5 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultTimer.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.timer; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; + +/** + * @author jiachun.fjc + */ +public class DefaultTimer implements Timer { + + private final ScheduledExecutorService scheduledExecutorService; + + public DefaultTimer(int workerNum, String name) { + this.scheduledExecutorService = ThreadPoolUtil.newScheduledBuilder() // + .coreThreads(workerNum) // + .poolName(name) // + .enableMetric(true) // + .threadFactory(new NamedThreadFactory(name, true)) // + .build(); + } + + @Override + public Timeout newTimeout(final TimerTask task, final long delay, final TimeUnit unit) { + Requires.requireNonNull(task, "task"); + Requires.requireNonNull(unit, "unit"); + + final TimeoutTask timeoutTask = new TimeoutTask(task); + final ScheduledFuture future = this.scheduledExecutorService.schedule(new TimeoutTask(task), delay, unit); + timeoutTask.setFuture(future); + return timeoutTask.getTimeout(); + } + + @Override + public Set stop() { + ExecutorServiceHelper.shutdownAndAwaitTermination(this.scheduledExecutorService); + return Collections.emptySet(); + } + + private class TimeoutTask implements Runnable { + + private final TimerTask task; + private final Timeout timeout; + private volatile ScheduledFuture future; + + private TimeoutTask(TimerTask task) { + this.task = task; + this.timeout = new Timeout() { + + @Override + public Timer timer() { + return DefaultTimer.this; + } + + @Override + public TimerTask task() { + return task; + } + + @Override + public boolean isExpired() { + return false; // never use + } + + @Override + public boolean isCancelled() { + final ScheduledFuture f = future; + return f != null && f.isCancelled(); + } + + @Override + public boolean cancel() { + final ScheduledFuture f = future; + return f != null && f.cancel(true); + } + }; + } + + public Timeout getTimeout() { + return timeout; + } + + public ScheduledFuture getFuture() { + return future; + } + + public void setFuture(ScheduledFuture future) { + this.future = future; + } + + @Override + public void run() { + try { + this.task.run(this.timeout); + } catch (final Throwable ignored) { + // never get here + } + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java new file mode 100644 index 000000000..dcb60171f --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.timer; + +import com.alipay.sofa.jraft.core.Scheduler; + +/** + * @author jiachun.fjc + */ +public interface RaftTimerFactory { + + Timer getElectionTimer(final boolean shared, final String name); + + Timer getVoteTimer(final boolean shared, final String name); + + Timer getStepDownTimer(final boolean shared, final String name); + + Timer getSnapshotTimer(final boolean shared, final String name); + + Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name); +} diff --git a/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.timer.RaftTimerFactory b/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.timer.RaftTimerFactory new file mode 100644 index 000000000..943fda242 --- /dev/null +++ b/jraft-core/src/main/resources/META-INF/services/com.alipay.sofa.jraft.util.timer.RaftTimerFactory @@ -0,0 +1 @@ +com.alipay.sofa.jraft.util.timer.DefaultRaftTimerFactory \ No newline at end of file diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index f54ee2cce..2198a6e0c 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -161,7 +161,7 @@ public void testNodeTaskOverload() throws Exception { final PeerId peer = new PeerId(addr, 0); NodeManager.getInstance().addAddress(addr); - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final RaftOptions raftOptions = new RaftOptions(); raftOptions.setDisruptorBufferSize(2); nodeOptions.setRaftOptions(raftOptions); @@ -215,7 +215,7 @@ public void testRollbackStateMachineWithReadIndex_Issue317() throws Exception { final PeerId peer = new PeerId(addr, 0); NodeManager.getInstance().addAddress(addr); - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final CountDownLatch applyCompleteLatch = new CountDownLatch(1); final CountDownLatch applyLatch = new CountDownLatch(1); final CountDownLatch readIndexLatch = new CountDownLatch(1); @@ -321,7 +321,7 @@ public void testSingleNode() throws Exception { final PeerId peer = new PeerId(addr, 0); NodeManager.getInstance().addAddress(addr); - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockStateMachine(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -609,7 +609,7 @@ public void testSingleNodeWithLearner() throws Exception { RaftGroupService learnerServer = null; { // Start learner - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); learnerFsm = new MockStateMachine(learnerAddr); nodeOptions.setFsm(learnerFsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log1"); @@ -625,7 +625,7 @@ public void testSingleNodeWithLearner() throws Exception { { // Start leader - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockStateMachine(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -2205,7 +2205,7 @@ public void testInstallSnapshot() throws Exception { public void testNoSnapshot() throws Exception { final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT); NodeManager.getInstance().addAddress(addr); - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockStateMachine(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -2237,7 +2237,7 @@ public void testNoSnapshot() throws Exception { public void testAutoSnapshot() throws Exception { final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT); NodeManager.getInstance().addAddress(addr); - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockStateMachine(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -2482,7 +2482,7 @@ public void testShutdownAndJoinWorkAfterInitFails() throws Exception { final Endpoint addr = new Endpoint(TestUtils.getMyIp(), TestUtils.INIT_PORT); NodeManager.getInstance().addAddress(addr); { - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockStateMachine(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -2504,7 +2504,7 @@ public void testShutdownAndJoinWorkAfterInitFails() throws Exception { node.join(); } { - final NodeOptions nodeOptions = new NodeOptions(); + final NodeOptions nodeOptions = createNodeOptionsWithSharedTimer(); final MockStateMachine fsm = new MockFSM1(addr); nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); @@ -2866,7 +2866,7 @@ public void testBootStrapWithSnapshot() throws Exception { NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); - final NodeOptions nodeOpts = new NodeOptions(); + final NodeOptions nodeOpts = createNodeOptionsWithSharedTimer(); nodeOpts.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOpts.setLogUri(this.dataPath + File.separator + "log"); nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); @@ -2905,7 +2905,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); - final NodeOptions nodeOpts = new NodeOptions(); + final NodeOptions nodeOpts = createNodeOptionsWithSharedTimer(); nodeOpts.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOpts.setLogUri(this.dataPath + File.separator + "log"); nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); @@ -3297,4 +3297,11 @@ public void testChangePeersChaosApplyTasks() throws Exception { cluster.stopAll(); } } + + private NodeOptions createNodeOptionsWithSharedTimer() { + final NodeOptions options = new NodeOptions(); + options.setSharedElectionTimer(true); + options.setSharedVoteTimer(true); + return options; + } } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorGroupTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorGroupTest.java index fed06b65d..d5537cd0b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorGroupTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorGroupTest.java @@ -78,8 +78,7 @@ public class ReplicatorGroupTest { @Before public void setup() { - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); this.replicatorGroup = new ReplicatorGroupImpl(); final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs())); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java index 18c1d1ad4..c55238c2b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java @@ -86,8 +86,7 @@ public class ReplicatorTest { @Before public void setup() { - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); this.opts = new ReplicatorOptions(); this.opts.setRaftRpcService(this.rpcService); this.opts.setPeerId(this.peerId); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientServiceTest.java similarity index 85% rename from jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientServiceTest.java rename to jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientServiceTest.java index d824de60f..b788cc937 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientServiceTest.java @@ -28,16 +28,16 @@ import com.alipay.sofa.jraft.util.Endpoint; @RunWith(value = MockitoJUnitRunner.class) -public class BoltRaftClientServiceTest { - private BoltRaftClientService clientService; +public class DefaultRaftClientServiceTest { + private DefaultRaftClientService clientService; @Mock - private ReplicatorGroup rgGroup; + private ReplicatorGroup rgGroup; - private final Endpoint endpoint = new Endpoint("localhost", 8081); + private final Endpoint endpoint = new Endpoint("localhost", 8081); @Before public void setup() { - this.clientService = new BoltRaftClientService(this.rgGroup); + this.clientService = new DefaultRaftClientService(this.rgGroup); this.clientService.init(new NodeOptions()); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java index 3afe8dd56..f1a27427d 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java @@ -95,8 +95,7 @@ public class SnapshotExecutorTest extends BaseStorageTest { @Before public void setup() throws Exception { super.setup(); - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); this.raftOptions = new RaftOptions(); this.writer = new LocalSnapshotWriter(this.path, this.snapshotStorage, this.raftOptions); this.reader = new LocalSnapshotReader(this.snapshotStorage, null, new Endpoint("localhost", 8081), diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java index 5680863a1..850befdca 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java @@ -16,13 +16,6 @@ */ package com.alipay.sofa.jraft.storage.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -53,6 +46,13 @@ import com.alipay.sofa.jraft.storage.LogStorage; import com.alipay.sofa.jraft.test.TestUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + @RunWith(value = MockitoJUnitRunner.class) public class LogManagerTest extends BaseStorageTest { private LogManagerImpl logManager; @@ -266,6 +266,7 @@ public void testSetAppliedId() throws Exception { // it's in memory Assert.assertEquals(mockEntries.get(i), this.logManager.getEntryFromMemory(i + 1)); } + Thread.sleep(200); // waiting for setDiskId() this.logManager.setAppliedId(new LogId(10, 10)); for (int i = 0; i < 10; i++) { assertNull(this.logManager.getEntryFromMemory(i + 1)); @@ -281,6 +282,7 @@ public void testSetAppliedId2() throws Exception { // it's in memory Assert.assertEquals(mockEntries.get(i), this.logManager.getEntryFromMemory(i + 1)); } + Thread.sleep(200); // waiting for setDiskId() this.logManager.setAppliedId(new LogId(10, 10)); for (int i = 0; i < 10; i++) { assertNull(this.logManager.getEntryFromMemory(i + 1)); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java index 9504d936b..fba2d1667 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java @@ -29,6 +29,7 @@ import org.mockito.runners.MockitoJUnitRunner; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.Scheduler; import com.alipay.sofa.jraft.core.TimerManager; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; import com.alipay.sofa.jraft.entity.RaftOutter; @@ -70,14 +71,13 @@ public class LocalSnapshotCopierTest extends BaseStorageTest { private RaftOptions raftOptions; @Mock private LocalSnapshotStorage snapshotStorage; - private TimerManager timerManager; + private Scheduler timerManager; @Override @Before public void setup() throws Exception { super.setup(); - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); this.raftOptions = new RaftOptions(); this.writer = new LocalSnapshotWriter(this.path, this.snapshotStorage, this.raftOptions); this.reader = new LocalSnapshotReader(this.snapshotStorage, null, new Endpoint("localhost", 8081), diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySessionTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySessionTest.java index 38b6c1b03..8d375e3de 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySessionTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySessionTest.java @@ -58,8 +58,7 @@ public class CopySessionTest { @Before public void setup() { - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); this.copyOpts = new CopyOptions(); this.rb = RpcRequests.GetFileRequest.newBuilder(); this.rb.setReaderId(99); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java index 10d7a11db..5b625d9c2 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java @@ -44,8 +44,7 @@ public class RemoteFileCopierTest { @Before public void setup() { - this.timerManager = new TimerManager(); - this.timerManager.init(5); + this.timerManager = new TimerManager(5); copier = new RemoteFileCopier(); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java index ec457067c..79fac431f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java @@ -68,8 +68,8 @@ import com.alipay.sofa.jraft.util.Describer; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ExecutorServiceHelper; -import com.alipay.sofa.jraft.util.MetricThreadPoolExecutor; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolMetricRegistry; import com.alipay.sofa.jraft.util.Utils; import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.Slf4jReporter; @@ -577,7 +577,7 @@ private void startMetricReporters(final long metricsReportPeriod) { this.metricsScheduler = StoreEngineHelper.createMetricsScheduler(); } // start threadPool metrics reporter - this.threadPoolMetricsReporter = Slf4jReporter.forRegistry(MetricThreadPoolExecutor.metricRegistry()) // + this.threadPoolMetricsReporter = Slf4jReporter.forRegistry(ThreadPoolMetricRegistry.metricRegistry()) // .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) // .outputTo(LOG) // .scheduleOn(this.metricsScheduler) //