Skip to content

Commit

Permalink
chunkserver: fix crc issue
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai authored and YunhuiChen committed Jul 13, 2021
1 parent 152f46b commit 6d3c1be
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 29 deletions.
3 changes: 2 additions & 1 deletion src/chunkserver/op_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,8 @@ void ScanChunkRequest::BuildAndSendScanMap(const ChunkRequest &request,
request.sendscanmapretrytimes(),
request.sendscanmapretryintervalus(),
cntl, channel);
LOG(INFO) << "Sending scanmap: " << scanMap->ShortDebugString()
LOG(INFO) << "logid = " << cntl->log_id()
<< "Sending scanmap: " << scanMap->ShortDebugString()
<< " to leader: " << peer_.addr;
stub.FollowScanMap(cntl, scanMapRequest, scanMapResponse, done);
}
Expand Down
57 changes: 38 additions & 19 deletions src/chunkserver/scan_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ void ScanManager::StartScanJob(ScanKey key) {
job->poolId = key.first;
job->id = key.second;
job->type = ScanType::Init;
job->currentChunkId = 0;
job->isFinished = true;
job->dataStore = nodePtr->GetDataStore();
nodePtr->SetScan(true);
Expand Down Expand Up @@ -231,7 +230,6 @@ int ScanManager::ScanJobProcess(const std::shared_ptr<ScanJob> job) {
iter++;
} else {
// split scan chunk request
job->currentChunkId = iter->first;
uint32_t currentOffset = 0;
bool scanChunkMetaPage = true;
while (currentOffset < chunkSize_) {
Expand All @@ -246,10 +244,14 @@ int ScanManager::ScanJobProcess(const std::shared_ptr<ScanJob> job) {
job->task.localMap.Clear();
job->task.followerMap.clear();
job->task.waitingNum = replicaNum;
job->task.chunkId = job->currentChunkId;
job->task.chunkId = iter->first;
job->task.offset = currentOffset;
if (scanChunkMetaPage) {
job->task.len = chunkMetaPageSize_;
} else {
job->task.len = scanSize_;
}
job->taskLock.Unlock();
job->currentOffset = currentOffset;
job->isFinished = false;

// construct scan task
Expand All @@ -258,7 +260,7 @@ int ScanManager::ScanJobProcess(const std::shared_ptr<ScanJob> job) {
request->set_optype(CHUNK_OP_TYPE::CHUNK_OP_SCAN);
request->set_logicpoolid(job->poolId);
request->set_copysetid(job->id);
request->set_chunkid(job->currentChunkId);
request->set_chunkid(iter->first);
request->set_offset(currentOffset);
request->set_sendscanmaptimeoutms(timeoutMs_);
request->set_sendscanmapretrytimes(retry_);
Expand Down Expand Up @@ -300,11 +302,17 @@ void ScanManager::SetLocalScanMap(ScanKey key, ScanMap map) {
<< " copysetId = " << key.second;
return;
}
if (job->currentChunkId != map.chunkid() ||
job->currentOffset != map.offset()) {
job->taskLock.RDLock();
bool matched = job->task.chunkId == map.chunkid() &&
job->task.offset == map.offset() &&
job->task.len == map.len();
job->taskLock.Unlock();

if (!matched) {
LOG(WARNING) << "SetLocalScanMap failed, mismatch scanmap."
<< " job->chunkid = " << job->currentChunkId
<< " job->offset = " << job->currentOffset
<< " scantask.chunkid = " << job->task.chunkId
<< " scantask.offset = " << job->task.offset
<< " scantask.len = " << job->task.len
<< "; scanmap: " << map.ShortDebugString();
return;
}
Expand All @@ -321,8 +329,21 @@ void ScanManager::DealFollowerScanMap(const FollowScanMapRequest &request,
ScanKey key(scanMap.logicalpoolid(), scanMap.copysetid());
auto job = GetJob(key);

if (nullptr != job && job->currentChunkId == scanMap.chunkid() &&
job->currentOffset == scanMap.offset()) {
if (nullptr == job) {
LOG(WARNING) << "DealFollowerScanMap failed, job not found."
<< " logical poolId = " << key.first
<< " copysetId = " << key.second;
response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST);
return;
}

job->taskLock.RDLock();
bool matched = job->task.chunkId == scanMap.chunkid() &&
job->task.offset == scanMap.offset() &&
job->task.len == scanMap.len();
job->taskLock.Unlock();

if (matched) {
job->taskLock.WRLock();
job->task.followerMap.emplace_back(scanMap);
job->task.waitingNum--;
Expand All @@ -331,15 +352,12 @@ void ScanManager::DealFollowerScanMap(const FollowScanMapRequest &request,
GenScanJobs(key);
response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS);
return;
}
if (nullptr == job) {
LOG(WARNING) << "DealFollowerScanMap failed, job not found."
<< " logical poolId = " << key.first
<< " copysetId = " << key.second;
} else {
ReadLockGuard readLockGuard(job->taskLock);
LOG(WARNING) << "DealFollowerScanMap failed, mismatch scanmap."
<< " job->chunkid = " << job->currentChunkId
<< " job->offset = " << job->currentOffset
<< " scantask.chunkid = " << job->task.chunkId
<< " scantask.offset = " << job->task.offset
<< " scantask.len = " << job->task.len
<< "; scanmap: " << scanMap.ShortDebugString();
}
response->set_retcode(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST);
Expand Down Expand Up @@ -381,7 +399,8 @@ void ScanManager::CompareMap(std::shared_ptr<ScanJob> job) {
<< " logicalpoolId = " << job->poolId
<< " copysetId = " << job->id
<< " chunkId = " << job->task.chunkId
<< " offset = " << job->task.offset;
<< " offset = " << job->task.offset
<< " len = " << job->task.len;
}
job->isFinished = true;
}
Expand Down
3 changes: 1 addition & 2 deletions src/chunkserver/scan_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum ScanType {
struct ScanTask {
ChunkID chunkId;
uint64_t offset;
uint64_t len;
uint8_t waitingNum;
ScanMap localMap;
std::vector<ScanMap> followerMap;
Expand All @@ -92,8 +93,6 @@ struct ScanJob {
ScanTask task;
bool isFinished;
RWLock taskLock;
ChunkID currentChunkId;
uint64_t currentOffset;
ChunkMap chunkMap;
std::shared_ptr<CSDataStore> dataStore;
ScanJob() : type(ScanType::Init) {}
Expand Down
31 changes: 24 additions & 7 deletions test/chunkserver/scan_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,14 @@ TEST_F(ScanManagerTest, CompareMapSuccessTest) {

// make scan job
std::shared_ptr<ScanJob> job = std::make_shared<ScanJob>();

job->poolId = 1;
job->id = 10000;
job->type = ScanType::NewMap;
job->chunkMap = chunkMap;
job->currentChunkId = 1;
job->currentOffset = 12582912;
job->task.chunkId = 1;
job->task.offset = 12582912;
job->task.len = 4194304;
ASSERT_EQ(3, job->task.waitingNum);
ASSERT_EQ(0, scanManager_->GetJobNum());
scanManager_->SetJob(key, job);
Expand Down Expand Up @@ -219,8 +221,9 @@ TEST_F(ScanManagerTest, CompareMapFailTest) {
job->id = 10000;
job->type = ScanType::NewMap;
job->chunkMap = chunkMap;
job->currentChunkId = 1;
job->currentOffset = 12582912;
job->task.chunkId = 1;
job->task.offset = 12582912;
job->task.len = 4194304;
ASSERT_EQ(3, job->task.waitingNum);
ASSERT_EQ(0, scanManager_->GetJobNum());
scanManager_->SetJob(key, job);
Expand Down Expand Up @@ -251,6 +254,14 @@ TEST_F(ScanManagerTest, CompareMapFailTest) {
scanMap1->set_crc(100);
scanMap1->set_offset(12582912);
scanMap1->set_len(4194304);
ScanMap *scanMap2 = new ScanMap();
scanMap2->set_logicalpoolid(1);
scanMap2->set_copysetid(10000);
scanMap2->set_chunkid(1);
scanMap2->set_index(1);
scanMap2->set_crc(200);
scanMap2->set_offset(0); // mismatch offset
scanMap2->set_len(4194304);
// set local scanmap
job->type = ScanType::WaitMap;
scanManager_->SetLocalScanMap(key, *localMap);
Expand All @@ -265,6 +276,12 @@ TEST_F(ScanManagerTest, CompareMapFailTest) {
ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST,
response.retcode());

// test incorrect offset
request.set_allocated_scanmap(scanMap2);
scanManager_->DealFollowerScanMap(request, &response);
ASSERT_EQ(CHUNK_OP_STATUS::CHUNK_OP_STATUS_INVALID_REQUEST,
response.retcode());

// test incorrect index
request.set_allocated_scanmap(localMap);
scanManager_->DealFollowerScanMap(request, &response);
Expand Down Expand Up @@ -306,8 +323,9 @@ TEST_F(ScanManagerTest, MismatchedCRCTest) {
job->id = 10000;
job->type = ScanType::NewMap;
job->chunkMap = chunkMap;
job->currentChunkId = 1;
job->currentOffset = 12582912;
job->task.chunkId = 1;
job->task.offset = 12582912;
job->task.len = 4194304;
ASSERT_EQ(3, job->task.waitingNum);
ASSERT_EQ(0, scanManager_->GetJobNum());
scanManager_->SetJob(key, job);
Expand Down Expand Up @@ -378,7 +396,6 @@ TEST_F(ScanManagerTest, CancelScanJobTest) {
job->poolId = 1;
job->id = 10000;
job->type = ScanType::NewMap;
job->currentChunkId = 1;
ASSERT_EQ(0, scanManager_->GetJobNum());
scanManager_->SetJob(key, job);
ASSERT_EQ(1, scanManager_->GetJobNum());
Expand Down

0 comments on commit 6d3c1be

Please sign in to comment.