optimize cluster Heartbeat. (apache#5817)
* optimize cluster Heartbeat.

* Only one heartbeat scheduling thread is needed.
yu199195 authored May 28, 2020
1 parent 5e4fa1a commit 80330ef
Showing 6 changed files with 34 additions and 42 deletions.
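At a high level, the commit moves thread sizing out of HeartbeatTaskManager, which only ever schedules a single periodic heartbeat task and therefore needs just one scheduling thread, and lets HeartbeatHandler size its detection pool from the configured threadCount instead of the number of data sources. A rough, self-contained sketch of the resulting split, with illustrative task bodies and a one-second interval (only the executor wiring mirrors the diff below):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class HeartbeatWiringSketch {

        public static void main(final String[] args) throws InterruptedException {
            // HeartbeatTaskManager side: one thread suffices to fire the periodic heartbeat.
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            // HeartbeatHandler side: detection pool sized by the configured thread count,
            // which now defaults to availableProcessors << 1 when the YAML omits it.
            int threadCount = Runtime.getRuntime().availableProcessors() << 1;
            ExecutorService detectPool = Executors.newFixedThreadPool(threadCount);
            scheduler.scheduleAtFixedRate(() -> detectPool.submit(() -> System.out.println("detect data source")), 0, 1, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(3);
            scheduler.shutdownNow();
            detectPool.shutdownNow();
        }
    }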
@@ -32,12 +32,13 @@ public final class ClusterConfigurationYamlSwapper implements YamlSwapper<YamlCl
public YamlClusterConfiguration swap(final ClusterConfiguration clusterConfiguration) {
final YamlClusterConfiguration yamlClusterConfiguration = new YamlClusterConfiguration();
final YamlHeartbeatConfiguration yamlHeartBeatConfiguration = new YamlHeartbeatConfiguration();
- yamlHeartBeatConfiguration.setSql(clusterConfiguration.getHeartbeat().getSql());
- yamlHeartBeatConfiguration.setInterval(clusterConfiguration.getHeartbeat().getInterval());
- yamlHeartBeatConfiguration.setRetryEnable(clusterConfiguration.getHeartbeat().getRetryEnable());
- yamlHeartBeatConfiguration.setRetryMaximum(clusterConfiguration.getHeartbeat().getRetryMaximum());
- yamlHeartBeatConfiguration.setRetryInterval(clusterConfiguration.getHeartbeat().getRetryInterval());
- yamlHeartBeatConfiguration.setThreadCount(clusterConfiguration.getHeartbeat().getThreadCount());
+ HeartbeatConfiguration heartbeat = clusterConfiguration.getHeartbeat();
+ yamlHeartBeatConfiguration.setSql(heartbeat.getSql());
+ yamlHeartBeatConfiguration.setInterval(heartbeat.getInterval());
+ yamlHeartBeatConfiguration.setRetryEnable(heartbeat.getRetryEnable());
+ yamlHeartBeatConfiguration.setRetryMaximum(heartbeat.getRetryMaximum());
+ yamlHeartBeatConfiguration.setRetryInterval(heartbeat.getRetryInterval());
+ yamlHeartBeatConfiguration.setThreadCount(heartbeat.getThreadCount());
yamlClusterConfiguration.setHeartbeat(yamlHeartBeatConfiguration);
return yamlClusterConfiguration;
}
@@ -46,12 +47,13 @@ public YamlClusterConfiguration swap(final ClusterConfigura
public ClusterConfiguration swap(final YamlClusterConfiguration yamlConfiguration) {
final ClusterConfiguration clusterConfiguration = new ClusterConfiguration();
final HeartbeatConfiguration heartBeatConfiguration = new HeartbeatConfiguration();
- heartBeatConfiguration.setSql(yamlConfiguration.getHeartbeat().getSql());
- heartBeatConfiguration.setInterval(yamlConfiguration.getHeartbeat().getInterval());
- heartBeatConfiguration.setRetryEnable(yamlConfiguration.getHeartbeat().getRetryEnable());
- heartBeatConfiguration.setRetryMaximum(yamlConfiguration.getHeartbeat().getRetryMaximum());
- heartBeatConfiguration.setRetryInterval(yamlConfiguration.getHeartbeat().getRetryInterval());
- heartBeatConfiguration.setThreadCount(yamlConfiguration.getHeartbeat().getThreadCount());
+ YamlHeartbeatConfiguration heartbeat = yamlConfiguration.getHeartbeat();
+ heartBeatConfiguration.setSql(heartbeat.getSql());
+ heartBeatConfiguration.setInterval(heartbeat.getInterval());
+ heartBeatConfiguration.setRetryEnable(heartbeat.getRetryEnable());
+ heartBeatConfiguration.setRetryMaximum(heartbeat.getRetryMaximum());
+ heartBeatConfiguration.setRetryInterval(heartbeat.getRetryInterval());
+ heartBeatConfiguration.setThreadCount(null == heartbeat.getThreadCount() ? Runtime.getRuntime().availableProcessors() << 1 : heartbeat.getThreadCount());
clusterConfiguration.setHeartbeat(heartBeatConfiguration);
return clusterConfiguration;
}
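The reverse swap above gains one behavioral change: when the YAML leaves threadCount unset, the configuration now falls back to twice the number of available processors. A tiny sketch of that default (the core count, and therefore the printed value, is machine-dependent):

    public final class DefaultThreadCountSketch {

        public static void main(final String[] args) {
            int cores = Runtime.getRuntime().availableProcessors();
            // A left shift by one doubles the value, so 8 cores -> 16 heartbeat detection threads.
            int defaultThreadCount = cores << 1;
            System.out.println(cores + " cores -> " + defaultThreadCount + " threads");
        }
    }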
@@ -87,7 +87,7 @@ private Map<String, DataSourceState> buildDataSourceStateMap(final InstanceState

private void buildDataSourceState(final String schemaName, final Collection<HeartbeatResult> heartbeatResults,
final Map<String, DataSourceState> dataSourceStateMap, final InstanceState instanceState) {
- heartbeatResults.stream().forEach(each -> {
+ heartbeatResults.forEach(each -> {
String dataSourceName = Joiner.on(".").join(schemaName, each.getDataSourceName());
DataSourceState dataSourceState = null == instanceState.getDataSources()
|| null == instanceState.getDataSources().get(dataSourceName) ? new DataSourceState()
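The only change in this hunk is stylistic: forEach is called directly on the collection instead of through an intermediate Stream. Both forms visit the same elements; a minimal illustration with placeholder data source names:

    import java.util.Arrays;
    import java.util.List;

    public final class ForEachSketch {

        public static void main(final String[] args) {
            List<String> heartbeatResults = Arrays.asList("ds_0", "ds_1");
            heartbeatResults.stream().forEach(System.out::println);
            // Equivalent for side-effect iteration, without creating a Stream first.
            heartbeatResults.forEach(System.out::println);
        }
    }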
@@ -46,7 +46,7 @@ public static ClusterHeartbeatInstance getInstance() {
*/
public void init(final HeartbeatConfiguration configuration) {
Preconditions.checkNotNull(configuration, "heartbeat configuration can not be null.");
- heartbeatTaskManager = new HeartbeatTaskManager(configuration.getInterval(), configuration.getThreadCount());
+ heartbeatTaskManager = new HeartbeatTaskManager(configuration.getInterval());
HeartbeatTask task = new HeartbeatTask(new HeartbeatDetectNoticeEvent());
heartbeatTaskManager.start(task);
}
@@ -31,9 +31,9 @@ public final class HeartbeatTaskManager {

private ScheduledExecutorService executorService;

- public HeartbeatTaskManager(final Integer interval, final Integer threadCount) {
+ public HeartbeatTaskManager(final Integer interval) {
this.interval = interval;
- executorService = Executors.newScheduledThreadPool(threadCount);
+ executorService = Executors.newScheduledThreadPool(1);
}

/**
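With the pool size fixed at one, this constructor could equally use Executors.newSingleThreadScheduledExecutor(); for a single recurring HeartbeatTask the two behave the same. A minimal sketch of the scheduling loop, assuming a placeholder task and a one-second interval:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class SingleThreadSchedulerSketch {

        public static void main(final String[] args) throws InterruptedException {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            // One scheduling thread drives the recurring heartbeat; overlapping runs queue up rather than execute concurrently.
            executorService.scheduleAtFixedRate(() -> System.out.println("heartbeat tick"), 0, 1, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(3);
            executorService.shutdownNow();
        }
    }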
@@ -45,7 +45,7 @@ public final class HeartbeatTaskManagerTest {
@Before
@SneakyThrows({NoSuchFieldException.class, SecurityException.class})
public void init() {
- heartbeatTaskManager = new HeartbeatTaskManager(60, 1);
+ heartbeatTaskManager = new HeartbeatTaskManager(60);
FieldSetter.setField(heartbeatTaskManager, heartbeatTaskManager.getClass().getDeclaredField("executorService"), executorService);
}

@@ -26,24 +26,27 @@
import org.apache.shardingsphere.kernal.context.SchemaContext;

import java.util.ArrayList;
- import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
- import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
- import java.util.concurrent.FutureTask;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;


/**
* Heartbeat handler.
*/
@Slf4j
public final class HeartbeatHandler {

+ private static final int FUTURE_GET_TIME_OUT_MILLISECONDS = 1000;

private HeartbeatConfiguration configuration;

/**
@@ -71,37 +74,24 @@ public static HeartbeatHandler getInstance() {
* @param schemaContexts schema contexts
*/
public void handle(final Map<String, SchemaContext> schemaContexts) {
- ExecutorService executorService = Executors.newFixedThreadPool(countDataSource(schemaContexts));
- List<FutureTask<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
+ ExecutorService executorService = Executors.newFixedThreadPool(configuration.getThreadCount());
+ List<Future<Map<String, HeartbeatResult>>> futureTasks = new ArrayList<>();
schemaContexts.forEach((key, value) -> value.getSchema().getDataSources().forEach((innerKey, innerValue) -> {
- FutureTask<Map<String, HeartbeatResult>> futureTask = new FutureTask<>(new HeartbeatDetect(key, innerKey, innerValue, configuration));
- futureTasks.add(futureTask);
- executorService.submit(futureTask);
+ futureTasks.add(executorService.submit(new HeartbeatDetect(key, innerKey, innerValue, configuration)));
}));
reportHeartbeat(futureTasks);
closeExecutor(executorService);
}

- private Integer countDataSource(final Map<String, SchemaContext> schemaContexts) {
- return Long.valueOf(schemaContexts.values().stream()
- .collect(Collectors.summarizingInt(entry -> entry.getSchema().getDataSources().keySet().size())).getSum()).intValue();
- }

- private void reportHeartbeat(final List<FutureTask<Map<String, HeartbeatResult>>> futureTasks) {
- Map<String, Collection<HeartbeatResult>> heartbeatResultMap = new HashMap<>();
- futureTasks.stream().forEach(each -> {
+ private void reportHeartbeat(final List<Future<Map<String, HeartbeatResult>>> futureTasks) {
+ Map<String, Collection<HeartbeatResult>> heartbeatResultMap = futureTasks.stream().map(e -> {
try {
- each.get().entrySet().forEach(entry -> {
- if (Objects.isNull(heartbeatResultMap.get(entry.getKey()))) {
- heartbeatResultMap.put(entry.getKey(), new ArrayList<>(Arrays.asList(entry.getValue())));
- } else {
- heartbeatResultMap.get(entry.getKey()).add(entry.getValue());
- }
- });
- } catch (InterruptedException | ExecutionException ex) {
+ return e.get(FUTURE_GET_TIME_OUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException ex) {
log.error("Heartbeat report error", ex);
+ return new HashMap<String, HeartbeatResult>();
}
- });
+ }).flatMap(map -> map.entrySet().stream()).collect(Collectors.groupingBy(Map.Entry::getKey, HashMap::new, Collectors.mapping(Map.Entry::getValue, Collectors.toCollection(ArrayList::new))));
ClusterFacade.getInstance().reportHeartbeat(new HeartbeatResponse(heartbeatResultMap));
}

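The rewritten reportHeartbeat replaces the hand-rolled HashMap merge with a stream collector, and bounds each Future.get with the new FUTURE_GET_TIME_OUT_MILLISECONDS timeout (one second) so a hung detection task can no longer block the report indefinitely. The grouping collector behaves as sketched here, with plain strings standing in for HeartbeatResult and each inner map playing the role of one completed future:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public final class GroupingSketch {

        public static void main(final String[] args) {
            List<Map<String, String>> perTaskResults = Arrays.asList(
                    Collections.singletonMap("sharding_db", "ds_0 ok"),
                    Collections.singletonMap("sharding_db", "ds_1 ok"),
                    Collections.singletonMap("other_db", "ds_0 ok"));
            // Flatten every (schema, result) entry, then group the results per schema into an ArrayList.
            Map<String, Collection<String>> grouped = perTaskResults.stream()
                    .flatMap(map -> map.entrySet().stream())
                    .collect(Collectors.groupingBy(Map.Entry::getKey, HashMap::new,
                            Collectors.mapping(Map.Entry::getValue, Collectors.toCollection(ArrayList::new))));
            // Prints {other_db=[ds_0 ok], sharding_db=[ds_0 ok, ds_1 ok]} (map order may vary).
            System.out.println(grouped);
        }
    }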
