Skip to content

Commit

Permalink
Fix lazy clone not returning taskId & add py_client.conf
Browse files Browse the repository at this point in the history
Change-Id: If6713ae9d7216ac22b9e797df1e17400a39ee1c5
  • Loading branch information
xu-chaojie committed Apr 14, 2020
1 parent 867fa14 commit b49d5fb
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 3 deletions.
138 changes: 138 additions & 0 deletions conf/py_client.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
################### mds一侧配置信息 ##################
#

# mds的地址信息,对于mds集群,地址以逗号隔开
mds.listen.addr=127.0.0.1:6666

# 初始化阶段向mds注册开关(代码默认值为开;本配置在此显式关闭)
mds.registerToMDS=false

# 与mds通信的rpc超时时间
mds.rpcTimeoutMS=500

# 与mds通信rpc最大的超时时间, 指数退避的超时时间不能超过这个值
mds.maxRPCTimeoutMS=2000

# 与mds通信重试总时间
mds.maxRetryMS=8000

# 在当前mds上连续重试次数超过该限制就切换, 这个失败次数包含超时重试次数
mds.maxFailedTimesBeforeChangeMDS=2

# 与MDS一侧保持一个lease时间内多少次续约
mds.refreshTimesPerLease=4

# mds RPC接口每次重试之前需要先睡眠一段时间
mds.rpcRetryIntervalUS=100000

#
################# metacache配置信息 ################
#

# 获取leader的rpc超时时间
metacache.getLeaderTimeOutMS=500

# 获取leader的重试次数
metacache.getLeaderRetry=5

# 获取leader接口每次重试之前需要先睡眠一段时间
metacache.rpcRetryIntervalUS=100000

#
############### 调度层的配置信息 #############
#

# 调度层队列大小,每个文件对应一个队列
# 调度队列的深度会影响client端整体吞吐,这个队列存放的是异步IO任务。
schedule.queueCapacity=1000000

# 队列的执行线程数量
# 执行线程所要做的事情就是将IO取出,然后发到网络就返回取下一个网络任务。一个任务从
# 队列取出到发送完rpc请求大概在(20us-100us),20us是正常情况下不需要获取leader的时候
# 如果在发送的时候需要获取leader,时间会在100us左右,一个线程的吞吐在10w-50w
# 性能已经满足需求
schedule.threadpoolSize=1

# 为隔离qemu侧线程引入的任务队列,因为qemu一侧只有一个IO线程
# 当qemu一侧调用aio接口的时候直接将调用push到任务队列就返回,
# 这样libcurve不占用qemu的线程,不阻塞其异步调用
isolation.taskQueueCapacity=1000000

# 隔离qemu线程的任务队列线程池大小, 默认值为1个线程
isolation.taskThreadPoolSize=1


#
################ 与chunkserver通信相关配置 #############
#
# 读写接口失败的OP之间重试睡眠
chunkserver.opRetryIntervalUS=100000

# 失败的OP重试次数
chunkserver.opMaxRetry=2500000

# 与chunkserver通信的rpc超时时间
chunkserver.rpcTimeoutMS=1000

# 开启基于appliedindex的读,用于性能优化
chunkserver.enableAppliedIndexRead=1

# 重试请求之间睡眠最长时间
# 因为当网络拥塞的时候或者chunkserver出现过载的时候,需要增加睡眠时间
# 这个时间最大为maxRetrySleepIntervalUs
chunkserver.maxRetrySleepIntervalUS=8000000

# 重试请求的超时rpc时间最大值,超时时间会遵循指数退避策略
# 因为当网络拥塞的时候出现超时,需要增加RPC超时时间
# 这个时间最大为maxTimeoutMS
chunkserver.maxRPCTimeoutMS=8000

# 同一个chunkserver连续超时上限次数
# 如果超过这个值,就会进行健康检查,健康检查失败后,会标记为unstable
chunkserver.maxStableTimeoutTimes=10
# chunkserver上rpc连续超时后,健康检查请求的超时时间
chunkserver.checkHealthTimeoutMs=100
# 同一个server上unstable的chunkserver数量超过这个值之后
# 所有的chunkserver都会标记为unstable
chunkserver.serverStableThreshold=3

# 当底层chunkserver压力大时,可能也会触发unstable
# 由于copyset leader可能发生变更,会导致请求超时时间设置为默认值,从而导致IO hang
# 真正宕机的情况下,请求重试一定次数后会处理完成
# 如果一直重试,则不是宕机情况,这时候超时时间还是要进入指数退避逻辑
# 当一个请求重试次数超过这个值时,其超时时间一定进入指数退避
chunkserver.minRetryTimesForceTimeoutBackoff=5

# 当一个rpc重试超过次数maxRetryTimesBeforeConsiderSuspend的时候
# 记为悬挂IO,metric会报警
chunkserver.maxRetryTimesBeforeConsiderSuspend=20

#
################# 文件级别配置项 #############
#
# libcurve底层rpc调度允许最大的未返回rpc数量,每个文件的inflight RPC独立
global.fileMaxInFlightRPCNum=64

# 文件IO下发到底层chunkserver最大的分片KB
global.fileIOSplitMaxSizeKB=64

#
################# log相关配置 ###############
#
# log等级 INFO=0/WARNING=1/ERROR=2/FATAL=3
global.logLevel=0
# 设置log的路径
global.logPath=/data/log/curve/
# 单元测试情况下
# logpath=./runlog/

#
############### metric 配置信息 #############
#
global.metricDummyServerStartPort=10000

#
# session map文件,存储打开文件的filename到path的映射
#
global.sessionMapPath=./session_map.json
8 changes: 8 additions & 0 deletions src/snapshotcloneserver/clone/clone_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,18 @@ class CloneClosure : public Closure {
taskId_ = taskId;
}

// Returns the task id previously stored via SetTaskId().
// Used by the service manager / tests to observe which clone task
// this closure was bound to. Marked const: pure accessor, no state change.
TaskIdType GetTaskId() const {
    return taskId_;
}

void SetErrCode(int retCode) {
retCode_ = retCode;
}

int GetErrCode() {
return retCode_;
}

void Run() {
if (done_ != nullptr && bcntl_ != nullptr) {
brpc::ClosureGuard done_guard(done_);
Expand Down
2 changes: 2 additions & 0 deletions src/snapshotcloneserver/clone/clone_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ int CloneServiceManager::CloneFile(const UUID &source,
if (kErrCodeTaskExist == ret) {
// 任务已存在的情况下返回成功,使接口幂等
*taskId = cloneInfo.GetTaskId();
closure->SetTaskId(*taskId);
closure->SetErrCode(kErrCodeSuccess);
return kErrCodeSuccess;
}
Expand Down Expand Up @@ -89,6 +90,7 @@ int CloneServiceManager::RecoverFile(const UUID &source,
if (kErrCodeTaskExist == ret) {
// 任务已存在的情况下返回成功,使接口幂等
*taskId = cloneInfo.GetTaskId();
closure->SetTaskId(*taskId);
closure->SetErrCode(kErrCodeSuccess);
return kErrCodeSuccess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ TEST_F(SnapshotCloneServerTest, TestSnapSameClone1Success) {

TaskCloneInfo info2;
int retCode = GetCloneTaskInfo(testUser1_, uuid2, &info2);
ASSERT_EQ(kErrCodeFileNotExist, retCode);
ASSERT_EQ(0, retCode);
}

TEST_F(SnapshotCloneServerTest, TestSnap2Clone2Success) {
Expand Down
31 changes: 29 additions & 2 deletions test/snapshotcloneserver/test_clone_service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,30 @@ TEST_F(TestCloneServiceManager,
ASSERT_EQ(kErrCodeInternalError, ret);
}

struct FakeCloneClosure : public CloneClosure {
FakeCloneClosure()
: taskId_(""),
errorCode_(-1),
callNum_(0) {}
void Run() {
callNum_++;
taskId_ = GetTaskId();
errorCode_ = GetErrCode();
}
TaskIdType taskId_;
int errorCode_;
int callNum_;
};

TEST_F(TestCloneServiceManager, TestCloneFileSuccessByTaskExist) {
const UUID source = "uuid1";
const std::string user = "user1";
const std::string destination = "file1";
bool lazyFlag = true;

TaskIdType expectUuid = "abc321";
CloneInfo cloneInfo;
cloneInfo.SetTaskId(expectUuid);

EXPECT_CALL(*cloneCore_, CloneOrRecoverPre(
source, user, destination, lazyFlag, CloneTaskType::kClone, _))
Expand All @@ -166,7 +183,7 @@ TEST_F(TestCloneServiceManager, TestCloneFileSuccessByTaskExist) {
Return(kErrCodeTaskExist)));

TaskIdType taskId;
auto closure = std::make_shared<CloneClosure>();
auto closure = std::make_shared<FakeCloneClosure>();
int ret = manager_->CloneFile(
source,
user,
Expand All @@ -175,6 +192,10 @@ TEST_F(TestCloneServiceManager, TestCloneFileSuccessByTaskExist) {
closure,
&taskId);
ASSERT_EQ(kErrCodeSuccess, ret);
ASSERT_EQ(expectUuid, taskId);
ASSERT_EQ(1, closure->callNum_);
ASSERT_EQ(expectUuid, closure->taskId_);
ASSERT_EQ(kErrCodeSuccess, closure->errorCode_);
}

TEST_F(TestCloneServiceManager,
Expand Down Expand Up @@ -317,7 +338,9 @@ TEST_F(TestCloneServiceManager,
const std::string destination = "file1";
bool lazyFlag = true;

TaskIdType expectUuid = "abc321";
CloneInfo cloneInfo;
cloneInfo.SetTaskId(expectUuid);

EXPECT_CALL(*cloneCore_, CloneOrRecoverPre(
source, user, destination, lazyFlag, CloneTaskType::kRecover, _))
Expand All @@ -326,7 +349,7 @@ TEST_F(TestCloneServiceManager,
Return(kErrCodeTaskExist)));

TaskIdType taskId;
auto closure = std::make_shared<CloneClosure>();
auto closure = std::make_shared<FakeCloneClosure>();
int ret = manager_->RecoverFile(
source,
user,
Expand All @@ -335,6 +358,10 @@ TEST_F(TestCloneServiceManager,
closure,
&taskId);
ASSERT_EQ(kErrCodeSuccess, ret);
ASSERT_EQ(expectUuid, taskId);
ASSERT_EQ(1, closure->callNum_);
ASSERT_EQ(expectUuid, closure->taskId_);
ASSERT_EQ(kErrCodeSuccess, closure->errorCode_);
}

TEST_F(TestCloneServiceManager,
Expand Down

0 comments on commit b49d5fb

Please sign in to comment.