Skip to content

Commit

Permalink
[Optimize]增加Connector运行状态指标 (didi#1110)
Browse files Browse the repository at this point in the history
1、增加Connector运行状态指标;
2、将Connector指标上报普罗米修斯;
3、调整代码继承关系;
  • Loading branch information
ZQKC authored Aug 2, 2023
1 parent bdffc10 commit 55161e4
Showing 14 changed files with 156 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -12,20 +11,18 @@
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConnectClusterMetrics extends BaseMetrics {
private Long connectClusterId;
protected Long connectClusterId;

public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId ){
super(clusterPhyId);
this.connectClusterId = connectClusterId;
}

public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) {
ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId);
brokerMetrics.putMetric(metric, value);
return brokerMetrics;
public ConnectClusterMetrics(Long connectClusterId, String metricName, Float metricValue) {
this(null, connectClusterId);
this.putMetric(metricName, metricValue);
}

@Override
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -11,25 +9,19 @@
* @date 2022/11/2
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ConnectWorkerMetrics extends BaseMetrics {

private Long connectClusterId;

public class ConnectWorkerMetrics extends ConnectClusterMetrics {
private String workerId;

public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) {
ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics();
connectWorkerMetrics.setConnectClusterId(connectClusterId);
connectWorkerMetrics.setWorkerId(workerId);
connectWorkerMetrics.putMetric(metric, value);
return connectWorkerMetrics;
public ConnectWorkerMetrics(Long connectClusterId, String workerId, String metricName, Float metricValue) {
super(null, connectClusterId);
this.workerId = workerId;
this.putMetric(metricName, metricValue);
}

@Override
public String unique() {
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
return "KCW@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -12,24 +11,21 @@
@Data
@NoArgsConstructor
@ToString
public class ConnectorMetrics extends BaseMetrics {
private Long connectClusterId;
public class ConnectorMetrics extends ConnectClusterMetrics {
protected String connectorName;

private String connectorName;

private String connectorNameAndClusterId;
protected String connectorNameAndClusterId;

public ConnectorMetrics(Long connectClusterId, String connectorName) {
super(null);
super(null, connectClusterId);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}

public static ConnectorMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) {
ConnectorMetrics metrics = new ConnectorMetrics(connectClusterId, connectorName);
metrics.putMetric(metricName, value);
return metrics;
public ConnectorMetrics(Long connectClusterId, String connectorName, String metricName, Float metricValue) {
this(connectClusterId, connectorName);
this.putMetric(metricName, metricValue);
}

@Override
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;

import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -12,11 +11,7 @@
@Data
@NoArgsConstructor
@ToString
public class ConnectorTaskMetrics extends BaseMetrics {
private Long connectClusterId;

private String connectorName;

public class ConnectorTaskMetrics extends ConnectorMetrics {
private Integer taskId;

public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId) {
@@ -25,14 +20,13 @@ public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer
this.taskId = taskId;
}

public static ConnectorTaskMetrics initWithMetric(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float value) {
ConnectorTaskMetrics metrics = new ConnectorTaskMetrics(connectClusterId, connectorName, taskId);
metrics.putMetric(metricName,value);
return metrics;
public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float metricValue) {
this(connectClusterId, connectorName, taskId);
this.putMetric(metricName, metricValue);
}

@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + connectorName + "@" + taskId;
return "KCORT@" + connectClusterId + "@" + connectorName + "@" + taskId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.xiaojukeji.know.streaming.km.common.enums.connect;

import org.apache.kafka.connect.runtime.AbstractStatus;

/**
* connector运行状态
* @see AbstractStatus
*/
public enum ConnectStatusEnum {
UNASSIGNED(0, "UNASSIGNED"),

RUNNING(1,"RUNNING"),

PAUSED(2,"PAUSED"),

FAILED(3, "FAILED"),

DESTROYED(4, "DESTROYED"),

UNKNOWN(-1, "UNKNOWN")

;

ConnectStatusEnum(int status, String value) {
this.status = status;
this.value = value;
}

private final int status;

private final String value;

public static ConnectStatusEnum getByValue(String value) {
for (ConnectStatusEnum statusEnum: ConnectStatusEnum.values()) {
if (statusEnum.value.equals(value)) {
return statusEnum;
}
}

return ConnectStatusEnum.UNKNOWN;
}

public int getStatus() {
return status;
}

public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -24,9 +24,9 @@
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectClusterMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.cluster.ConnectClusterMetricESDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -43,7 +43,7 @@
* @author didi
*/
@Service
public class ConnectClusterMetricServiceImpl extends BaseConnectorMetricService implements ConnectClusterMetricService {
public class ConnectClusterMetricServiceImpl extends BaseConnectMetricService implements ConnectClusterMetricService {
protected static final ILog LOGGER = LogFactory.getLog(ConnectClusterMetricServiceImpl.class);

public static final String CONNECT_CLUSTER_METHOD_GET_WORKER_METRIC_AVG = "getWorkerMetricAvg";
@@ -86,8 +86,7 @@ public Result<ConnectClusterMetrics> collectConnectClusterMetricsFromKafkaWithCa
String connectClusterMetricKey = CollectedMetricsLocalCache.genConnectClusterMetricCacheKey(connectClusterPhyId, metric);
Float keyValue = CollectedMetricsLocalCache.getConnectClusterMetrics(connectClusterMetricKey);
if (keyValue != null) {
ConnectClusterMetrics connectClusterMetrics = ConnectClusterMetrics.initWithMetric(connectClusterPhyId,metric,keyValue);
return Result.buildSuc(connectClusterMetrics);
return Result.buildSuc(new ConnectClusterMetrics(connectClusterPhyId, metric, keyValue));
}

Result<ConnectClusterMetrics> ret = this.collectConnectClusterMetricsFromKafka(connectClusterPhyId, metric);
@@ -209,8 +208,7 @@ private Result<ConnectWorkerMetrics> getConnectWorkerMetricByJMX(Long connectClu
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
ConnectWorkerMetrics connectWorkerMetrics = ConnectWorkerMetrics.initWithMetric(connectClusterId, workerId, metric, Float.valueOf(value));
return Result.buildSuc(connectWorkerMetrics);
return Result.buildSuc(new ConnectWorkerMetrics(connectClusterId, workerId, metric, Float.valueOf(value)));
} catch (Exception e) {
LOGGER.error("method=getConnectWorkerMetricsByJMX||connectClusterId={}||workerId={}||metrics={}||jmx={}||msg={}",
connectClusterId, workerId, metric, jmxInfo.getJmxObjectName(), e.getClass().getName());
@@ -231,8 +229,8 @@ private List<Long> listTopNConnectClusterIdList(Long clusterPhyId, Integer topN)
.collect(Collectors.toList());
}

protected List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
private List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Long, List<MetricPointVO>>> map){
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
if (map == null || map.isEmpty()) {
// 如果为空,则直接返回
Loading

0 comments on commit 55161e4

Please sign in to comment.