Skip to content

Commit

Permalink
curvefs/client: fix cto bugs
Browse files Browse the repository at this point in the history
Signed-off-by: ilixiaocui <[email protected]>

related to issue: 1213/1582/1413
1. All mount points on the same file system are either enableCto, or they do not guarantee consistency.
2. If enableCto, attr and dentry cache will auto set invalid.
3. Consistency of multi-mount nodes under delete operation is not guaranteed. If one mount point deletes a file,
the behavior of the other mount point for all operations on that file is unknown.
  • Loading branch information
ilixiaocui committed Aug 4, 2022
1 parent 3084c8e commit f09fdf9
Show file tree
Hide file tree
Showing 23 changed files with 394 additions and 269 deletions.
2 changes: 2 additions & 0 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum FSStatusCode {
LOCK_TIMEOUT = 29;
COMMIT_TX_SEQUENCE_MISMATCH = 30;
VOLUME_INFO_ERROR = 31;
MOUNT_POINT_CONFLICT = 32;
}

// fs interface
Expand All @@ -77,6 +78,7 @@ message Mountpoint {
required string hostname = 1;
required uint32 port = 2;
required string path = 3;
optional bool cto = 4;
}

message FsInfo {
Expand Down
8 changes: 7 additions & 1 deletion curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,25 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
&clientOption->enableICacheMetrics);
conf->GetValueFatalIfFail("fuseClient.enableDCacheMetrics",
&clientOption->enableDCacheMetrics);
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);
conf->GetValueFatalIfFail("client.dummyserver.startport",
&clientOption->dummyServerStartPort);
conf->GetValueFatalIfFail("fuseClient.enableMultiMountPointRename",
&clientOption->enableMultiMountPointRename);
conf->GetValueFatalIfFail("fuseClient.disableXattr",
&clientOption->disableXattr);
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);

LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
&clientOption->enableFuseSplice))
<< "Not found `fuseClient.enableSplice` in conf, use default value `"
<< std::boolalpha << clientOption->enableFuseSplice << '`';

// if enableCto, attr and entry cache must invalid
if (FLAGS_enableCto) {
clientOption->attrTimeOut = 0;
clientOption->entryTimeOut = 0;
}

SetBrpcOpt(conf);
}

Expand Down
52 changes: 14 additions & 38 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,15 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
: mOpts->mountPoint);
std::string fsName = (mOpts->fsName == nullptr) ? "" : mOpts->fsName;

mountpoint_.set_cto(FLAGS_enableCto);

int retVal = SetHostPortInMountPoint(&mountpoint_);
if (retVal < 0) {
LOG(ERROR) << "Set Host and Port in MountPoint failed, ret = "
<< retVal;
return CURVEFS_ERROR::INTERNAL;
}

auto find = std::find_if(fsInfo_->mountpoints().begin(),
fsInfo_->mountpoints().end(),
[this](const Mountpoint& mp) {
return mp.path() == mountpoint_.path() &&
mp.hostname() == mountpoint_.hostname();
});
if (find != fsInfo_->mountpoints().end()) {
LOG(ERROR) << "MountFs found mountPoint exist";
return CURVEFS_ERROR::MOUNT_POINT_EXIST;
}

auto ret = mdsClient_->MountFs(fsName, mountpoint_, fsInfo_.get());
if (ret != FSStatusCode::OK && ret != FSStatusCode::MOUNT_POINT_EXIST) {
LOG(ERROR) << "MountFs failed, FSStatusCode = " << ret
Expand Down Expand Up @@ -329,12 +320,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
}

::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();

ret = inodeWrapper->Open();
if (ret != CURVEFS_ERROR::OK) {
return ret;
}

if (fi->flags & O_TRUNC) {
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
Expand Down Expand Up @@ -777,8 +762,18 @@ CURVEFS_ERROR FuseClient::FuseOpGetAttr(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi,
struct stat *attr) {
VLOG(1) << "FuseOpGetAttr ino = " << ino;
if (FLAGS_enableCto) {
CURVEFS_ERROR ret = inodeManager_->RefreshInode(ino);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = " << ret
<< ", inodeid = " << ino;
return ret;
}
}

InodeAttr inodeAttr;
CURVEFS_ERROR ret = inodeManager_->GetInodeAttr(ino, &inodeAttr);
CURVEFS_ERROR ret =
inodeManager_->GetInodeAttr(ino, &inodeAttr);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inodeAttr fail, ret = " << ret
<< ", inodeid = " << ino;
Expand Down Expand Up @@ -1175,26 +1170,7 @@ CURVEFS_ERROR FuseClient::FuseOpReadLink(fuse_req_t req, fuse_ino_t ino,
CURVEFS_ERROR FuseClient::FuseOpRelease(fuse_req_t req, fuse_ino_t ino,
struct fuse_file_info *fi) {
VLOG(1) << "FuseOpRelease, ino: " << ino;
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;
std::shared_ptr<InodeWrapper> inodeWrapper;
ret = inodeManager_->GetInode(ino, inodeWrapper);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager get inode fail, ret = " << ret
<< ", ino: " << ino;
return ret;
}

::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();

ret = inodeWrapper->Release();
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "inodeManager release inode fail, ret = " << ret
<< ", ino: " << ino;
return ret;
}

VLOG(1) << "FuseOpRelease, ino: " << ino << " success";
return ret;
return CURVEFS_ERROR::OK;
}

void FuseClient::FlushInode() { inodeManager_->FlushInodeOnce(); }
Expand Down
178 changes: 111 additions & 67 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,107 +107,120 @@ class TrimICacheAsyncDone : public MetaServerClientDone {
std::shared_ptr<InodeCacheManagerImpl> inodeCacheManager_;
};

CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) {
#define GET_INODE_REMOTE(FSID, INODEID, OUT, STREAMING) \
MetaStatusCode ret = metaClient_->GetInode(FSID, INODEID, OUT, STREAMING); \
if (ret != MetaStatusCode::OK) { \
LOG_IF(ERROR, ret != MetaStatusCode::NOT_FOUND) \
<< "metaClient_ GetInode failed, MetaStatusCode = " << ret \
<< ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret) \
<< ", inodeid = " << INODEID; \
return MetaStatusCodeToCurvefsErrCode(ret); \
}

#define PUT_INODE_CACHE(INODEID, INODEWRAPPER) \
std::shared_ptr<InodeWrapper> eliminatedOne; \
bool eliminated = iCache_->Put(INODEID, INODEWRAPPER, &eliminatedOne); \
if (eliminated) { \
VLOG(3) << "GetInode eliminate one inode, ino: " \
<< eliminatedOne->GetInodeId() \
<< ", iCache does not evict inodes via put interface "; \
assert(0); \
}

#define REFRESH_DATA_REMOTE(OUT, STREAMING) \
CURVEFS_ERROR rc = RefreshData(OUT, STREAMING); \
if (rc != CURVEFS_ERROR::OK) { \
return rc; \
}

CURVEFS_ERROR
InodeCacheManagerImpl::GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));
// get inode from cache
bool ok = iCache_->Get(inodeId, &out);
if (ok) {
// if enableCto, we need and is unopen, we need reload from metaserver
if (curvefs::client::common::FLAGS_enableCto && !out->IsOpen()) {
VLOG(6) << "InodeCacheManagerImpl, GetInode: enableCto and inode: "
<< inodeId << " opencount is 0";
iCache_->Remove(inodeId);
} else {
curve::common::UniqueLock lgGuard = out->GetUniqueLock();
if (out->NeedRefreshData()) {
CURVEFS_ERROR rc = out->RefreshS3ChunkInfo();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshS3ChunkInfo failed, retCode = " << rc;
return rc;
}
}
return CURVEFS_ERROR::OK;
}
curve::common::UniqueLock lgGuard = out->GetUniqueLock();
REFRESH_DATA_REMOTE(out, out->NeedRefreshData());
return CURVEFS_ERROR::OK;
}

// get inode from metaserver
Inode inode;
bool streaming;
bool streaming = false;
GET_INODE_REMOTE(fsId_, inodeId, &inode, &streaming);
out = std::make_shared<InodeWrapper>(
std::move(inode), metaClient_, s3ChunkInfoMetric_, option_.maxDataSize,
option_.refreshDataIntervalSec);

MetaStatusCode ret2 = metaClient_->GetInode(
fsId_, inodeId, &inode, &streaming);
// refresh data
REFRESH_DATA_REMOTE(out, streaming);

if (ret2 != MetaStatusCode::OK) {
LOG_IF(ERROR, ret2 != MetaStatusCode::NOT_FOUND)
<< "metaClient_ GetInode failed, MetaStatusCode = " << ret2
<< ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret2)
<< ", inodeid = " << inodeId;
return MetaStatusCodeToCurvefsErrCode(ret2);
}
// put to cache
PUT_INODE_CACHE(inodeId, out);

auto type = inode.type();
out = std::make_shared<InodeWrapper>(std::move(inode), metaClient_,
s3ChunkInfoMetric_, option_.maxDataSize,
option_.refreshDataIntervalSec);
return CURVEFS_ERROR::OK;
}

// NOTE: if the s3chunkinfo inside inode is too large,
// we should invoke RefreshS3ChunkInfo() to receive s3chunkinfo
// by streaming and padding its into inode.
if (type == FsFileType::TYPE_S3 && streaming) {
CURVEFS_ERROR rc = out->RefreshS3ChunkInfo();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc;
return rc;
}
} else if (type == FsFileType::TYPE_FILE) {
auto rc = out->RefreshVolumeExtent();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshVolumeExtent failed, error: " << rc;
return rc;
}
CURVEFS_ERROR
InodeCacheManagerImpl::RefreshInode(uint64_t inodeId) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));

// get inode from metaserver
Inode inode;
bool streaming = false;
GET_INODE_REMOTE(fsId_, inodeId, &inode, &streaming);

// get inode from cache
std::shared_ptr<InodeWrapper> out;
bool ok = iCache_->Get(inodeId, &out);
curve::common::UniqueLock lgGuard;
if (!ok) {
out = std::make_shared<InodeWrapper>(
std::move(inode), metaClient_, s3ChunkInfoMetric_,
option_.maxDataSize, option_.refreshDataIntervalSec);
} else {
lgGuard = out->GetUniqueLock();
}

std::shared_ptr<InodeWrapper> eliminatedOne;
bool eliminated = iCache_->Put(inodeId, out, &eliminatedOne);
if (eliminated) {
VLOG(3) << "GetInode eliminate one inode, ino: "
<< eliminatedOne->GetInodeId();
/* iCache does not evict inodes via put interface */
assert(0);
// refresh data
REFRESH_DATA_REMOTE(out, streaming);

// put to cache or refresh length
if (!ok) {
PUT_INODE_CACHE(inodeId, out);
} else {
out->SetLength(inode.length());
}

return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR InodeCacheManagerImpl::GetInodeAttr(uint64_t inodeId,
InodeAttr *out) {
InodeAttr *out) {
NameLockGuard lock(nameLock_, std::to_string(inodeId));
// 1. find in icache
std::shared_ptr<InodeWrapper> inodeWrapper;
bool ok = iCache_->Get(inodeId, &inodeWrapper);
if (ok) {
if (curvefs::client::common::FLAGS_enableCto &&
!inodeWrapper->IsOpen()) {
iCache_->Remove(inodeId);
} else {
inodeWrapper->GetInodeAttrLocked(out);
return CURVEFS_ERROR::OK;
}
inodeWrapper->GetInodeAttrLocked(out);
return CURVEFS_ERROR::OK;
}

// 2. get form metaserver
std::set<uint64_t> inodeIds;
std::list<InodeAttr> attrs;
inodeIds.emplace(inodeId);
MetaStatusCode ret = metaClient_->BatchGetInodeAttr(
fsId_, inodeIds, &attrs);
MetaStatusCode ret =
metaClient_->BatchGetInodeAttr(fsId_, inodeIds, &attrs);
if (MetaStatusCode::OK != ret) {
LOG(ERROR) << "metaClient BatchGetInodeAttr failed"
<< ", inodeId = " << inodeId
<< ", MetaStatusCode = " << ret
<< ", inodeId = " << inodeId << ", MetaStatusCode = " << ret
<< ", MetaStatusCode_Name = " << MetaStatusCode_Name(ret);
return MetaStatusCodeToCurvefsErrCode(ret);
}

if (attrs.size() != 1) {
if (attrs.size() != 1) {
LOG(ERROR) << "metaClient BatchGetInodeAttr error,"
<< " getSize is 1, inodeId = " << inodeId
<< "but real size = " << attrs.size();
Expand Down Expand Up @@ -468,5 +481,36 @@ void InodeCacheManagerImpl::TrimIcache(uint64_t trimSize) {
}
}

CURVEFS_ERROR
InodeCacheManagerImpl::RefreshData(std::shared_ptr<InodeWrapper> &inode,
bool streaming) {
auto type = inode->GetType();
CURVEFS_ERROR rc = CURVEFS_ERROR::OK;

switch (type) {
case FsFileType::TYPE_S3:
if (streaming) {
// NOTE: if the s3chunkinfo inside inode is too large,
// we should invoke RefreshS3ChunkInfo() to receive s3chunkinfo
// by streaming and padding its into inode.
rc = inode->RefreshS3ChunkInfo();
LOG_IF(ERROR, rc != CURVEFS_ERROR::OK)
<< "RefreshS3ChunkInfo() failed, retCode = " << rc;
}
break;

case FsFileType::TYPE_FILE:
rc = inode->RefreshVolumeExtent();
LOG_IF(ERROR, rc != CURVEFS_ERROR::OK)
<< "RefreshVolumeExtent failed, error: " << rc;
break;

default:
rc = CURVEFS_ERROR::OK;
}

return rc;
}

} // namespace client
} // namespace curvefs
13 changes: 10 additions & 3 deletions curvefs/src/client/inode_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ class InodeCacheManager {

virtual void Stop() = 0;

virtual CURVEFS_ERROR GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) = 0; // NOLINT
virtual CURVEFS_ERROR
GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) = 0; // NOLINT

virtual CURVEFS_ERROR RefreshInode(uint64_t inodeId) = 0;

virtual CURVEFS_ERROR GetInodeAttr(uint64_t inodeId, InodeAttr *out) = 0;

Expand Down Expand Up @@ -220,7 +223,9 @@ class InodeCacheManagerImpl : public InodeCacheManager,
}

CURVEFS_ERROR GetInode(uint64_t inodeId,
std::shared_ptr<InodeWrapper> &out) override;
std::shared_ptr<InodeWrapper> &out) override;

CURVEFS_ERROR RefreshInode(uint64_t inodeId) override;

CURVEFS_ERROR GetInodeAttr(uint64_t inodeId, InodeAttr *out) override;

Expand Down Expand Up @@ -258,6 +263,8 @@ class InodeCacheManagerImpl : public InodeCacheManager,
private:
virtual void FlushInodeBackground();
void TrimIcache(uint64_t trimSize);
CURVEFS_ERROR RefreshData(std::shared_ptr<InodeWrapper> &inode, // NOLINT
bool streaming = true);

private:
std::shared_ptr<MetaServerClient> metaClient_;
Expand Down
Loading

0 comments on commit f09fdf9

Please sign in to comment.