Skip to content

Commit

Permalink
clone lazy allocate
Browse files Browse the repository at this point in the history
Change-Id: I2eb9cb5b3a73062b23b90d387b1b2a52ccffb56b
  • Loading branch information
hzchenwei7 committed Apr 30, 2020
1 parent e72748e commit 83478e4
Show file tree
Hide file tree
Showing 22 changed files with 262 additions and 150 deletions.
1 change: 1 addition & 0 deletions proto/chunk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ enum CHUNK_OP_STATUS {
CHUNK_OP_STATUS_FAILURE_UNKNOWN = 8; // 其他错误
CHUNK_OP_STATUS_OVERLOAD = 9; // 过载,表示服务端有过多请求未处理返回
CHUNK_OP_STATUS_BACKWARD = 10; // 请求的版本落后当前chunk的版本
CHUNK_OP_STATUS_CHUNK_EXIST = 11; // chunk已存在
};

message ChunkResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/datastore/chunkserver_datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ CSErrorCode CSDataStore::CreateCloneChunk(ChunkID id,
if (info.location.compare(location) != 0
|| info.curSn != sn
|| info.correctedSn != correctedSn) {
LOG(ERROR) << "Conflict chunk already exists."
LOG(WARNING) << "Conflict chunk already exists."
<< "sn in arg = " << sn
<< ", correctedSn in arg = " << correctedSn
<< ", location in arg = " << location
Expand Down
21 changes: 21 additions & 0 deletions src/chunkserver/op_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,16 @@ void CreateCloneChunkRequest::OnApply(uint64_t index,
<< " location: " << request_->location();
response_->set_status(
CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN);
} else if (CSErrorCode::ChunkConflictError == ret) {
LOG(WARNING) << "create clone chunk exist: "
<< " logic pool id: " << request_->logicpoolid()
<< " copyset id: " << request_->copysetid()
<< " chunkid: " << request_->chunkid()
<< " sn " << request_->sn()
<< " correctedSn: " << request_->correctedsn()
<< " location: " << request_->location();
response_->set_status(
CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_EXIST);
} else {
LOG(ERROR) << "create clone failed: "
<< " logic pool id: " << request_->logicpoolid()
Expand Down Expand Up @@ -706,6 +716,17 @@ void CreateCloneChunkRequest::OnApplyFromLog(std::shared_ptr<CSDataStore> datast
if (CSErrorCode::Success == ret)
return;

if (CSErrorCode::ChunkConflictError == ret) {
LOG(WARNING) << "create clone chunk exist: "
<< " logic pool id: " << request.logicpoolid()
<< " copyset id: " << request.copysetid()
<< " chunkid: " << request.chunkid()
<< " sn " << request.sn()
<< " correctedSn: " << request.correctedsn()
<< " location: " << request.location();
return;
}

if (CSErrorCode::InternalError == ret ||
CSErrorCode::CrcCheckError == ret ||
CSErrorCode::FileFormatError == ret) {
Expand Down
17 changes: 17 additions & 0 deletions src/client/chunk_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ void ClientClosure::Run() {
}
break;

// 2.6 返回chunk exist,直接返回,不用重试
case CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_EXIST:
OnChunkExist();
break;

default:
needRetry = true;
LOG_EVERY_N(ERROR, 10) << OpTypeToString(reqCtx_->optype_)
Expand Down Expand Up @@ -362,6 +367,18 @@ void ClientClosure::OnChunkNotExist() {
fileMetric_, reqCtx_->rawlength_, reqCtx_->optype_);
}

void ClientClosure::OnChunkExist() {
reqDone_->SetFailed(status_);

LOG(WARNING) << OpTypeToString(reqCtx_->optype_)
<< " exists, " << *reqCtx_
<< ", status=" << status_
<< ", retried times = " << reqDone_->GetRetriedTimes()
<< ", IO id = " << reqDone_->GetIOTracker()->GetID()
<< ", request id = " << reqCtx_->id_
<< ", remote side = " << remoteAddress_;
}

void ClientClosure::OnRedirected() {
LOG(WARNING) << OpTypeToString(reqCtx_->optype_) << " redirected, "
<< *reqCtx_
Expand Down
3 changes: 3 additions & 0 deletions src/client/chunk_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ class ClientClosure : public Closure {
// 返回chunk不存在 处理函数
virtual void OnChunkNotExist();

// 返回chunk存在 处理函数
void OnChunkExist();

// 非法参数
void OnInvalidRequest();

Expand Down
3 changes: 3 additions & 0 deletions src/client/io_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ void IOTracker::ChunkServerErr2LibcurveErr(CHUNK_OP_STATUS errcode,
case CHUNK_OP_STATUS::CHUNK_OP_STATUS_NOSPACE:
*errout = LIBCURVE_ERROR::NO_SPACE;
break;
case CHUNK_OP_STATUS::CHUNK_OP_STATUS_CHUNK_EXIST:
*errout = LIBCURVE_ERROR::EXISTS;
break;
default:
*errout = LIBCURVE_ERROR::FAILED;
break;
Expand Down
51 changes: 42 additions & 9 deletions src/snapshotcloneserver/clone/clone_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ void CloneCoreImpl::HandleCloneOrRecoverTask(
task->SetProgress(kProgressCreateCloneMeta);
break;
case CloneStep::kCreateCloneChunk:
ret = CreateCloneChunk(task, newFileInfo, segInfos);
ret = CreateCloneChunk(task, newFileInfo, &segInfos);
if (ret < 0) {
HandleCloneError(task, ret);
return;
Expand Down Expand Up @@ -459,6 +459,7 @@ int CloneCoreImpl::BuildFileInfoFromSnapshot(
uint64_t segmentIndex = chunkIndex / chunkPerSegment;
CloneChunkInfo info;
info.location = chunkDataName.ToDataChunkKey();
info.needRecover = true;
if (IsRecover(task)) {
info.seqNum = chunkDataName.chunkSeqNum_;
} else {
Expand Down Expand Up @@ -543,6 +544,7 @@ int CloneCoreImpl::BuildFileInfoFromFile(
CloneChunkInfo info;
info.location = std::to_string(offset + j * chunkSize);
info.seqNum = kInitializeSeqNum;
info.needRecover = true;
segInfo.emplace(j, info);
}
segInfos->emplace(i, segInfo);
Expand All @@ -561,9 +563,10 @@ int CloneCoreImpl::CreateCloneFile(
uint64_t fileLength = fInfo.length;
uint64_t seqNum = fInfo.seqnum;
uint32_t chunkSize = fInfo.chunksize;
std::string source = task->GetCloneInfo().GetSrc();

FInfo fInfoOut;
int ret = client_->CreateCloneFile(fileName,
int ret = client_->CreateCloneFile(source, fileName,
mdsRootUser_, fileLength, seqNum, chunkSize, &fInfoOut);
if (ret == LIBCURVE_ERROR::OK) {
// nothing
Expand Down Expand Up @@ -614,7 +617,15 @@ int CloneCoreImpl::CreateCloneMeta(
if (ret < 0) {
return ret;
}
task->GetCloneInfo().SetNextStep(CloneStep::kCreateCloneChunk);

// 如果是lazy&非快照,先不要createCloneChunk
// 等后面stage2阶段recoveryChunk之前去createCloneChunk
if (IsLazy(task) && IsFile(task)) {
task->GetCloneInfo().SetNextStep(CloneStep::kCompleteCloneMeta);
} else {
task->GetCloneInfo().SetNextStep(CloneStep::kCreateCloneChunk);
}

ret = metaStore_->UpdateCloneInfo(task->GetCloneInfo());
if (ret < 0) {
LOG(ERROR) << "UpdateCloneInfo after CreateCloneMeta error."
Expand All @@ -628,7 +639,7 @@ int CloneCoreImpl::CreateCloneMeta(
int CloneCoreImpl::CreateCloneChunk(
std::shared_ptr<CloneTaskInfo> task,
const FInfo &fInfo,
const CloneSegmentMap &segInfos) {
CloneSegmentMap *segInfos) {
int ret = kErrCodeSuccess;
uint32_t chunkSize = fInfo.chunksize;
uint32_t correctSn = 0;
Expand All @@ -639,7 +650,7 @@ int CloneCoreImpl::CreateCloneChunk(
correctSn = fInfo.seqnum;
}
auto tracker = std::make_shared<CreateCloneChunkTaskTracker>();
for (auto & cloneSegmentInfo : segInfos) {
for (auto & cloneSegmentInfo : *segInfos) {
for (auto & cloneChunkInfo : cloneSegmentInfo.second) {
std::string location;
if (IsSnapshot(task)) {
Expand All @@ -655,6 +666,7 @@ int CloneCoreImpl::CreateCloneChunk(
auto context = std::make_shared<CreateCloneChunkContext>();
context->location = location;
context->cidInfo = cidInfo;
context->cloneChunkInfo = &cloneChunkInfo.second;
context->sn = cloneChunkInfo.second.seqNum;
context->csn = correctSn;
context->chunkSize = chunkSize;
Expand Down Expand Up @@ -694,7 +706,11 @@ int CloneCoreImpl::CreateCloneChunk(
}
} while (true);

task->GetCloneInfo().SetNextStep(CloneStep::kCompleteCloneMeta);
if (IsLazy(task) && IsFile(task)) {
task->GetCloneInfo().SetNextStep(CloneStep::kRecoverChunk);
} else {
task->GetCloneInfo().SetNextStep(CloneStep::kCompleteCloneMeta);
}
ret = metaStore_->UpdateCloneInfo(task->GetCloneInfo());
if (ret < 0) {
LOG(ERROR) << "UpdateCloneInfo after CreateCloneChunk error."
Expand Down Expand Up @@ -748,7 +764,17 @@ int CloneCoreImpl::HandleCreateCloneChunkResultsAndRetry(
const std::list<CreateCloneChunkContextPtr> &results) {
int ret = kErrCodeSuccess;
for (auto context : results) {
if (context->retCode != LIBCURVE_ERROR::OK) {
if (context->retCode == -LIBCURVE_ERROR::EXISTS) {
LOG(INFO) << "CreateCloneChunk chunk exist"
<< ", location = " << context->location
<< ", logicalPoolId = " << context->cidInfo.lpid_
<< ", copysetId = " << context->cidInfo.cpid_
<< ", chunkId = " << context->cidInfo.cid_
<< ", seqNum = " << context->sn
<< ", csn = " << context->csn
<< ", taskid = " << task->GetTaskId();
context->cloneChunkInfo->needRecover = false;
} else if (context->retCode != LIBCURVE_ERROR::OK) {
uint64_t nowTime = TimeUtility::GetTimeofDaySec();
if (nowTime - context->startTime <
context->clientAsyncMethodRetryTimeSec) {
Expand Down Expand Up @@ -830,6 +856,9 @@ int CloneCoreImpl::RecoverChunk(
// 为避免发往同一个chunk碰撞,异步请求不同的chunk
for (auto & cloneSegmentInfo : segInfos) {
for (auto & cloneChunkInfo : cloneSegmentInfo.second) {
if (!cloneChunkInfo.second.needRecover) {
continue;
}
// 当前并发工作的chunk数已大于要求的并发数时,先消化一部分
while (workingChunkNum >= recoverChunkConcurrency_) {
uint64_t completeChunkNum = 0;
Expand Down Expand Up @@ -1053,7 +1082,11 @@ int CloneCoreImpl::RenameCloneFile(
}

if (IsLazy(task)) {
task->GetCloneInfo().SetNextStep(CloneStep::kRecoverChunk);
if (IsFile(task)) {
task->GetCloneInfo().SetNextStep(CloneStep::kCreateCloneChunk);
} else {
task->GetCloneInfo().SetNextStep(CloneStep::kRecoverChunk);
}
} else {
task->GetCloneInfo().SetNextStep(CloneStep::kEnd);
}
Expand Down Expand Up @@ -1126,8 +1159,8 @@ void CloneCoreImpl::HandleLazyCloneStage1Finish(
LOG(INFO) << "Task Lazy Stage1 Success"
<< ", TaskInfo : " << *task;
task->GetClosure()->SetErrCode(ret);
task->GetClosure()->Run();
task->Finish();
task->GetClosure()->Run();
return;
}

Expand Down
4 changes: 3 additions & 1 deletion src/snapshotcloneserver/clone/clone_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ struct CloneChunkInfo {
std::string location;
// 该chunk的版本号
uint64_t seqNum;
// chunk是否需要recover
bool needRecover;
};

// 克隆/恢复所需segment信息,key是ChunkIndex In Segment, value是chunk信息
Expand Down Expand Up @@ -351,7 +353,7 @@ class CloneCoreImpl : public CloneCore {
int CreateCloneChunk(
std::shared_ptr<CloneTaskInfo> task,
const FInfo &fInfo,
const CloneSegmentMap &segInfos);
CloneSegmentMap *segInfos);

/**
* @brief 开始CreateCloneChunk的异步请求
Expand Down
2 changes: 2 additions & 0 deletions src/snapshotcloneserver/clone/clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ struct CreateCloneChunkContext {
uint64_t startTime;
// 异步请求重试总时间
uint64_t clientAsyncMethodRetryTimeSec;
// chunk信息
struct CloneChunkInfo *cloneChunkInfo;
};

using CreateCloneChunkContextPtr = std::shared_ptr<CreateCloneChunkContext>;
Expand Down
5 changes: 3 additions & 2 deletions src/snapshotcloneserver/common/curvefs_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,17 @@ int CurveFsClientImpl::GetChunkInfo(
}

int CurveFsClientImpl::CreateCloneFile(
const std::string &source,
const std::string &filename,
const std::string &user,
uint64_t size,
uint64_t sn,
uint32_t chunkSize,
FInfo* fileInfo) {
UserInfo userInfo = GetUserInfo(user);
RetryMethod method = [this, &filename,
RetryMethod method = [this, &source, &filename,
userInfo, size, sn, chunkSize, fileInfo] () {
return snapClient_->CreateCloneFile(filename,
return snapClient_->CreateCloneFile(source, filename,
userInfo, size,
sn, chunkSize, fileInfo);
};
Expand Down
3 changes: 3 additions & 0 deletions src/snapshotcloneserver/common/curvefs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class CurveFsClient {
* - 若是clone,sn重置为初始值
* - 若是recover,sn不变
*
* @param source clone源文件名
* @param filename clone目标文件名
* @param user 用户信息
* @param size 文件大小
Expand All @@ -208,6 +209,7 @@ class CurveFsClient {
* @return 错误码
*/
virtual int CreateCloneFile(
const std::string &source,
const std::string &filename,
const std::string &user,
uint64_t size,
Expand Down Expand Up @@ -429,6 +431,7 @@ class CurveFsClientImpl : public CurveFsClient {
ChunkInfoDetail *chunkInfo) override;

int CreateCloneFile(
const std::string &source,
const std::string &filename,
const std::string &user,
uint64_t size,
Expand Down
9 changes: 5 additions & 4 deletions src/snapshotcloneserver/common/task_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <string>
#include <memory>
#include <mutex> //NOLINT
#include <atomic>

#include "src/common/concurrent/concurrent.h"

Expand Down Expand Up @@ -53,7 +54,7 @@ class TaskInfo {
* @brief 完成任务
*/
void Finish() {
isFinish_ = true;
isFinish_.store(true);
}

/**
Expand All @@ -63,7 +64,7 @@ class TaskInfo {
* @retval false 任务未完成
*/
bool IsFinish() const {
return isFinish_;
return isFinish_.load();
}

/**
Expand All @@ -87,7 +88,7 @@ class TaskInfo {
* @brief 重置任务
*/
void Reset() {
isFinish_ = false;
isFinish_.store(false);
isCanceled_ = false;
}

Expand All @@ -110,7 +111,7 @@ class TaskInfo {
// 任务完成度百分比
uint32_t progress_;
// 任务任务是否结束
bool isFinish_;
std::atomic_bool isFinish_;
// 任务是否被取消
bool isCanceled_;
mutable curve::common::Mutex lock_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ int FakeCurveFsClient::GetChunkInfo(const ChunkIDInfo &cidinfo,
}

int FakeCurveFsClient::CreateCloneFile(
const std::string &source,
const std::string &filename,
const std::string &user,
uint64_t size,
Expand Down
1 change: 1 addition & 0 deletions test/integration/snapshotcloneserver/fake_curvefs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class FakeCurveFsClient : public CurveFsClient {
ChunkInfoDetail *chunkInfo) override;

int CreateCloneFile(
const std::string &source,
const std::string &filename,
const std::string &user,
uint64_t size,
Expand Down
Loading

0 comments on commit 83478e4

Please sign in to comment.