Skip to content

Commit

Permalink
add shared raft timer, default false (sofastack#414)
Browse files Browse the repository at this point in the history
* add shared raft timer, default false

* more robust

* add shared scheduler

* clear necessary code

* minor change

* fix unit test

* add channel init low/high write buf water mark for bolt impl

* minor fix

* minor fix for CR
  • Loading branch information
fengjiachun authored Apr 8, 2020
1 parent c924b49 commit 5d47497
Show file tree
Hide file tree
Showing 35 changed files with 1,044 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

import com.alipay.sofa.jraft.util.FileOutputSignalHandler;
import com.alipay.sofa.jraft.util.MetricReporter;
import com.alipay.sofa.jraft.util.MetricThreadPoolExecutor;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadPoolMetricRegistry;

/**
*
Expand All @@ -48,10 +48,10 @@ public void handle(final String signalName) {
LOG.info("Printing thread pools metrics with signal: {} to file: {}.", signalName, file);

try (final PrintStream out = new PrintStream(new FileOutputStream(file, true))) {
final MetricReporter reporter = MetricReporter.forRegistry(MetricThreadPoolExecutor.metricRegistry()) //
MetricReporter.forRegistry(ThreadPoolMetricRegistry.metricRegistry()) //
.outputTo(out) //
.build();
reporter.report();
.build() //
.report();
}
} catch (final IOException e) {
LOG.error("Fail to print thread pools metrics.", e);
Expand Down
39 changes: 24 additions & 15 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.RpcResponseFactory;
import com.alipay.sofa.jraft.rpc.impl.core.BoltRaftClientService;
import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.LogStorage;
import com.alipay.sofa.jraft.storage.RaftMetaStorage;
Expand All @@ -116,6 +116,7 @@
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;
import com.google.protobuf.Message;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -152,6 +153,11 @@ public class NodeImpl implements Node, RaftServerService {
}
}

private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader
.load(
RaftTimerFactory.class) //
.first();

// Max retry times when applying tasks.
private static final int MAX_APPLY_RETRY_TIMES = 3;

Expand Down Expand Up @@ -194,7 +200,7 @@ public class NodeImpl implements Node, RaftServerService {
private RaftClientService rpcService;
private ReadOnlyService readOnlyService;
/** Timers */
private TimerManager timerManager;
private Scheduler timerManager;
private RepeatedTimer electionTimer;
private RepeatedTimer voteTimer;
private RepeatedTimer stepDownTimer;
Expand Down Expand Up @@ -865,15 +871,14 @@ public boolean init(final NodeOptions opts) {
return false;
}

this.timerManager = new TimerManager();
if (!this.timerManager.init(this.options.getTimerPoolSize())) {
LOG.error("Fail to init timer manager.");
return false;
}
this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(),
this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");

// Init timers
final String suffix = getNodeId().toString();
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer-" + suffix, this.options.getElectionTimeoutMs()) {
String name = "JRaft-VoteTimer-" + suffix;
this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer(
this.options.isSharedVoteTimer(), name)) {

@Override
protected void onTrigger() {
Expand All @@ -885,7 +890,9 @@ protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer-" + suffix, this.options.getElectionTimeoutMs()) {
name = "JRaft-ElectionTimer-" + suffix;
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

@Override
protected void onTrigger() {
Expand All @@ -897,16 +904,18 @@ protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer-" + suffix,
this.options.getElectionTimeoutMs() >> 1) {
name = "JRaft-StepDownTimer-" + suffix;
this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1,
TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) {

@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer-" + suffix,
this.options.getSnapshotIntervalSecs() * 1000) {
name = "JRaft-SnapshotTimer-" + suffix;
this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {

private volatile boolean firstSchedule = true;

Expand Down Expand Up @@ -1000,7 +1009,7 @@ protected int adjustTimeout(final int timeoutMs) {

// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it
this.replicatorGroup = new ReplicatorGroupImpl();
this.rpcService = new BoltRaftClientService(this.replicatorGroup);
this.rpcService = new DefaultRaftClientService(this.replicatorGroup);
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
Expand Down Expand Up @@ -2300,7 +2309,7 @@ public NodeOptions getOptions() {
return this.options;
}

public TimerManager getTimerManager() {
public Scheduler getTimerManager() {
return this.timerManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class Replicator implements ThreadId.OnError {
private ScheduledFuture<?> heartbeatTimer;
private volatile SnapshotReader reader;
private CatchUpClosure catchUpClosure;
private final TimerManager timerManager;
private final Scheduler timerManager;
private final NodeMetrics nodeMetrics;
private volatile State state;

Expand Down
88 changes: 88 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.core;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
*
* @author jiachun.fjc
*/
public interface Scheduler {

/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
*
* @param command the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @return a ScheduledFuture representing pending completion of
* the task and whose {@code get()} method will return
* {@code null} upon completion
* scheduled for execution
*/
ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit);

/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is executions will commence after
* {@code initialDelay} then {@code initialDelay+period}, then
* {@code initialDelay + 2 * period}, and so on.
* If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor. If any execution of this task
* takes longer than its period, then subsequent executions
* may start late, but will not concurrently execute.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose {@code get()} method will throw an
* exception upon cancellation
*/
ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
final TimeUnit unit);

/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the
* given delay between the termination of one execution and the
* commencement of the next. If any execution of the task
* encounters an exception, subsequent executions are suppressed.
* Otherwise, the task will only terminate via cancellation or
* termination of the executor.
*
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
* execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
* @return a ScheduledFuture representing pending completion of
* the task, and whose {@code get()} method will throw an
* exception upon cancellation
*/
ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
final TimeUnit unit);

void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*/
package com.alipay.sofa.jraft.core;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;

/**
* The global timer manager.
Expand All @@ -31,45 +30,42 @@
*
* 2018-Mar-30 3:24:34 PM
*/
public class TimerManager implements Lifecycle<Integer> {
public class TimerManager implements Scheduler {

private ScheduledExecutorService executor;
private final ScheduledExecutorService executor;

@Override
public boolean init(Integer coreSize) {
this.executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory(
"JRaft-Node-ScheduleThreadPool-", true));
return true;
}

@Override
public void shutdown() {
if (this.executor != null) {
this.executor.shutdownNow();
this.executor = null;
}
public TimerManager(int workerNum) {
this(workerNum, "JRaft-Node-ScheduleThreadPool");
}

private void checkStarted() {
if (this.executor == null) {
throw new IllegalStateException("Please init timer manager.");
}
public TimerManager(int workerNum, String name) {
this.executor = ThreadPoolUtil.newScheduledBuilder() //
.poolName(name) //
.coreThreads(workerNum) //
.enableMetric(true) //
.threadFactory(new NamedThreadFactory(name, true)) //
.build();
}

@Override
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
checkStarted();
return this.executor.schedule(command, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
final TimeUnit unit) {
checkStarted();
return this.executor.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
final TimeUnit unit) {
checkStarted();
return this.executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public void shutdown() {
this.executor.shutdownNow();
}
}
Loading

0 comments on commit 5d47497

Please sign in to comment.