Skip to content

Commit

Permalink
add lane filter & spi loader
Browse files Browse the repository at this point in the history
  • Loading branch information
zilongTong committed Aug 1, 2022
1 parent 166a625 commit b2a612d
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import com.google.common.collect.Sets;
import com.tencent.tsf.femas.common.constant.FemasConstant;
import com.tencent.tsf.femas.common.context.Context;
import com.tencent.tsf.femas.common.context.ContextConstant;
import com.tencent.tsf.femas.common.context.FemasContext;
import com.tencent.tsf.femas.common.context.factory.ContextFactory;
import com.tencent.tsf.femas.common.entity.Service;
import com.tencent.tsf.femas.common.entity.ServiceInstance;
import com.tencent.tsf.femas.governance.lane.entity.LaneRule;
import com.tencent.tsf.femas.common.exception.FemasRuntimeException;
import com.tencent.tsf.femas.common.tag.engine.TagEngine;
import com.tencent.tsf.femas.common.util.CollectionUtil;
import com.tencent.tsf.femas.common.util.StringUtils;
import com.tencent.tsf.femas.governance.lane.LaneFilter;
import com.tencent.tsf.femas.governance.lane.entity.LaneRule;
import com.tencent.tsf.femas.governance.plugin.context.ConfigContext;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,11 +58,26 @@ public class FemasLaneFilter implements LaneFilter {
*/
private static volatile Map<String, LaneInfo> LANE_ID_LANE_INFO_MAP = new ConcurrentHashMap<>();




private volatile static List<String> currentGroupLaneIds = null;

// 所有泳道涉及的部署组id列表
private volatile static Map<String, Boolean> allLaneConfiguredGroupsIds = new ConcurrentHashMap<>();
// laneId -> 对应泳道配置过泳道的服务列表(applicationId#namespaceId),便于快速判断当前服务是否配置过对应的泳道
private volatile static Map<String, Set<String>> laneConfiguredServicesMap = new ConcurrentHashMap<>();
// laneId -> 对应泳道配置的 groupIds,便于快速判断当前节点是否属于该泳道
private volatile static Map<String, Set<String>> laneConfiguredGroupIdsMap = new ConcurrentHashMap<>();

/**
* 命名空间-landIds 映射map
*/
private static volatile Map<String, Set<String>> NAMESPACE_LANE_INFO_MAP = new ConcurrentHashMap<>();

private volatile static ContextConstant contextConstant = ContextFactory.getContextConstantInstance();

private static String groupId = Context.getSystemTag(contextConstant.getGroupId());
/**
* 部署组id-landIds 映射map
* 供colorless逻辑使用
Expand Down Expand Up @@ -373,7 +391,7 @@ public void preProcessLaneId() {

@Override
public String getName() {
return "tsfLane";
return "femasLane";
}

@Override
Expand All @@ -390,4 +408,78 @@ public void init(ConfigContext conf) throws FemasRuntimeException {
public void destroy() {

}

public static LaneInfo getTsfLaneInfo(String laneId) {
return LANE_ID_LANE_INFO_MAP.get(laneId);
}

public static Map<String, LaneInfo> getTsfLaneInfoMap() {
return LANE_ID_LANE_INFO_MAP;
}

public static List<String> getCurrentGroupLaneIds() {
return currentGroupLaneIds;
}

/**
* 从 LANE_ID_LANE_INFO_MAP 更新 currentGroupLaneId
* @return
*/
public static void updateLaneShortCutInfo() {
List<String> laneIds = Collections.synchronizedList(new ArrayList());
Map<String, Boolean> groupIds = new ConcurrentHashMap<>();

Map<String, Set<String>> tempLaneConfiguredServicesMap = new ConcurrentHashMap<>();
Map<String, Set<String>> tempLaneConfiguredGroupIdsMap = new ConcurrentHashMap<>();

if (StringUtils.isNotEmpty(groupId) || LANE_ID_LANE_INFO_MAP != null || LANE_ID_LANE_INFO_MAP.size() > 0) {
for (Map.Entry<String, LaneInfo> entry : LANE_ID_LANE_INFO_MAP.entrySet()) {
List<LaneGroup> laneGroupList = entry.getValue().getLaneGroupList();
Set<String> tempLaneConfiguredServices = new HashSet<>();
Set<String> tempLaneConfiguredGroupIds = new HashSet<>();

if (CollectionUtils.isNotEmpty(laneGroupList)) {
for (LaneGroup group : laneGroupList) {
// 理论上 groupId、applicationId、nsId 都不为空,但端云联调等自己注册的可能会有缺失
if (StringUtils.isNotEmpty(group.getGroupId())) {
groupIds.put(group.getGroupId(), true);
tempLaneConfiguredGroupIds.add(group.getGroupId());
}
if (StringUtils.isNotEmpty(group.getNamespaceId())
&& StringUtils.isNotEmpty(group.getApplicationId())) {
tempLaneConfiguredServices.add(getLaneConfiguredServiceKey(group.getApplicationId(), group.getNamespaceId()));
}

if (groupId.equals(group.getGroupId())) {
laneIds.add(entry.getKey());
}
}
tempLaneConfiguredServicesMap.put(entry.getKey(), tempLaneConfiguredServices);
tempLaneConfiguredGroupIdsMap.put(entry.getKey(), tempLaneConfiguredGroupIds);
}
}
}

currentGroupLaneIds = laneIds;
allLaneConfiguredGroupsIds = groupIds;
laneConfiguredServicesMap = tempLaneConfiguredServicesMap;
laneConfiguredGroupIdsMap = tempLaneConfiguredGroupIdsMap;
}

public static Set<String> getAllLaneConfiguredGroupsIds() {
return allLaneConfiguredGroupsIds.keySet();
}

public static Set<String> getLaneConfiguredServices(String laneId) {
return laneConfiguredServicesMap.get(laneId);
}

public static Set<String> getLaneConfiguredGroupIds(String laneId) {
return laneConfiguredGroupIdsMap.get(laneId);
}

// 根据 applicationId 和 namespaceId 判断一个服务是否配置过泳道
public static String getLaneConfiguredServiceKey(String applicationId, String namespaceId) {
return String.format("%s#%s", applicationId, namespaceId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package com.tencent.tsf.femas.adaptor.paas.governance.lane;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tencent.tsf.femas.adaptor.paas.config.FemasPaasConfigManager;
import com.tencent.tsf.femas.common.entity.Service;
import com.tencent.tsf.femas.common.tag.Tag;
import com.tencent.tsf.femas.common.tag.TagExpression;
import com.tencent.tsf.femas.common.tag.TagRule;
import com.tencent.tsf.femas.common.tag.constant.TagConstant;
import com.tencent.tsf.femas.common.util.CollectionUtil;
import com.tencent.tsf.femas.common.util.StringUtils;
import com.tencent.tsf.femas.config.Config;
import com.tencent.tsf.femas.config.ConfigChangeListener;
import com.tencent.tsf.femas.config.enums.PropertyChangeType;
import com.tencent.tsf.femas.config.model.ConfigChangeEvent;
import com.tencent.tsf.femas.governance.lane.entity.LaneRule;
import com.tencent.tsf.femas.governance.lane.LaneService;
import com.tencent.tsf.femas.governance.plugin.config.ConfigHandler;
import com.tencent.tsf.femas.governance.plugin.config.enums.ConfigHandlerTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class FemasLaneHandler extends ConfigHandler {
private static final Logger logger = LoggerFactory.getLogger(FemasLaneHandler.class);

private static AtomicBoolean SUBSCRIBED_LANE_CONFIG = new AtomicBoolean(false);
private static AtomicBoolean SUBSCRIBED_LANE_RULE_CONFIG = new AtomicBoolean(false);

/**
* @see com.tencent.tsf.femas.common.spi.SpiExtensionClass#getType()
*/
@Override
public String getType() {
return ConfigHandlerTypeEnum.LANE.getType();
}

@Override
public synchronized void subscribeServiceConfig(Service service) {
subscribeLaneConfig();
subscribeLaneRuleConfig();
}

public void subscribeLaneConfig() {
if (SUBSCRIBED_LANE_CONFIG.get()) {
return;
}
FemasLaneFilter femasLaneFilter = new FemasLaneFilter();
LaneService.refreshLaneFilter(femasLaneFilter);
String laneKey = "lane/info/";

Config config = FemasPaasConfigManager.getConfig();
// 初始化时,同步获取 lane info
// List<ConfigChangeEvent<String>> laneInfoEvents = config.getDirectory(laneKey);
// if (CollectionUtil.isNotEmpty(laneInfoEvents)) {
// for (ConfigChangeEvent<String> configChangeEvent : laneInfoEvents) {
// //新增逻辑
// LaneInfo laneInfo = parseLaneInfo(configChangeEvent.getNewValue());
// FemasLaneFilter.addLaneInfo(laneInfo);
// }
// }
// FemasLaneFilter.updateLaneShortCutInfo();
// logger.info("init group lane id:{}", LaneIdHolder.getCurrentGroupLaneId());

config.subscribe(laneKey, new ConfigChangeListener<String>() {
@Override
public void onChange(List<ConfigChangeEvent<String>> configChangeEvents) {
logger.info("[FEMAS TSF ADAPTOR LANE INFO] Starting process lane info change event. Changed event size : " + configChangeEvents.size());
if (CollectionUtil.isEmpty(configChangeEvents)) {
return;
}

for (ConfigChangeEvent<String> configChangeEvent : configChangeEvents) {
try {
// 删除规则
if (configChangeEvent.getChangeType() == PropertyChangeType.DELETED) {
LaneInfo laneInfo = parseLaneInfo(configChangeEvent.getOldValue());
FemasLaneFilter.removeLaneInfo(laneInfo);

logger.info("[FEMAS TSF ADAPTOR LANE INFO] Remove Lane info. Lane info : " + laneInfo);
} else if (configChangeEvent.getChangeType() == PropertyChangeType.MODIFIED) {
// 修改
LaneInfo oldLaneInfo = parseLaneInfo(configChangeEvent.getOldValue());
FemasLaneFilter.removeLaneInfo(oldLaneInfo);

LaneInfo laneInfo = parseLaneInfo(configChangeEvent.getNewValue());
FemasLaneFilter.addLaneInfo(laneInfo);

logger.info("[FEMAS TSF ADAPTOR LANE INFO] Update Lane info. Lane info : " + laneInfo);
} else {
//新增逻辑
LaneInfo laneInfo = parseLaneInfo(configChangeEvent.getNewValue());
// 因为初始化时会设置一次,为了避免重复设置有问题,先尝试清除一次
FemasLaneFilter.removeLaneInfo(laneInfo);
FemasLaneFilter.addLaneInfo(laneInfo);

logger.info("[FEMAS TSF ADAPTOR LANE INFO] ADD Lane info. Lane info : " + laneInfo);
}
// 任何变化时,都更新当前部署组所在泳道信息
FemasLaneFilter.updateLaneShortCutInfo();
} catch (Exception ex) {
logger.error("[FEMAS TSF ADAPTOR LANE INFO] tsf Lane info load error.", ex);
}

}
}

@Override
public void onChange(ConfigChangeEvent<String> changeEvent) {
}
});

SUBSCRIBED_LANE_CONFIG.set(true);
}

public void subscribeLaneRuleConfig() {
if (SUBSCRIBED_LANE_RULE_CONFIG.get()) {
return;
}

String laneKey = "lane/rule/";

Config config = FemasPaasConfigManager.getConfig();

// 初始化时,同步获取 lane rule
// List<ConfigChangeEvent<String>> laneRuleEvents = config.getDirectory(laneKey);
// if (CollectionUtil.isNotEmpty(laneRuleEvents)) {
// for (ConfigChangeEvent<String> configChangeEvent : laneRuleEvents) {
// //新增逻辑
// LaneRule laneRule = parseLaneRule(configChangeEvent.getNewValue());
// FemasLaneFilter.removeLaneRule(laneRule);
//
// FemasLaneFilter.addLaneRule(laneRule);
// }
// }

config.subscribe(laneKey, new ConfigChangeListener<String>() {
@Override
public void onChange(List<ConfigChangeEvent<String>> configChangeEvents) {
logger.info("[FEMAS TSF ADAPTOR LANE RULE] Starting process lane rule change event. Changed event size : " + configChangeEvents.size());
if (CollectionUtil.isEmpty(configChangeEvents)) {
return;
}

for (ConfigChangeEvent<String> configChangeEvent : configChangeEvents) {
try {
// 删除规则
if (configChangeEvent.getChangeType() == PropertyChangeType.DELETED) {
LaneRule laneRule = parseLaneRule(configChangeEvent.getOldValue());
FemasLaneFilter.removeLaneRule(laneRule);

logger.info("[FEMAS TSF ADAPTOR LANE RULE] Remove Lane Rule. Lane rule : " + laneRule);
} else {
// 修改或者新增逻辑
LaneRule laneRule = parseLaneRule(configChangeEvent.getNewValue());
FemasLaneFilter.removeLaneRule(laneRule);

FemasLaneFilter.addLaneRule(laneRule);

logger.info("[FEMAS TSF ADAPTOR LANE RULE] Update Lane Rule. Lane rule : " + laneRule);
}
} catch (Exception ex) {
logger.error("[FEMAS TSF ADAPTOR LANE] tsf Lane info load error.", ex);
}

}
}

@Override
public void onChange(ConfigChangeEvent<String> changeEvent) {
}
});

SUBSCRIBED_LANE_RULE_CONFIG.set(true);
}

public static void unsubscribeLaneRuleConfig() {
String laneKey = "lane/rule/";

Config config = FemasPaasConfigManager.getConfig();

config.unsubscribe(laneKey);
}


private static LaneInfo parseLaneInfo(String laneInfoString) {
try {
if (!StringUtils.isEmpty(laneInfoString)) {
Yaml yaml = new Yaml();
ObjectMapper mapper = new ObjectMapper();
// 配置 ObjectMapper在反序列化时,忽略目标对象没有的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

String laneInfoJsonString = mapper.writeValueAsString(yaml.load(laneInfoString));
LaneInfo laneInfo = mapper.readValue(laneInfoJsonString, new TypeReference<LaneInfo>() {
});
return laneInfo;
}

throw new RuntimeException("Lane rule is null.");
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static LaneRule parseLaneRule(String laneRuleString) {
try {
if (!StringUtils.isEmpty(laneRuleString)) {
Yaml yaml = new Yaml();
ObjectMapper mapper = new ObjectMapper();
// 配置 ObjectMapper在反序列化时,忽略目标对象没有的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

String laneInfoJsonString = mapper.writeValueAsString(yaml.load(laneRuleString));
com.tencent.tsf.femas.adaptor.paas.governance.lane.LaneRule laneRule = mapper.readValue(laneInfoJsonString, new TypeReference<com.tencent.tsf.femas.adaptor.paas.governance.lane.LaneRule>() {
});

LaneRule femasLaneRule = new LaneRule();
femasLaneRule.setCreateTime(laneRule.getCreateTime());
femasLaneRule.setLaneId(laneRule.getLaneId());
femasLaneRule.setPriority(laneRule.getPriority());
femasLaneRule.setRuleId(laneRule.getRuleId());

TagRule tagRule = new TagRule();
String expression = laneRule.getRuleTagRelationship() == RuleTagRelationship.RELEATION_AND ? TagExpression.RELATION_AND : TagExpression.RELATION_OR;
tagRule.setExpression(expression);

List<Tag> tags = new ArrayList<>();
for (LaneRuleTag laneRuleTag : laneRule.getRuleTagList()) {
Tag tag = new Tag();
tag.setTagValue(laneRuleTag.getTagValue());
tag.setTagOperator(laneRuleTag.getTagOperator());
tag.setTagField(laneRuleTag.getTagName());
tag.setTagType(TagConstant.TYPE.CUSTOM);

tags.add(tag);

}
tagRule.setTags(tags);

femasLaneRule.setTagRule(tagRule);

return femasLaneRule;
}

throw new RuntimeException("Lane rule is null.");
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit b2a612d

Please sign in to comment.