Skip to content

Commit

Permalink
cherry-pick some changes from feature-snapshot to feature
Browse files Browse the repository at this point in the history
Change-Id: I92a6df831c7a863ed324fae35382ced4fe152733
  • Loading branch information
yangyaokai committed Nov 27, 2019
1 parent df3eb4d commit a8701bc
Show file tree
Hide file tree
Showing 22 changed files with 414 additions and 119 deletions.
6 changes: 6 additions & 0 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ message ChunkServerStatisticInfo {
required uint32 writeRate = 2;
required uint32 readIOPS = 3;
required uint32 writeIOPS = 4;
// 已使用的chunk占用的磁盘空间
required uint64 chunkSizeUsedBytes = 5;
// chunkfilepool中未使用的chunk占用的磁盘空间
required uint64 chunkSizeLeftBytes = 6;
// 回收站中chunk占用的磁盘空间
required uint64 chunkSizeTrashedBytes = 7;
};

message ChunkServerHeartbeatRequest {
Expand Down
37 changes: 17 additions & 20 deletions src/chunkserver/chunkserver_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ ChunkServerMetric::ChunkServerMetric()
, readMetric_(nullptr)
, writeMetric_(nullptr)
, leaderCount_(nullptr)
, chunkLeft_(nullptr) {}
, chunkLeft_(nullptr)
, chunkTrashed_(nullptr) {}

ChunkServerMetric* ChunkServerMetric::self_ = nullptr;

Expand Down Expand Up @@ -206,8 +207,8 @@ int ChunkServerMetric::Fini() {
writeMetric_ = nullptr;
leaderCount_ = nullptr;
chunkLeft_ = nullptr;
chunkTrashed_ = nullptr;
copysetMetricMap_.clear();
configMetric_.clear();
hasInited_ = false;
return 0;
}
Expand Down Expand Up @@ -343,6 +344,16 @@ void ChunkServerMetric::MonitorChunkFilePool(ChunkfilePool* chunkfilePool) {
chunkLeftPrefix, getChunkLeftFunc, chunkfilePool);
}

void ChunkServerMetric::MonitorTrash(Trash* trash) {
if (!option_.collectMetric) {
return;
}

std::string chunkTrashedPrefix = Prefix() + "_chunk_trashed";
chunkTrashed_ = std::make_shared<bvar::PassiveStatus<uint32_t>>(
chunkTrashedPrefix, getChunkTrashedFunc, trash);
}

void ChunkServerMetric::IncreaseLeaderCount() {
if (!option_.collectMetric) {
return;
Expand All @@ -359,28 +370,14 @@ void ChunkServerMetric::DecreaseLeaderCount() {
*leaderCount_ << -1;
}

void ChunkServerMetric::UpdateConfigMetric(const common::Configuration& conf) {
void ChunkServerMetric::UpdateConfigMetric(common::Configuration* conf) {
if (!option_.collectMetric) {
return;
}

std::string prefix = Prefix() + "_config";
std::map<std::string, std::string> configs = conf.ListConfig();
for (auto& config : configs) {
std::string configKey = config.first;
std::string configValue = config.second;
auto it = configMetric_.find(configKey);
// 如果配置项不存在,则新建配置项
if (it == configMetric_.end()) {
ConfigItemPtr configItem =
std::make_shared<bvar::Status<std::string>>(prefix,
configKey,
nullptr);
configMetric_[configKey] = configItem;
}
// 更新配置项
configMetric_[configKey]->set_value(configValue);
}
std::string exposeName = Prefix() + "_config";
conf->ExposeMetric(exposeName);
conf->UpdateMetric();
}

} // namespace chunkserver
Expand Down
45 changes: 38 additions & 7 deletions src/chunkserver/chunkserver_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace chunkserver {
class CopysetNodeManager;
class ChunkfilePool;
class CSDataStore;
class Trash;

template <typename Tp>
using PassiveStatusPtr = std::shared_ptr<bvar::PassiveStatus<Tp>>;
Expand Down Expand Up @@ -160,10 +161,16 @@ class CSCopysetMetric {
}

const uint32_t GetChunkCount() const {
if (chunkCount_ == nullptr) {
return 0;
}
return chunkCount_->get_value();
}

const uint32_t GetSnapshotCount() const {
if (snapshotCount_ == nullptr) {
return 0;
}
return snapshotCount_->get_value();
}

Expand Down Expand Up @@ -202,8 +209,6 @@ struct ChunkServerMetricOptions {

using CopysetMetricPtr = std::shared_ptr<CSCopysetMetric>;
using CopysetMetricMap = std::unordered_map<GroupId, CopysetMetricPtr>;
using ConfigItemPtr = std::shared_ptr<bvar::Status<std::string>>;
using ConfigMetricMap = std::unordered_map<std::string, ConfigItemPtr>;

class ChunkServerMetric : public Uncopyable {
public:
Expand Down Expand Up @@ -302,6 +307,12 @@ class ChunkServerMetric : public Uncopyable {
*/
void MonitorChunkFilePool(ChunkfilePool* chunkfilePool);

/**
* 监视回收站
* @param trash: trash的对象指针
*/
void MonitorTrash(Trash* trash);

/**
* 增加 leader count 计数
*/
Expand All @@ -316,7 +327,7 @@ class ChunkServerMetric : public Uncopyable {
* 更新配置项数据
* @param conf: 配置内容
*/
void UpdateConfigMetric(const common::Configuration& conf);
void UpdateConfigMetric(common::Configuration* conf);

// 下列函数用户获取各项metric 指标
const IOMetricPtr GetReadMetric() const {
Expand All @@ -338,14 +349,34 @@ class ChunkServerMetric : public Uncopyable {
return leaderCount_->get_value();
}

const uint32_t GetTotalChunkCount() {
uint32_t totalChunkCount = 0;
ReadLockGuard lockGuard(rwLock_);
for (auto& iter : copysetMetricMap_) {
totalChunkCount += iter.second->GetChunkCount();
}
return totalChunkCount;
}

const uint32_t GetTotalSnapshotCount() {
uint32_t totalSnapshotCount = 0;
ReadLockGuard lockGuard(rwLock_);
for (auto& iter : copysetMetricMap_) {
totalSnapshotCount += iter.second->GetSnapshotCount();
}
return totalSnapshotCount;
}

const uint32_t GetChunkLeftCount() const {
if (chunkLeft_ == nullptr)
return 0;
return chunkLeft_->get_value();
}

const ConfigMetricMap GetConfigMetric() const {
return configMetric_;
const uint32_t GetChunkTrashedCount() const {
if (chunkTrashed_ == nullptr)
return 0;
return chunkTrashed_->get_value();
}

private:
Expand All @@ -370,10 +401,10 @@ class ChunkServerMetric : public Uncopyable {
AdderPtr<uint32_t> leaderCount_;
// chunkfilepool 中剩余的 chunk 的数量
PassiveStatusPtr<uint32_t> chunkLeft_;
// trash 中的 chunk 的数量
PassiveStatusPtr<uint32_t> chunkTrashed_;
// 各复制组metric的映射表,用GroupId作为key
CopysetMetricMap copysetMetricMap_;
// chunkserver配置的metric
ConfigMetricMap configMetric_;
// 用于单例模式的自指指针
static ChunkServerMetric* self_;
};
Expand Down
6 changes: 4 additions & 2 deletions src/chunkserver/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ butil::Status GetLeader(const LogicPoolID &logicPoolId,
"Fail to get leader of copyset node %s",
ToGroupIdString(logicPoolId, copysetId).c_str());
leaderId->reset();
for (Configuration::const_iterator
iter = conf.begin(); iter != conf.end(); ++iter) {
Configuration::const_iterator iter = conf.begin();
for (; iter != conf.end(); ++iter) {
brpc::Channel channel;
if (channel.Init(iter->addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
Expand Down Expand Up @@ -60,6 +60,8 @@ butil::Status GetLeader(const LogicPoolID &logicPoolId,
if (leaderId->is_empty()) {
return st;
}
LOG(INFO) << "Get leader from " << iter->to_string().c_str()
<< " success, leader is " << *leaderId;
return butil::Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ bool CopysetNodeManager::DeleteCopysetNode(const LogicPoolID &logicPoolId,
if (copysetNodeMap_.end() != it) {
copysetNodeMap_.erase(it);
ret = true;
LOG(INFO) << "Delete copyset success, groupid: " << groupId;
}
}

Expand Down Expand Up @@ -290,6 +291,7 @@ bool CopysetNodeManager::PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
<< ", " << copysetId << "> persistently";
ret = false;
}
LOG(INFO) << "Move copyset to trash success, groupid: " << groupId;
copysetNodeMap_.erase(it);
ret = true;
}
Expand Down
18 changes: 13 additions & 5 deletions src/chunkserver/heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,27 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) {
diskState->set_errmsg("");
req->set_allocated_diskstate(diskState);

ChunkServerMetric* metric = ChunkServerMetric::GetInstance();
curve::mds::heartbeat::ChunkServerStatisticInfo* stats =
new curve::mds::heartbeat::ChunkServerStatisticInfo();
IOMetricPtr readMetric =
ChunkServerMetric::GetInstance()->GetReadMetric();
IOMetricPtr writeMetric =
ChunkServerMetric::GetInstance()->GetWriteMetric();
IOMetricPtr readMetric = metric->GetReadMetric();
IOMetricPtr writeMetric = metric->GetWriteMetric();
if (readMetric != nullptr && writeMetric != nullptr) {
stats->set_readrate(readMetric->bps_.get_value(1));
stats->set_writerate(writeMetric->bps_.get_value(1));
stats->set_readiops(readMetric->iops_.get_value(1));
stats->set_writeiops(writeMetric->iops_.get_value(1));
req->set_allocated_stats(stats);
}
CopysetNodeOptions opt = copysetMan_->GetCopysetNodeOptions();
uint64_t chunkFileSize = opt.maxChunkSize;
uint64_t usedChunkSize = metric->GetTotalSnapshotCount() * chunkFileSize
+ metric->GetTotalChunkCount() * chunkFileSize;
uint64_t trashedChunkSize = metric->GetChunkTrashedCount() * chunkFileSize;
uint64_t leftChunkSize = metric->GetChunkLeftCount() * chunkFileSize;
stats->set_chunksizeusedbytes(usedChunkSize);
stats->set_chunksizeleftbytes(leftChunkSize);
stats->set_chunksizetrashedbytes(trashedChunkSize);
req->set_allocated_stats(stats);

size_t cap, avail;
ret = GetFileSystemSpaces(&cap, &avail);
Expand Down
9 changes: 9 additions & 0 deletions src/chunkserver/passive_getfn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,14 @@ uint32_t getDatastoreSnapshotCountFunc(void* arg) {
return snapshotCount;
}

uint32_t getChunkTrashedFunc(void* arg) {
Trash* trash = reinterpret_cast<Trash*>(arg);
uint32_t chunkTrashed = 0;
if (trash != nullptr) {
chunkTrashed = trash->GetChunkNum();
}
return chunkTrashed;
}

} // namespace chunkserver
} // namespace curve
6 changes: 6 additions & 0 deletions src/chunkserver/passive_getfn.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#ifndef SRC_CHUNKSERVER_PASSIVE_GETFN_H_
#define SRC_CHUNKSERVER_PASSIVE_GETFN_H_

#include "src/chunkserver/trash.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/datastore/chunkfile_pool.h"

Expand All @@ -29,6 +30,11 @@ namespace chunkserver {
* @param arg: datastore的对象指针
*/
uint32_t getDatastoreSnapshotCountFunc(void* arg);
/**
* 获取trash中chunk的数量
* @param arg: trash的对象指针
*/
uint32_t getChunkTrashedFunc(void* arg);

} // namespace chunkserver
} // namespace curve
Expand Down
1 change: 1 addition & 0 deletions src/chunkserver/raftsnapshot_filesystem_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ RaftSnapshotFilesystemAdaptor::RaftSnapshotFilesystemAdaptor(
RaftSnapshotFilesystemAdaptor::~RaftSnapshotFilesystemAdaptor() {
delete[] tempMetaPageContent;
tempMetaPageContent = nullptr;
LOG(INFO) << "release raftsnapshot filesystem adaptor!";
}

braft::FileAdaptor* RaftSnapshotFilesystemAdaptor::open(const std::string& path,
Expand Down
49 changes: 44 additions & 5 deletions src/chunkserver/trash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ int Trash::Init(TrashOptions options) {
scanPeriodSec_ = options.scanPeriodSec;
localFileSystem_ = options.localFileSystem;
chunkfilePool_ = options.chunkfilePool;
chunkNum_.store(0);

// 读取trash目录下的所有目录
std::vector<std::string> files;
localFileSystem_->List(trashPath_, &files);

// 遍历trash下的文件
for (auto &file : files) {
// 如果不是copyset目录,跳过
if (!IsCopysetInTrash(file)) {
continue;
}
std::string copysetDir = trashPath_ + "/" + file;
uint32_t chunkNum = CountChunkNumInCopyset(copysetDir);
chunkNum_.fetch_add(chunkNum);
}

return 0;
}
Expand Down Expand Up @@ -92,7 +108,8 @@ int Trash::RecycleCopySet(const std::string &dirPath) {
LOG(ERROR) << "rename " << dirPath << " to " << dst << " error";
return -1;
}

uint32_t chunkNum = CountChunkNumInCopyset(dst);
chunkNum_.fetch_add(chunkNum);
return 0;
}

Expand Down Expand Up @@ -127,7 +144,7 @@ void Trash::DeleteEligibleFileInTrash() {
// 遍历trash下的文件
for (auto &file : files) {
// 如果不是copyset目录,跳过
if (!IsCopySetDir(file)) {
if (!IsCopysetInTrash(file)) {
continue;
}

Expand All @@ -137,12 +154,11 @@ void Trash::DeleteEligibleFileInTrash() {
}

// 清理copyset目录
std::string copysetPath = trashPath_ + "/" + file;
CleanCopySet(copysetPath);
CleanCopySet(copysetDir);
}
}

bool Trash::IsCopySetDir(const std::string &dirName) {
bool Trash::IsCopysetInTrash(const std::string &dirName) {
// 合法的copyset目录: 高32位PoolId(>0)组成, 低32位由copysetId(>0)组成
// 目录是十进制形式
// 例如:2860448220024 (poolId: 666, copysetId: 888)
Expand Down Expand Up @@ -250,8 +266,31 @@ void Trash::RecycleChunks(const std::string &dataPath) {
if (0 != chunkfilePool_->RecycleChunk(chunkPath)) {
LOG(ERROR) << "Trash failed recycle chunk " << chunkPath
<< " to chunkfilePool";
continue;
}

chunkNum_.fetch_sub(1);
}
}

uint32_t Trash::CountChunkNumInCopyset(const std::string &copysetPath) {
std::string dataPath = copysetPath + "/" + RAFT_DATA_DIR;
std::vector<std::string> chunks;
localFileSystem_->List(dataPath, &chunks);

uint32_t chunkNum = 0;
// 遍历data下面的chunk
for (auto &chunk : chunks) {
// 不是chunkfile或者snapshotfile
if (!IsChunkOrSnapShotFile(chunk)) {
LOG(WARNING) << "Trash find a illegal file:"
<< chunk << " in " << dataPath
<< ", filename: " << chunk;
continue;
}
++chunkNum;
}
return chunkNum;
}

} // namespace chunkserver
Expand Down
Loading

0 comments on commit a8701bc

Please sign in to comment.