Skip to content

Commit

Permalink
make io smooth
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhongsong authored and ilixiaocui committed Nov 24, 2021
1 parent f6fb441 commit 9271fe4
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 10 deletions.
4 changes: 3 additions & 1 deletion curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ volume.fsBlockSize=4096
#### s3
s3.blocksize=1048576
s3.chunksize=4194304
# the max size that fuse send
s3.fuseMaxSize=131072
# prefetch blocks that disk cache use
s3.prefetchBlocks=4
# prefetch threads
Expand All @@ -92,7 +94,7 @@ s3.baseSleepUs=500

# TODO(huyao): use more meaningfull name
# background thread schedule time
s3.intervalSec=3
s3.intervalSec=0
# data cache flush wait time
s3.flushIntervalSec=5
s3.writeCacheMaxByte=838860800
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) {
&s3Opt->s3ClientAdaptorOpt.blockSize);
conf->GetValueFatalIfFail("s3.chunksize",
&s3Opt->s3ClientAdaptorOpt.chunkSize);
conf->GetValueFatalIfFail("s3.fuseMaxSize",
&s3Opt->s3ClientAdaptorOpt.fuseMaxSize);
conf->GetValueFatalIfFail("s3.prefetchBlocks",
&s3Opt->s3ClientAdaptorOpt.prefetchBlocks);
conf->GetValueFatalIfFail("s3.prefetchExecQueueNum",
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct DiskCacheOption {
struct S3ClientAdaptorOption {
uint64_t blockSize;
uint64_t chunkSize;
uint32_t fuseMaxSize;
uint32_t prefetchBlocks;
uint32_t prefetchExecQueueNum;
uint32_t intervalSec;
Expand Down
27 changes: 20 additions & 7 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ CURVEFS_ERROR
S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
std::shared_ptr<InodeCacheManager> inodeManager,
std::shared_ptr<MdsClient> mdsClient) {
pendingReq_ = 0;
blockSize_ = option.blockSize;
chunkSize_ = option.chunkSize;
if (chunkSize_ % blockSize_ != 0) {
Expand All @@ -43,6 +44,7 @@ S3ClientAdaptorImpl::Init(const S3ClientAdaptorOption &option, S3Client *client,
<< blockSize_;
return CURVEFS_ERROR::INVALIDPARAM;
}
fuseMaxSize_ = option.fuseMaxSize;
prefetchBlocks_ = option.prefetchBlocks;
prefetchExecQueueNum_ = option.prefetchExecQueueNum;
diskCacheType_ = option.diskCacheOpt.diskCacheType;
Expand Down Expand Up @@ -105,9 +107,17 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,

FileCacheManagerPtr fileCacheManager =
fsCacheManager_->FindOrCreateFileCacheManager(fsId_, inodeId);
if (fsCacheManager_->WriteCacheIsFull()) {
LOG(INFO) << "write cache is full, wait flush";
fsCacheManager_->WaitFlush();
{
std::lock_guard<std::mutex> lockguard(ioMtx_);
pendingReq_.fetch_add(1, std::memory_order_seq_cst);
VLOG(6) << "pendingReq_ is: " << pendingReq_;
uint64_t pendingReq = pendingReq_.load(std::memory_order_seq_cst);
fsCacheManager_->DataCacheByteInc(length);
if ((fsCacheManager_->GetDataCacheSize() + pendingReq*fuseMaxSize_)
>= fsCacheManager_->GetDataCacheMaxSize()) {
LOG(INFO) << "write cache is full, wait flush";
fsCacheManager_->WaitFlush();
}
}
uint64_t memCacheRatio = fsCacheManager_->MemCacheRatio();
int64_t exceedRatio = memCacheRatio - memCacheNearfullRatio_;
Expand All @@ -123,7 +133,10 @@ int S3ClientAdaptorImpl::Write(uint64_t inodeId, uint64_t offset,
}
}
int ret = fileCacheManager->Write(offset, length, buf);
VLOG(6) << "write end inodeId:" << inodeId << ",ret:" << ret;
pendingReq_.fetch_sub(1, std::memory_order_seq_cst);
fsCacheManager_->DataCacheByteDec(length);
VLOG(6) << "write end inodeId:" << inodeId << ",ret:" << ret
<< ", pendingReq_ is: " << pendingReq_;
return ret;
}

Expand Down Expand Up @@ -240,18 +253,18 @@ void S3ClientAdaptorImpl::BackGroundFlush() {
}
}
if (fsCacheManager_->MemCacheRatio() > memCacheNearfullRatio_) {
LOG(INFO) << "BackGroundFlush radically, write cache num is: "
VLOG(3) << "BackGroundFlush radically, write cache num is: "
<< fsCacheManager_->GetDataCacheNum()
<< "cache ratio is: " << fsCacheManager_->MemCacheRatio();
fsCacheManager_->FsSync(true);

} else {
waitIntervalSec_.WaitForNextExcution();
VLOG(3) << "BackGroundFlush, write cache num is:"
VLOG(6) << "BackGroundFlush, write cache num is:"
<< fsCacheManager_->GetDataCacheNum()
<< "cache ratio is: " << fsCacheManager_->MemCacheRatio();
fsCacheManager_->FsSync(false);
VLOG(3) << "background fssync end";
VLOG(6) << "background fssync end";
}
}
return;
Expand Down
3 changes: 3 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
S3Client *client_;
uint64_t blockSize_;
uint64_t chunkSize_;
uint32_t fuseMaxSize_;
uint32_t prefetchBlocks_;
uint32_t prefetchExecQueueNum_;
std::string allocateServerEps_;
Expand All @@ -189,12 +190,14 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
Thread bgFlushThread_;
std::atomic<bool> toStop_;
std::mutex mtx_;
std::mutex ioMtx_;
std::condition_variable cond_;
curve::common::WaitInterval waitIntervalSec_;
std::shared_ptr<FsCacheManager> fsCacheManager_;
std::shared_ptr<InodeCacheManager> inodeManager_;
std::shared_ptr<DiskCacheManagerImpl> diskCacheManagerImpl_;
DiskCacheType diskCacheType_;
std::atomic<uint64_t> pendingReq_;
std::shared_ptr<MdsClient> mdsClient_;
uint32_t fsId_;
std::vector<bthread::ExecutionQueueId<AsyncDownloadTask>>
Expand Down
8 changes: 8 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,14 @@ class FsCacheManager {
return wDataCacheNum_.load(std::memory_order_relaxed);
}

uint64_t GetDataCacheSize() {
return wDataCacheByte_.load(std::memory_order_relaxed);
}

uint64_t GetDataCacheMaxSize() {
return writeCacheMaxByte_;
}

void DataCacheNumInc() {
VLOG(9) << "DataCacheNumInc() v: 1,wDataCacheNum:"
<< wDataCacheNum_.load(std::memory_order_relaxed);
Expand Down
6 changes: 4 additions & 2 deletions curvefs/test/client/client_s3_adaptor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "curvefs/test/client/mock_mds_client.h"
#include "curvefs/test/client/mock_metaserver_service.h"
#include "curvefs/test/client/mock_spacealloc_service.h"
#include "src/common/curve_define.h"

namespace curvefs {
namespace client {
Expand All @@ -41,6 +42,7 @@ using ::testing::Return;
using ::testing::SetArgPointee;
using ::testing::SetArgReferee;
using ::testing::WithArg;
using ::curve::common::kMB;

using rpcclient::MockMdsClient;

Expand Down Expand Up @@ -101,7 +103,7 @@ class ClientS3AdaptorTest : public testing::Test {
option.intervalSec = 5000;
option.flushIntervalSec = 5000;
option.readCacheMaxByte = 104857600;
option.writeCacheMaxByte = 104857600;
option.writeCacheMaxByte = 10485760000;
option.diskCacheOpt.diskCacheType = (DiskCacheType)0;
// auto metaClient = std::make_shared<MetaServerClientImpl>();
std::shared_ptr<MockInodeCacheManager> mockInodeManager(
Expand Down Expand Up @@ -176,7 +178,7 @@ TEST_F(ClientS3AdaptorTest, test_first_write_2) {
S3ClientAdaptorImpl* s3ClientAdaptor2;
S3ClientAdaptorOption option2;
option2.nearfullRatio = 100;
option2.writeCacheMaxByte = 104857600;
option2.writeCacheMaxByte = 8000*kMB;
option2.blockSize = 1 * 1024 * 1024;
option2.chunkSize = 4 * 1024 * 1024;
s3ClientAdaptor2 = new S3ClientAdaptorImpl();
Expand Down

0 comments on commit 9271fe4

Please sign in to comment.