Skip to content

Commit

Permalink
[Optimize]健康巡检增加ClusterParam, 从而拆分Kafka和Connect相关的巡检任务
Browse files Browse the repository at this point in the history
  • Loading branch information
zengqiao committed Nov 10, 2022
1 parent e456be9 commit 7661826
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster;

import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;

/**
* @author wyc
* @date 2022/11/9
*/
public class ClusterParam extends VersionItemParam {
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ClusterPhyParam extends VersionItemParam {
public class ClusterPhyParam extends ClusterParam {
protected Long clusterPhyId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
Expand All @@ -21,15 +22,15 @@ public abstract class AbstractHealthCheckService {

protected static final Map<
String,
Function<Tuple<ClusterPhyParam, BaseClusterHealthConfig>, HealthCheckResult>
Function<Tuple<ClusterParam, BaseClusterHealthConfig>, HealthCheckResult>
> functionMap = new ConcurrentHashMap<>();

public abstract List<ClusterPhyParam> getResList(Long clusterPhyId);
public abstract List<ClusterParam> getResList(Long clusterPhyId);

public abstract HealthCheckDimensionEnum getHealthCheckDimensionEnum();

public HealthCheckResult checkAndGetResult(ClusterPhyParam clusterPhyParam, BaseClusterHealthConfig clusterHealthConfig) {
if (ValidateUtils.anyNull(clusterPhyParam.getClusterPhyId(), clusterPhyParam, clusterHealthConfig)) {
public HealthCheckResult checkAndGetResult(ClusterParam clusterParam, BaseClusterHealthConfig clusterHealthConfig) {
if (ValidateUtils.anyNull( clusterParam, clusterHealthConfig)) {
return null;
}

Expand All @@ -39,16 +40,16 @@ public HealthCheckResult checkAndGetResult(ClusterPhyParam clusterPhyParam, Base
return null;
}

Function<Tuple<ClusterPhyParam, BaseClusterHealthConfig>, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName());
Function<Tuple<ClusterParam, BaseClusterHealthConfig>, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName());
if (function == null) {
return null;
}

try {
return function.apply(new Tuple<>(clusterPhyParam, clusterHealthConfig));
return function.apply(new Tuple<>(clusterParam, clusterHealthConfig));
} catch (Exception e) {
log.error("method=checkAndGetResult||clusterPhyParam={}||clusterHealthConfig={}||errMsg=exception!",
clusterPhyParam, clusterHealthConfig, e);
clusterParam, clusterHealthConfig, e);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.broker.BrokerParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
Expand Down Expand Up @@ -45,8 +46,8 @@ private void init() {
}

@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
List<ClusterPhyParam> paramList = new ArrayList<>();
public List<ClusterParam> getResList(Long clusterPhyId) {
List<ClusterParam> paramList = new ArrayList<>();
for (Broker broker: brokerService.listAliveBrokersFromDB(clusterPhyId)) {
paramList.add(new BrokerParam(clusterPhyId, broker.getBrokerId()));
}
Expand All @@ -61,7 +62,7 @@ public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
/**
* Broker网络处理线程平均值过低
*/
private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
BrokerParam param = (BrokerParam) paramTuple.getV1();
HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2();

Expand Down Expand Up @@ -96,7 +97,7 @@ private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple<Cluster
/**
* Broker请求队列满
*/
private HealthCheckResult checkBrokerRequestQueueFull(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkBrokerRequestQueueFull(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
BrokerParam param = (BrokerParam) paramTuple.getV1();
HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
Expand Down Expand Up @@ -34,7 +35,7 @@ private void init() {
}

@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
return Arrays.asList(new ClusterPhyParam(clusterPhyId));
}

Expand All @@ -46,8 +47,8 @@ public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
/**
* 检查NoController
*/
private HealthCheckResult checkClusterNoController(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ClusterPhyParam param = singleConfigSimpleTuple.getV1();
private HealthCheckResult checkClusterNoController(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ClusterPhyParam param =(ClusterPhyParam) singleConfigSimpleTuple.getV1();
HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2();

Result<ClusterMetrics> clusterMetricsResult = clusterMetricService.getLatestMetricsFromES(param.getClusterPhyId(), Arrays.asList(ClusterMetricVersionItems.CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.GroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
Expand Down Expand Up @@ -43,7 +44,7 @@ private void init() {
}

@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
return groupService.getGroupsFromDB(clusterPhyId).stream().map(elem -> new GroupParam(clusterPhyId, elem)).collect(Collectors.toList());
}

Expand All @@ -55,7 +56,7 @@ public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
/**
* 检查Group re-balance太频繁
*/
private HealthCheckResult checkReBalanceTooFrequently(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkReBalanceTooFrequently(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
GroupParam param = (GroupParam) paramTuple.getV1();
HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
Expand Down Expand Up @@ -49,8 +50,8 @@ private void init() {
}

@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
List<ClusterPhyParam> paramList = new ArrayList<>();
public List<ClusterParam> getResList(Long clusterPhyId) {
List<ClusterParam> paramList = new ArrayList<>();
for (Topic topic: topicService.listTopicsFromDB(clusterPhyId)) {
paramList.add(new TopicParam(clusterPhyId, topic.getTopicName()));
}
Expand All @@ -65,7 +66,7 @@ public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
/**
* 检查Topic长期未同步
*/
private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
TopicParam param = (TopicParam) paramTuple.getV1();
HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2();

Expand Down Expand Up @@ -97,7 +98,7 @@ private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple<ClusterPhyPar
/**
* 检查NoLeader
*/
private HealthCheckResult checkTopicNoLeader(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkTopicNoLeader(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
TopicParam param = (TopicParam) singleConfigSimpleTuple.getV1();
List<Partition> partitionList = partitionService.listPartitionFromCacheFirst(param.getClusterPhyId(), param.getTopicName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam;
Expand Down Expand Up @@ -58,7 +59,7 @@ private void init() {
}

@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return new ArrayList<>();
Expand All @@ -82,7 +83,7 @@ public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
return HealthCheckDimensionEnum.ZOOKEEPER;
}

private HealthCheckResult checkBrainSplit(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkBrainSplit(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2();

Expand All @@ -100,7 +101,7 @@ private HealthCheckResult checkBrainSplit(Tuple<ClusterPhyParam, BaseClusterHeal
return checkResult;
}

private HealthCheckResult checkOutstandingRequests(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkOutstandingRequests(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

Expand Down Expand Up @@ -135,7 +136,7 @@ private HealthCheckResult checkOutstandingRequests(Tuple<ClusterPhyParam, BaseCl
return checkResult;
}

private HealthCheckResult checkWatchCount(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkWatchCount(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

Expand Down Expand Up @@ -171,7 +172,7 @@ private HealthCheckResult checkWatchCount(Tuple<ClusterPhyParam, BaseClusterHeal
return checkResult;
}

private HealthCheckResult checkAliveConnections(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkAliveConnections(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

Expand Down Expand Up @@ -207,7 +208,7 @@ private HealthCheckResult checkAliveConnections(Tuple<ClusterPhyParam, BaseClust
return checkResult;
}

private HealthCheckResult checkApproximateDataSize(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkApproximateDataSize(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

Expand Down Expand Up @@ -243,7 +244,7 @@ private HealthCheckResult checkApproximateDataSize(Tuple<ClusterPhyParam, BaseCl
return checkResult;
}

private HealthCheckResult checkSentRate(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkSentRate(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
Expand Down Expand Up @@ -41,15 +42,15 @@ private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long tri
List<HealthCheckResult> resultList = new ArrayList<>();

// 遍历Check-Service
List<ClusterPhyParam> paramList = this.getCheckService().getResList(clusterPhy.getId());
List<ClusterParam> paramList = this.getCheckService().getResList(clusterPhy.getId());
if (ValidateUtils.isEmptyList(paramList)) {
// 当前无该维度的资源,则直接设置为
resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap));
}

// 遍历资源
for (ClusterPhyParam clusterPhyParam: paramList) {
resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap));
for (ClusterParam clusterParam: paramList) {
resultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap));
}

try {
Expand Down Expand Up @@ -93,13 +94,13 @@ private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealth
return resultList;
}

private List<HealthCheckResult> checkAndGetResult(ClusterPhyParam clusterPhyParam,
private List<HealthCheckResult> checkAndGetResult(ClusterParam clusterParam,
Map<String, BaseClusterHealthConfig> healthConfigMap) {
List<HealthCheckResult> resultList = new ArrayList<>();

// 进行检查
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterPhyParam, clusterHealthConfig);
HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterParam, clusterHealthConfig);
if (healthCheckResult == null) {
continue;
}
Expand Down

0 comments on commit 7661826

Please sign in to comment.