Skip to content

Commit

Permalink
Merge pull request didi#341 from didi/dev
Browse files Browse the repository at this point in the history
Topic基本信息中增加retention.bytes信息
  • Loading branch information
ZQKC authored Jul 2, 2021
2 parents a2228d0 + 0f74691 commit 1cd524c
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class TopicCreationConstant {

public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";

public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";

public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;

public static Properties createNewProperties(Long retentionTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TopicBasicDTO {

private Long retentionTime;

private Long retentionBytes;

public Long getClusterId() {
return clusterId;
}
Expand Down Expand Up @@ -157,6 +159,14 @@ public void setRetentionTime(Long retentionTime) {
this.retentionTime = retentionTime;
}

public Long getRetentionBytes() {
return retentionBytes;
}

public void setRetentionBytes(Long retentionBytes) {
this.retentionBytes = retentionBytes;
}

@Override
public String toString() {
return "TopicBasicDTO{" +
Expand All @@ -166,7 +176,7 @@ public String toString() {
", principals='" + principals + '\'' +
", topicName='" + topicName + '\'' +
", description='" + description + '\'' +
", regionNameList='" + regionNameList + '\'' +
", regionNameList=" + regionNameList +
", score=" + score +
", topicCodeC='" + topicCodeC + '\'' +
", partitionNum=" + partitionNum +
Expand All @@ -175,6 +185,7 @@ public String toString() {
", modifyTime=" + modifyTime +
", createTime=" + createTime +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty(value = "存储时间(ms)")
private Long retentionTime;

@ApiModelProperty(value = "单分区数据保存大小(Byte)")
private Long retentionBytes;

@ApiModelProperty(value = "创建时间")
private Long createTime;

Expand Down Expand Up @@ -62,12 +65,20 @@ public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}

public String getTopicCodeC() {
return topicCodeC;
public String getAppId() {
return appId;
}

public void setTopicCodeC(String topicCodeC) {
this.topicCodeC = topicCodeC;
public void setAppId(String appId) {
this.appId = appId;
}

public String getAppName() {
return appName;
}

public void setAppName(String appName) {
this.appName = appName;
}

public Integer getPartitionNum() {
Expand All @@ -86,76 +97,76 @@ public void setReplicaNum(Integer replicaNum) {
this.replicaNum = replicaNum;
}

public Long getModifyTime() {
return modifyTime;
public String getPrincipals() {
return principals;
}

public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
public void setPrincipals(String principals) {
this.principals = principals;
}

public Long getCreateTime() {
return createTime;
public Long getRetentionTime() {
return retentionTime;
}

public void setCreateTime(Long createTime) {
this.createTime = createTime;
public void setRetentionTime(Long retentionTime) {
this.retentionTime = retentionTime;
}

public String getPrincipals() {
return principals;
public Long getRetentionBytes() {
return retentionBytes;
}

public void setPrincipals(String principals) {
this.principals = principals;
public void setRetentionBytes(Long retentionBytes) {
this.retentionBytes = retentionBytes;
}

public String getDescription() {
return description;
public Long getCreateTime() {
return createTime;
}

public void setDescription(String description) {
this.description = description;
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}

public void setAppId(String appId) {
this.appId = appId;
public Long getModifyTime() {
return modifyTime;
}

public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
}

public String getAppId() {
return appId;
public Integer getScore() {
return score;
}

public String getBootstrapServers() {
return bootstrapServers;
public void setScore(Integer score) {
this.score = score;
}

public Long getRetentionTime() {
return retentionTime;
public String getTopicCodeC() {
return topicCodeC;
}

public void setRetentionTime(Long retentionTime) {
this.retentionTime = retentionTime;
public void setTopicCodeC(String topicCodeC) {
this.topicCodeC = topicCodeC;
}

public String getAppName() {
return appName;
public String getDescription() {
return description;
}

public void setAppName(String appName) {
this.appName = appName;
public void setDescription(String description) {
this.description = description;
}

public Integer getScore() {
return score;
public String getBootstrapServers() {
return bootstrapServers;
}

public void setScore(Integer score) {
this.score = score;
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public List<String> getRegionNameList() {
Expand All @@ -176,6 +187,7 @@ public String toString() {
", replicaNum=" + replicaNum +
", principals='" + principals + '\'' +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
", score=" + score +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
Expand Down Expand Up @@ -56,7 +58,7 @@ public class PhysicalClusterMetadataManager {

private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();

private final static Map<Long, Map<String, Long>> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();

private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -95,7 +97,7 @@ public synchronized void addNew(ClusterDO clusterDO) {

// 初始化topic-map
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());

// 初始化cluster-map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
Expand Down Expand Up @@ -158,7 +160,7 @@ public void remove(Long clusterId) {
KAFKA_VERSION_MAP.remove(clusterId);

TOPIC_METADATA_MAP.remove(clusterId);
TOPIC_RETENTION_TIME_MAP.remove(clusterId);
TOPIC_PROPERTIES_MAP.remove(clusterId);
CLUSTER_MAP.remove(clusterId);
}

Expand Down Expand Up @@ -262,24 +264,45 @@ public static boolean isTopicExistStrictly(Long clusterId, String topicName) {

//---------------------------配置相关元信息--------------

public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
if (timeMap == null) {
public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
return;
}
timeMap.put(topicName, retentionTime);

Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return;
}
propertiesMap.put(topicName, properties);
}

public static Long getTopicRetentionTime(Long clusterId, String topicName) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
if (timeMap == null) {
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return null;
}

Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
}
return timeMap.get(topicName);

return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
}

public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return null;
}

Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
}

return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
}

//---------------------------Broker元信息相关--------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public TopicBasicDTO getTopicBasicDTO(Long clusterId, String topicName) {
basicDTO.setCreateTime(topicMetadata.getCreateTime());
basicDTO.setModifyTime(topicMetadata.getModifyTime());
basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));

TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
if (!ValidateUtils.isNull(topicDO)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Properties;

/**
* @author zengqiao
* @date 20/7/23
*/
@Component
public class FlushTopicRetentionTime {
public class FlushTopicProperties {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);

@Autowired
Expand All @@ -33,30 +34,28 @@ public void flush() {
try {
flush(clusterDO);
} catch (Exception e) {
LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
}
}
}

private void flush(ClusterDO clusterDO) {
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
if (ValidateUtils.isNull(zkConfig)) {
LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
return;
}

for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
if (retentionTime == null) {
LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName);
Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
if (ValidateUtils.isNull(properties)) {
LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
continue;
}
PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
} catch (Exception e) {
LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName, e);
LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation(value = "Topic基本信息", notes = "")
@RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
@ResponseBody
public Result<TopicBasicVO> getTopicBasic(
@PathVariable Long clusterId,
@PathVariable String topicName,
@RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
public Result<TopicBasicVO> getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static TopicBasicVO convert2TopicBasicVO(TopicBasicDTO dto, ClusterDO clu
vo.setReplicaNum(dto.getReplicaNum());
vo.setPrincipals(dto.getPrincipals());
vo.setRetentionTime(dto.getRetentionTime());
vo.setRetentionBytes(dto.getRetentionBytes());
vo.setCreateTime(dto.getCreateTime());
vo.setModifyTime(dto.getModifyTime());
vo.setScore(dto.getScore());
Expand Down

0 comments on commit 1cd524c

Please sign in to comment.