Skip to content

Commit

Permalink
curvefs/client: use separate thread to call SetDiskInitUsedBytes(open…
Browse files Browse the repository at this point in the history
…curve#1842)

SetDiskInitUsedBytes may takes a long time, so use a separate thread to do this.
otherwise it will block FuseOpInit

signed-off-by: hzwuhongsong [email protected]
  • Loading branch information
wuhongsong committed Aug 25, 2022
1 parent 6180f78 commit de5102a
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 35 deletions.
2 changes: 1 addition & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void FuseClient::WarmUpTask() {
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
const_cast<char*>(pDelimiter.c_str()), &pSave);
if (nullptr == pToken) {
VLOG(3) << "warmUpTask nullptr";
VLOG(6) << "warmUpTask nullptr";
continue;
}
Dentry dentry;
Expand Down
6 changes: 3 additions & 3 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,18 +326,18 @@ class FuseClient {
pToken = strtok_r(const_cast<char*>(srcStr.c_str()),
const_cast<char*>(delimiter.c_str()), &pSave);
if (nullptr == pToken) {
VLOG(3) << "whsdel lookpath end";
VLOG(6) << "del lookpath end";
return;
}
splitPath->push_back(pToken);
while (true) {
pToken = strtok_r(NULL, const_cast<char*>(
delimiter.c_str()), &pSave);
if (nullptr == pToken) {
VLOG(3) << "whsdel lookpath end";
VLOG(6) << "del lookpath end";
break;
}
VLOG(9) << "whsdel pToken is:" << pToken
VLOG(9) << "del pToken is:" << pToken
<< "pSave:" << pSave;
splitPath->push_back(pToken);
}
Expand Down
22 changes: 16 additions & 6 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ DiskCacheManager::DiskCacheManager(std::shared_ptr<PosixWrapper> posixWrapper,
diskFsUsedRatio_ = 0;
fullRatio_ = 0;
safeRatio_ = 0;
diskUsedInit_ = false;
maxUsableSpaceBytes_ = 0;
// cannot limit the size,
// because cache is been delete must after upload to s3
Expand Down Expand Up @@ -174,6 +175,7 @@ bool DiskCacheManager::IsCached(const std::string name) {
int DiskCacheManager::UmountDiskCache() {
LOG(INFO) << "umount disk cache.";
int ret;
diskInitThread_.join();
ret = cacheWrite_->UploadAllCacheWriteFile();
if (ret < 0) {
LOG(ERROR) << "umount disk cache error.";
Expand Down Expand Up @@ -277,7 +279,7 @@ int64_t DiskCacheManager::SetDiskFsUsedRatio() {
return -1;
}
int64_t usedPercent = 100 * usedBytes / (usedBytes + availableBytes) + 1;
diskFsUsedRatio_.store(usedPercent, std::memory_order_seq_cst);
diskFsUsedRatio_.store(usedPercent);
return usedPercent;
}

Expand All @@ -297,15 +299,16 @@ void DiskCacheManager::SetDiskInitUsedBytes() {
<< "get disk used size failed.";
return;
}
usedBytes_.fetch_add(usedBytes, std::memory_order_seq_cst);
usedBytes_.fetch_add(usedBytes);
if (metric_.get() != nullptr)
metric_->diskUsedBytes.set_value(usedBytes_);
diskUsedInit_.store(true);
VLOG(9) << "cache disk used size is: " << result;
return;
}

bool DiskCacheManager::IsDiskCacheFull() {
int64_t ratio = diskFsUsedRatio_.load(std::memory_order_seq_cst);
int64_t ratio = diskFsUsedRatio_.load();
uint64_t usedBytes = GetDiskUsedbytes();
if (ratio >= fullRatio_ || usedBytes >= maxUsableSpaceBytes_) {
VLOG(6) << "disk cache is full"
Expand All @@ -325,7 +328,7 @@ bool DiskCacheManager::IsDiskCacheSafe() {
if (IsExceedFileNums()) {
return false;
}
int64_t ratio = diskFsUsedRatio_.load(std::memory_order_seq_cst);
int64_t ratio = diskFsUsedRatio_.load();
uint64_t usedBytes = GetDiskUsedbytes();
if ((usedBytes < (safeRatio_ * maxUsableSpaceBytes_ / 100))
&& (ratio < safeRatio_)) {
Expand Down Expand Up @@ -354,6 +357,10 @@ void DiskCacheManager::TrimCache() {
const std::chrono::seconds sleepSec(trimCheckIntervalSec_);
LOG(INFO) << "trim function start.";
waitIntervalSec_.Init(trimCheckIntervalSec_ * 1000);
// trim will start after get the disk size
while (!IsDiskUsedInited()) {
waitIntervalSec_.WaitForNextExcution();
}
// 1. check cache disk usage every sleepSec seconds.
// 2. if cache disk is full,
// then remove disk file until cache disk is lower than safeRatio_.
Expand Down Expand Up @@ -450,8 +457,11 @@ void DiskCacheManager::InitMetrics(const std::string &fsName) {
cacheWrite_->InitMetrics(metric_);
cacheRead_->InitMetrics(metric_);
// this function move to here from init,
// Otherwise, you can't get the original metric
SetDiskInitUsedBytes();
// Otherwise, you can't get the original metric.
// SetDiskInitUsedBytes may takes a long time,
// so use a separate thread to do this.
diskInitThread_ = curve::common::Thread(
&DiskCacheManager::SetDiskInitUsedBytes, this);
}

} // namespace client
Expand Down
12 changes: 12 additions & 0 deletions curvefs/src/client/s3/disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,16 @@ class DiskCacheManager {
* @brief: stop trim thread.
*/
int TrimStop();

void InitMetrics(const std::string &fsName);

/**
* @brief: has geted the origin used size or not.
*/
virtual bool IsDiskUsedInited() {
return diskUsedInit_.load();
}

private:
/**
* @brief add the used bytes of disk cache.
Expand Down Expand Up @@ -179,6 +187,10 @@ class DiskCacheManager {
Throttle diskCacheThrottle_;

S3ClientAdaptorOption option_;

// has geted the origin used size or not
std::atomic<bool> diskUsedInit_;
curve::common::Thread diskInitThread_;
};


Expand Down
8 changes: 5 additions & 3 deletions curvefs/src/client/s3/disk_cache_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ int DiskCacheManagerImpl::WriteDiskFile(const std::string name, const char *buf,
uint64_t length) {
VLOG(9) << "write name = " << name << ", length = " << length;
// if cache disk is full
if (diskCacheManager_->IsDiskCacheFull()) {
if (!diskCacheManager_->IsDiskUsedInited() ||
diskCacheManager_->IsDiskCacheFull()) {
VLOG(6) << "write disk file fail, disk full.";
return -1;
}
Expand All @@ -115,12 +116,13 @@ int DiskCacheManagerImpl::WriteDiskFile(const std::string name, const char *buf,

// notify async load to s3
diskCacheManager_->AsyncUploadEnqueue(name);
return writeRet;
return 0;
}

int DiskCacheManagerImpl::WriteReadDirect(const std::string fileName,
const char *buf, uint64_t length) {
if (diskCacheManager_->IsDiskCacheFull()) {
if (!diskCacheManager_->IsDiskUsedInited() ||
diskCacheManager_->IsDiskCacheFull()) {
VLOG(6) << "write disk file fail, disk full.";
return -1;
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/test/client/mock_disk_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class MockDiskCacheManager : public DiskCacheManager {
MOCK_METHOD3(WriteReadDirect,
int(const std::string fileName,
const char* buf, uint64_t length));
MOCK_METHOD0(IsDiskUsedInited,
bool());
};

class MockDiskCacheManager2 : public DiskCacheManager {
Expand Down
81 changes: 60 additions & 21 deletions curvefs/test/client/test_disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,20 @@ TEST_F(TestDiskCacheManager, TrimStop) {
}

TEST_F(TestDiskCacheManager, TrimRun_1) {
S3ClientAdaptorOption option;
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
EXPECT_CALL(*wrapper, statfs(NotNull(), NotNull()))
.WillRepeatedly(Return(-1));
int ret = diskCacheManager_->TrimRun();
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, TrimCache_2) {
Expand All @@ -298,15 +307,19 @@ TEST_F(TestDiskCacheManager, TrimCache_2) {
.WillRepeatedly(Return(buf));
EXPECT_CALL(*diskCacheRead_, GetCacheIoFullDir())
.WillRepeatedly(Return(buf));
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull()))
.Times(3)
.WillOnce(Return(0))
.WillOnce(Return(-1))
S3ClientAdaptorOption option;
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
int ret = diskCacheManager_->TrimRun();
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, TrimCache_4) {
Expand All @@ -331,10 +344,19 @@ TEST_F(TestDiskCacheManager, TrimCache_4) {
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull()))
.WillRepeatedly(Return(-1));
EXPECT_CALL(*wrapper, remove(_)).WillRepeatedly(Return(-1));
S3ClientAdaptorOption option;
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
int ret = diskCacheManager_->TrimRun();
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, TrimCache_5) {
Expand All @@ -360,17 +382,26 @@ TEST_F(TestDiskCacheManager, TrimCache_5) {
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull()))
.WillRepeatedly(Return(-1));
EXPECT_CALL(*wrapper, remove(_)).WillRepeatedly(Return(0));
S3ClientAdaptorOption option;
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull())).WillOnce(Return(-1));
EXPECT_CALL(*wrapper, mkdir(_, _)).WillOnce(Return(-1));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->Init(client_, option);
diskCacheManager_->InitMetrics("test");
diskCacheManager_->AddCache("test");
int ret = diskCacheManager_->TrimRun();
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, TrimCache_noexceed) {
S3ClientAdaptorOption option;
S3ClientAdaptorOption option;
option.diskCacheOpt.maxFileNums = 5;
option.diskCacheOpt.diskCacheType = (DiskCacheType)2;
option.diskCacheOpt.cacheDir = "/mnt/test_unit";
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
option.diskCacheOpt.fullRatio = 0;
option.diskCacheOpt.safeRatio = 0;
Expand All @@ -397,25 +428,26 @@ TEST_F(TestDiskCacheManager, TrimCache_noexceed) {
.WillOnce(Return(0))
.WillOnce(Return(-1))
.WillOnce(Return(0));

EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
int ret = diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, TrimCache_exceed) {
S3ClientAdaptorOption option;
option.diskCacheOpt.maxFileNums = 5;
option.diskCacheOpt.diskCacheType = (DiskCacheType)2;
option.diskCacheOpt.cacheDir = "/mnt/test_unit";
option.diskCacheOpt.cacheDir = "/tmp";
option.diskCacheOpt.trimCheckIntervalSec = 1;
option.diskCacheOpt.fullRatio = 0;
option.diskCacheOpt.safeRatio = 5;
option.diskCacheOpt.maxUsableSpaceBytes = 100;
option.diskCacheOpt.fullRatio = 90;
option.diskCacheOpt.safeRatio = 70;
option.diskCacheOpt.maxUsableSpaceBytes =
std::numeric_limits<uint64_t>::max();
option.diskCacheOpt.cmdTimeoutSec = 5;
option.diskCacheOpt.asyncLoadPeriodMs = 10;
option.diskCacheOpt.maxFileNums =
std::numeric_limits<uint64_t>::max();
diskCacheManager_->Init(client_, option);

std::string buf = "test";
Expand All @@ -430,16 +462,23 @@ TEST_F(TestDiskCacheManager, TrimCache_exceed) {
stat.f_bavail = 0;
EXPECT_CALL(*wrapper, statfs(NotNull(), _))
.WillRepeatedly(DoAll(SetArgPointee<1>(stat), Return(-1)));
EXPECT_CALL(*wrapper, remove(_)).WillRepeatedly(Return(0));

diskCacheManager_->AddCache("test00");
diskCacheManager_->AddCache("test01");
diskCacheManager_->AddCache("test02");
diskCacheManager_->AddCache("test03");
diskCacheManager_->AddCache("test04");
diskCacheManager_->AddCache("test05");
int ret = diskCacheManager_->TrimRun();
EXPECT_CALL(*wrapper, stat(NotNull(), NotNull()))
.Times(2)
.WillOnce(Return(-1))
.WillOnce(Return(0));
EXPECT_CALL(*diskCacheWrite_, UploadAllCacheWriteFile())
.WillOnce(Return(0));
diskCacheManager_->TrimRun();
diskCacheManager_->InitMetrics("test");
sleep(6);
diskCacheManager_->TrimStop();
diskCacheManager_->UmountDiskCache();
}

TEST_F(TestDiskCacheManager, WriteReadDirect) {
Expand Down
Loading

0 comments on commit de5102a

Please sign in to comment.