Skip to content

Commit

Permalink
The abstract thread pool dynamically changes the kernel layer (opengo…
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks authored May 22, 2023
1 parent bf9572f commit d1362fa
Show file tree
Hide file tree
Showing 34 changed files with 542 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public DynamicThreadPoolChangeHandlerSpring2x(ConfigurableApplicationContext con
super(context);
}

@Override
protected BootstrapConfigProperties bindProperties(Map<Object, Object> configInfo, ApplicationContext applicationContext) {
BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties();
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,21 @@

<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-config-spring-boot-starter</artifactId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<artifactId>hippo4j-threadpool-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.parser.ConfigFileTypeEnum;
import cn.hippo4j.config.springboot.starter.parser.ConfigParserHandler;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
Expand All @@ -43,19 +41,12 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;

/**
* Abstract dynamic thread poo change handler spring
Expand Down Expand Up @@ -93,8 +84,6 @@ public void registerListener() {
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace);
}

protected abstract BootstrapConfigProperties bindProperties(Map<Object, Object> configInfo, ApplicationContext applicationContext);

private void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap, ApplicationContext context) {
try {
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
Expand All @@ -104,34 +93,24 @@ private void dynamicRefresh(String configContent, Map<String, Object> newValueCh
if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
Optional.ofNullable(afterConfigMap).ifPresent(each -> each.putAll(newValueChangeMap));
}
BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context);

List<ExecutorProperties> executors = afterConfigProperties.getExecutors();
for (ExecutorProperties afterProperties : executors) {
String threadPoolId = afterProperties.getThreadPoolId();
AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId);
if (holder.isEmpty() || holder.getExecutor() == null) {
continue;
}
ExecutorProperties beforeProperties = convert(holder.getProperties());

if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) {
continue;
}

dynamicRefreshPool(beforeProperties, afterProperties);
holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh.
ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties);
LOGGER.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId,
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), changeRequest.getNowKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), changeRequest.getNowAllowsCoreThreadTimeOut()));
}
// TODO
/*
* BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context);
*
* List<ExecutorProperties> executors = afterConfigProperties.getExecutors(); for (ExecutorProperties afterProperties : executors) { String threadPoolId =
* afterProperties.getThreadPoolId(); AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId); if (holder.isEmpty() ||
* holder.getExecutor() == null) { continue; } ExecutorProperties beforeProperties = convert(holder.getProperties());
*
* if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) { continue; }
*
* dynamicRefreshPool(beforeProperties, afterProperties); holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh.
* ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties); LOGGER.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, String.format(CHANGE_DELIMITER,
* beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
* String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(),
* changeRequest.getNowKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), String.format(CHANGE_DELIMITER,
* beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(),
* changeRequest.getNowAllowsCoreThreadTimeOut())); }
*/
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] config mode dynamic refresh failed.", ex);
}
Expand Down Expand Up @@ -183,7 +162,7 @@ private void dynamicRefreshPool(ExecutorProperties beforeProperties, ExecutorPro
* Fail default executor properties.
*
* @param beforeProperties old properties
* @param afterProperties new properties
* @param afterProperties new properties
* @return executor properties
*/
private Properties failDefaultExecutorProperties(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) {
Expand Down Expand Up @@ -236,27 +215,20 @@ private Properties convert(ExecutorProperties executorProperties) {
* @param afterProperties new properties
* @return instance
*/
private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) {
ChangeParameterNotifyRequest changeParameterNotifyRequest = ChangeParameterNotifyRequest.builder()
.beforeCorePoolSize(beforeProperties.getCorePoolSize())
.beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize())
.beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut())
.beforeKeepAliveTime(beforeProperties.getKeepAliveTime())
.beforeQueueCapacity(beforeProperties.getQueueCapacity())
.beforeRejectedName(beforeProperties.getRejectedHandler())
.beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut())
.blockingQueueName(afterProperties.getBlockingQueue())
.nowCorePoolSize(Optional.ofNullable(afterProperties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
.nowMaximumPoolSize(Optional.ofNullable(afterProperties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
.nowAllowsCoreThreadTimeOut(Optional.ofNullable(afterProperties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
.nowKeepAliveTime(Optional.ofNullable(afterProperties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
.nowQueueCapacity(Optional.ofNullable(afterProperties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
.nowRejectedName(Optional.ofNullable(afterProperties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
.nowExecuteTimeOut(Optional.ofNullable(afterProperties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut()))
.build();
changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
return changeParameterNotifyRequest;
}
/*
* private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties afterProperties) { ChangeParameterNotifyRequest changeParameterNotifyRequest =
* ChangeParameterNotifyRequest.builder() .beforeCorePoolSize(beforeProperties.getCorePoolSize()) .beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize())
* .beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()) .beforeKeepAliveTime(beforeProperties.getKeepAliveTime()) .beforeQueueCapacity(beforeProperties.getQueueCapacity())
* .beforeRejectedName(beforeProperties.getRejectedHandler()) .beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut()) .blockingQueueName(afterProperties.getBlockingQueue())
* .nowCorePoolSize(Optional.ofNullable(afterProperties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
* .nowMaximumPoolSize(Optional.ofNullable(afterProperties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
* .nowAllowsCoreThreadTimeOut(Optional.ofNullable(afterProperties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
* .nowKeepAliveTime(Optional.ofNullable(afterProperties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
* .nowQueueCapacity(Optional.ofNullable(afterProperties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
* .nowRejectedName(Optional.ofNullable(afterProperties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
* .nowExecuteTimeOut(Optional.ofNullable(afterProperties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut())) .build();
* changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); return changeParameterNotifyRequest; }
*/

/**
* Check consistency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,9 +99,7 @@ public static void register(String threadPoolId, ThreadPoolExecutor executor) {
properties.put(ThreadPoolPropertyKey.THREAD_NAME_PREFIX, threadPoolId);
properties.put(ThreadPoolPropertyKey.REJECTED_HANDLER, RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName());
properties.put(ThreadPoolPropertyKey.EXECUTE_TIME_OUT, Constants.EXECUTE_TIME_OUT);

// register executor.
AgentThreadPoolInstanceRegistry.getInstance().putHolder(threadPoolId, executor, properties);

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
package cn.hippo4j.common.toolkit;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.toolkit;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -10,7 +26,7 @@
public class ClassUtilTest {

@Test
public void testGetClassLoader(){
public void testGetClassLoader() {
ClassLoader expectedClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader actualClassLoader = ClassUtil.getClassLoader(ClassUtilTest.class);
Assert.assertEquals(expectedClassLoader, actualClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cn.hippo4j.core.config;
package cn.hippo4j.threadpool.dynamic.api;

/**
* Bootstrap properties interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ default void registerListener() {
default void dynamicRefresh(String content) {
}

/**
* Dynamic refresh of configuration center data changes.
*
* @param configFileType config file type
* @param content changed data
*/
default void dynamicRefresh(String configFileType, String content) {
}

/**
* Dynamic refresh.
*
Expand All @@ -46,4 +55,14 @@ default void dynamicRefresh(String content) {
*/
default void dynamicRefresh(String content, Map<String, Object> newValueChangeMap) {
}

/**
* Dynamic refresh.
*
* @param configFileType config file type
* @param content changed data
* @param newValueChangeMap new value change map
*/
default void dynamicRefresh(String configFileType, String content, Map<String, Object> newValueChangeMap) {
}
}
8 changes: 8 additions & 0 deletions kernel/dynamic/mode/config/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@
</parent>

<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>

<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.threadpool.dynamic.mode.config.parser;

/**
* Abstract config parser
*/
public abstract class AbstractConfigParser implements ConfigParser {

@Override
public boolean supports(ConfigFileTypeEnum type) {
return getConfigFileTypes().contains(type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.threadpool.dynamic.mode.config.parser;

import lombok.Getter;

/**
* Config file type enum
*/
@Getter
public enum ConfigFileTypeEnum {

/**
* PROPERTIES
*/
PROPERTIES("properties"),

/**
* XML
*/
XML("xml"),

/**
* JSON
*/
JSON("json"),

/**
* YML
*/
YML("yml"),

/**
* YAML
*/
YAML("yaml"),

/**
* TXT
*/
TXT("txt");

private final String value;

ConfigFileTypeEnum(String value) {
this.value = value;
}

public static ConfigFileTypeEnum of(String value) {
for (ConfigFileTypeEnum typeEnum : ConfigFileTypeEnum.values()) {
if (typeEnum.value.equals(value)) {
return typeEnum;
}
}
return PROPERTIES;
}
}
Loading

0 comments on commit d1362fa

Please sign in to comment.