Skip to content

Commit

Permalink
curvefs client : fix bug of getleader always fails causes stack overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
xu-chaojie committed Feb 17, 2022
1 parent 731b85c commit 8854d9f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 31 deletions.
22 changes: 18 additions & 4 deletions curvefs/src/client/rpcclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,15 @@ void MetaServerClientImpl::UpdateInodeAsync(const Inode &inode,
MetaServerOpType::UpdateInode, task, inode.fsid(), inode.inodeid());
auto excutor = std::make_shared<UpdateInodeExcutor>(opt_,
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done);
excutor->DoAsyncRPCTask(taskDone);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
}

MetaStatusCode MetaServerClientImpl::GetOrModifyS3ChunkInfo(
Expand Down Expand Up @@ -741,8 +748,15 @@ void MetaServerClientImpl::GetOrModifyS3ChunkInfoAsync(
MetaServerOpType::GetOrModifyS3ChunkInfo, task, fsId, inodeId);
auto excutor = std::make_shared<GetOrModifyS3ChunkInfoExcutor>(opt_,
metaCache_, channelManager_, taskCtx);
TaskExecutorDone *taskDone = new TaskExecutorDone(excutor, done);
excutor->DoAsyncRPCTask(taskDone);
TaskExecutorDone *taskDone = new TaskExecutorDone(
excutor, done);
brpc::ClosureGuard taskDone_guard(taskDone);
int ret = excutor->DoAsyncRPCTask(taskDone);
if (ret < 0) {
taskDone->SetRetCode(ret);
return;
}
taskDone_guard.release();
}

MetaStatusCode MetaServerClientImpl::CreateInode(const InodeParam &param,
Expand Down
57 changes: 31 additions & 26 deletions curvefs/src/client/rpcclient/task_excutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,38 @@ int TaskExecutor::DoRPCTask() {
return retCode;
}

void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
brpc::ClosureGuard done_guard(done);
int TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
task_->rpcTimeoutMs = opt_.rpcTimeoutMS;

int retCode = -1;

if (task_->retryTimes++ > opt_.maxRetry) {
LOG(ERROR) << task_->TaskContextStr()
<< " retry times exceeds the limit";
done->SetRetCode(retCode);
return;
}
do {
if (task_->retryTimes++ > opt_.maxRetry) {
LOG(ERROR) << task_->TaskContextStr()
<< " retry times exceeds the limit";
break;
}

if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
done->SetRetCode(retCode);
return;
}
if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}

auto channel = channelManager_->GetOrCreateChannel(
task_->target.metaServerID, task_->target.endPoint);
if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
done->SetRetCode(retCode);
return;
}
auto channel = channelManager_->GetOrCreateChannel(
task_->target.metaServerID, task_->target.endPoint);
if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
continue;
}
retCode = ExcuteTask(channel.get(), done);
break;
} while (true);

ExcuteTask(channel.get(), done);
done_guard.release();
return;
return retCode;
}

bool TaskExecutor::OnReturn(int retCode) {
Expand Down Expand Up @@ -303,7 +304,11 @@ void TaskExecutorDone::Run() {
needRetry = excutor_->OnReturn(code_);
if (needRetry) {
excutor_->PreProcessBeforeRetry(code_);
excutor_->DoAsyncRPCTask(this);
code_ = excutor_->DoAsyncRPCTask(this);
if (code_ < 0) {
done_->SetMetaStatusCode(ConvertToMetaStatusCode(code_));
return;
}
self_guard.release();
done_guard.release();
} else {
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/client/rpcclient/task_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TaskExecutor {
}

int DoRPCTask();
void DoAsyncRPCTask(TaskExecutorDone *done);
int DoAsyncRPCTask(TaskExecutorDone *done);

bool OnReturn(int retCode);
void PreProcessBeforeRetry(int retCode);
Expand Down
63 changes: 63 additions & 0 deletions curvefs/test/client/rpcclient/metaserver_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,69 @@ TEST_F(MetaServerClientImplTest, test_UpdateInode) {
ASSERT_EQ(MetaStatusCode::RPC_ERROR, status);
}

TEST_F(MetaServerClientImplTest, test_GetOrModifyS3ChunkInfo) {
uint32_t fsId = 1;
uint64_t inodeId = 100;
google::protobuf::Map<
uint64_t, S3ChunkInfoList> s3ChunkInfos;
bool returnS3ChunkInfoMap = true;
google::protobuf::Map<
uint64_t, S3ChunkInfoList> out;
uint64_t applyIndex = 10;

// test1: success
curvefs::metaserver::GetOrModifyS3ChunkInfoResponse response;
response.set_statuscode(curvefs::metaserver::OK);
response.set_appliedindex(applyIndex);
EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _))
.WillOnce(DoAll(
SetArgPointee<2>(response),
Invoke(SetRpcService<
curvefs::metaserver::GetOrModifyS3ChunkInfoRequest,
curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>)));
EXPECT_CALL(*mockMetacache_.get(), GetTarget(_, _, _, _, _))
.WillRepeatedly(DoAll(SetArgPointee<2>(target_),
SetArgPointee<3>(applyIndex), Return(true)));
EXPECT_CALL(*mockMetacache_.get(), UpdateApplyIndex(_, _));

MetaStatusCode status = metaserverCli_.GetOrModifyS3ChunkInfo(
fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out);

ASSERT_EQ(MetaStatusCode::OK, status);

// test2: overload
response.set_statuscode(curvefs::metaserver::OVERLOAD);
EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _))
.WillRepeatedly(DoAll(
SetArgPointee<2>(response),
Invoke(SetRpcService<
curvefs::metaserver::GetOrModifyS3ChunkInfoRequest,
curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>)));
status = metaserverCli_.GetOrModifyS3ChunkInfo(
fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out);
ASSERT_EQ(MetaStatusCode::OVERLOAD, status);

// test3: has no applyIndex
response.set_statuscode(curvefs::metaserver::OK);
response.clear_appliedindex();
EXPECT_CALL(mockMetaServerService_, GetOrModifyS3ChunkInfo(_, _, _, _))
.WillRepeatedly(DoAll(
SetArgPointee<2>(response),
Invoke(SetRpcService<
curvefs::metaserver::GetOrModifyS3ChunkInfoRequest,
curvefs::metaserver::GetOrModifyS3ChunkInfoResponse>)));
status = metaserverCli_.GetOrModifyS3ChunkInfo(
fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out);
ASSERT_EQ(MetaStatusCode::RPC_ERROR, status);

// test4: get target always fail
EXPECT_CALL(*mockMetacache_.get(), GetTarget(_, _, _, _, _))
.WillRepeatedly(Return(false));
status = metaserverCli_.GetOrModifyS3ChunkInfo(
fsId, inodeId, s3ChunkInfos, returnS3ChunkInfoMap, &out);
ASSERT_EQ(MetaStatusCode::RPC_ERROR, status);
}

TEST_F(MetaServerClientImplTest, test_CreateInode) {
// in
InodeParam inode;
Expand Down
6 changes: 6 additions & 0 deletions curvefs/test/client/rpcclient/mock_metaserver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class MockMetaServerService : public curvefs::metaserver::MetaServerService {
const ::curvefs::metaserver::DeleteInodeRequest *request,
::curvefs::metaserver::DeleteInodeResponse *response,
::google::protobuf::Closure *done));

MOCK_METHOD4(GetOrModifyS3ChunkInfo,
void(::google::protobuf::RpcController *controller,
const ::curvefs::metaserver::GetOrModifyS3ChunkInfoRequest *request,
::curvefs::metaserver::GetOrModifyS3ChunkInfoResponse *response,
::google::protobuf::Closure *done));
};
} // namespace rpcclient
} // namespace client
Expand Down

0 comments on commit 8854d9f

Please sign in to comment.