Commit 36f7433

curvefs/metaserver: fix s3compaction stackoverflow

h0hmj committed Apr 20, 2022
1 parent bce108b

Showing 3 changed files with 85 additions and 124 deletions.
175 changes: 67 additions & 108 deletions curvefs/src/metaserver/s3compact_wq_impl.cpp
@@ -25,6 +25,7 @@
 #include <algorithm>
 #include <deque>
 #include <list>
+#include <map>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -33,8 +34,8 @@
 
 #include "absl/cleanup/cleanup.h"
 #include "curvefs/src/common/s3util.h"
-#include "curvefs/src/metaserver/copyset/meta_operator.h"
 #include "curvefs/src/metaserver/copyset/copyset_node_manager.h"
+#include "curvefs/src/metaserver/copyset/meta_operator.h"
 
 using curve::common::Configuration;
 using curve::common::InitS3AdaptorOptionExceptS3InfoOption;
@@ -139,112 +140,77 @@ void S3CompactWorkQueueImpl::DeleteObjs(const std::vector<std::string>& objs,
 
 std::list<struct S3CompactWorkQueueImpl::Node>
 S3CompactWorkQueueImpl::BuildValidList(const S3ChunkInfoList& s3chunkinfolist,
-                                       uint64_t inodeLen) {
-    std::list<struct S3CompactWorkQueueImpl::Node> validList;
-    std::function<void(struct Node)> addToValidList;
-    // always add newer s3chunkinfo to list
-    // [begin, end]
-    addToValidList = [&](struct Node newNode) {
-        // maybe truncated smaller
-        if (newNode.end >= inodeLen) newNode.end = inodeLen - 1;
-        for (auto it = validList.begin(); it != validList.end();) {
-            // merge from list head to tail
-            // try merge B to existing A
-            // data from B is newer than A
-            uint64_t nodeBegin = it->begin;
-            uint64_t nodeEnd = it->end;
-            if (newNode.end < nodeBegin) {
-                // A,B no overlap, B is left A, just add to list
-                validList.emplace(it, newNode);
-                return;
-            } else if (newNode.begin > nodeEnd) {
-                // A,B no overlap, B is right of A
-                // skip current node to next node
-                ++it;
-            } else if (newNode.begin <= nodeBegin && newNode.end >= nodeEnd) {
-                // B contains A, 1 part or split to 2 part
-                *it = newNode;
-                it->end = nodeEnd;
-                if (newNode.end > nodeEnd) {
-                    auto n = newNode;
-                    n.begin = nodeEnd + 1;
-                    addToValidList(std::move(n));
+                                       uint64_t inodeLen, uint64_t index,
+                                       uint64_t chunkSize) {
+    std::list<std::pair<uint64_t, uint64_t>> freeList;  // [begin, end]
+    freeList.emplace_back(chunkSize * index,
+                          std::min(chunkSize * (index + 1) - 1,
+                                   inodeLen - 1));  // start with full chunk
+    std::map<uint64_t, std::pair<uint64_t, uint64_t>>
+        used;  // begin -> pair(end, i)
+    auto fill = [&](uint64_t i) {
+        const auto& info = s3chunkinfolist.s3chunks(i);
+        VLOG(9) << "chunkid: " << info.chunkid() << ", offset:" << info.offset()
+                << ", len:" << info.len()
+                << ", compaction:" << info.compaction()
+                << ", zero: " << info.zero();
+        const uint64_t begin = info.offset();
+        const uint64_t end = info.offset() + info.len() - 1;
+        for (auto it = freeList.begin(); it != freeList.end();) {
+            auto n = std::next(it);
+            // overlap means we can take this free
+            auto b = it->first;
+            auto e = it->second;
+            if (begin <= b) {
+                if (end < b) {
+                    return;
+                } else if (end >= b && end < e) {
+                    // free [it->begin, it->end] -> [end+1, it->end]
+                    // used [it->begin, end]
+                    *it = std::make_pair(end + 1, e);
+                    used[b] = std::make_pair(end, i);
+                } else {
+                    // free [it->begin, it->end] -> erase
+                    // used [it->begin, it->end]
+                    freeList.erase(it);
+                    used[b] = std::make_pair(e, i);
                 }
-                return;
-            } else if (newNode.begin >= nodeBegin && newNode.end <= nodeEnd) {
-                // A contains B, 1 part or split to 2or3 part
-                // begin == nodeBegin && end == nodeEnd has already
-                // processed in previous elseif
-                auto n = *it;
-                if (newNode.end < nodeEnd) {
-                    it->begin = newNode.end + 1;
-                    auto front = validList.emplace(it, newNode);
-                    if (nodeBegin < newNode.begin) {
-                        n.end = newNode.begin - 1;
-                        validList.emplace(front, n);
-                    }
+            } else if (begin > b && begin <= e) {
+                if (end < e) {
+                    // free [it->begin, it->end]
+                    // -> [it->begin, begin-1], [end+1, it->end]
+                    // used [begin, end]
+                    *it = std::make_pair(end + 1, e);
+                    freeList.insert(it, std::make_pair(b, begin - 1));
+                    used[begin] = std::make_pair(end, i);
                 } else {
-                    *it = newNode;
-                    it->end = nodeEnd;
-                    if (nodeBegin < newNode.begin) {
-                        n.end = newNode.begin - 1;
-                        validList.emplace(it, n);
-                    }
+                    // free [it->begin, it->end] -> [it->begin, begin-1]
+                    // used [begin, it->end]
+                    *it = std::make_pair(b, begin - 1);
+                    used[begin] = std::make_pair(e, i);
                 }
-                return;
-            } else if (newNode.end > nodeEnd && newNode.begin > nodeBegin &&
-                       newNode.begin <= nodeEnd) {
-                // A,B overlap, B right A split to 3 part
-                auto n1 = *it;
-                *it = newNode;
-                it->end = nodeEnd;
-                n1.end = newNode.begin - 1;
-                validList.emplace(it, n1);
-                auto n2 = newNode;
-                n2.begin = nodeEnd + 1;
-                addToValidList(std::move(n2));
-                return;
-            } else if (newNode.begin < nodeBegin && newNode.end < nodeEnd &&
-                       newNode.end >= nodeBegin) {
-                // A,B overlap, B left A, split to 2 part
-                it->begin = newNode.end + 1;
-                validList.emplace(it, newNode);
-                return;
             } else {
                 // begin > it->end
                 // do nothing
             }
+            it = n;
         }
-        validList.emplace_back(newNode);
     };
 
-    VLOG(9) << "s3compact: list s3chunkinfo list";
-    for (auto i = 0; i < s3chunkinfolist.s3chunks_size(); i++) {
-        auto chunkinfo = s3chunkinfolist.s3chunks(i);
-        // print s3chunkinfolist
-        struct Node n {
-            chunkinfo.offset(), chunkinfo.offset() + chunkinfo.len() - 1,
-            chunkinfo.chunkid(), chunkinfo.compaction(), chunkinfo.offset(),
-            chunkinfo.len(), chunkinfo.zero()
-        };
-        VLOG(9) << "chunkid: " << chunkinfo.chunkid()
-                << ", offset:" << chunkinfo.offset()
-                << ", len:" << chunkinfo.len()
-                << ", compaction:" << chunkinfo.compaction()
-                << ", zero: " << chunkinfo.zero();
-        addToValidList(std::move(n));
-    }
-
-    // merge same chunkid continuous nodes
-    for (auto curr = validList.begin();;) {
-        auto next = std::next(curr);
-        if (next == validList.end()) break;
-        if (curr->end == next->begin - 1) {
-            if (curr->chunkid == next->chunkid &&
-                curr->compaction == next->compaction) {
-                next->begin = curr->begin;
-                validList.erase(curr);
-            }
-        }
-        curr = next;
+    for (auto i = s3chunkinfolist.s3chunks_size() - 1; i >= 0; i--) {
+        if (freeList.empty()) break;
+        fill(i);
     }
 
+    std::list<struct S3CompactWorkQueueImpl::Node> validList;
+    for (const auto& v : used) {
+        const auto& info = s3chunkinfolist.s3chunks(v.second.second);
+        validList.emplace_back(v.first, v.second.first, info.chunkid(),
+                               info.compaction(), info.offset(), info.len(),
+                               info.zero());
+    }
+
     return validList;
 }

@@ -556,8 +522,8 @@ void S3CompactWorkQueueImpl::CompactChunk(
     VLOG(6) << "s3compact: begin to compact index " << index;
     const auto& s3chunkinfolist = inode.s3chunkinfomap().at(index);
     // 1.1 build valid list
-    std::list<struct S3CompactWorkQueueImpl::Node> validList(
-        BuildValidList(s3chunkinfolist, inode.length()));
+    std::list<struct S3CompactWorkQueueImpl::Node> validList(BuildValidList(
+        s3chunkinfolist, inode.length(), index, compactCtx.chunkSize));
     VLOG(6) << "s3compact: finish build valid list";
     VLOG(9) << "s3compact: show valid list";
     for (const auto& node : validList) {
@@ -672,14 +638,7 @@ void S3CompactWorkQueueImpl::CompactChunks(const struct S3CompactTask& task) {
     VLOG(6) << "s3compact: begin to compact fsId:" << fsId
             << ", inodeId:" << inodeId;
     for (const auto& index : needCompact) {
-        // sort s3chunks to make small chunkid -> big chunkid
-        // bigger chunkid means newer data
-        auto s3chunkVec =
-            inode.mutable_s3chunkinfomap()->at(index).mutable_s3chunks();
-        std::sort(s3chunkVec->begin(), s3chunkVec->end(),
-                  [](const S3ChunkInfo& a, const S3ChunkInfo& b) {
-                      return a.chunkid() < b.chunkid();
-                  });
+        // s3chunklist order: from small chunkid to big chunkid
         CompactChunk(compactCtx, index, inode, &objsAddedMap, &s3ChunkInfoAdd,
                      &s3ChunkInfoRemove);
     }
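Why this fixes the stack overflow: the old addToValidList lambda called itself once for every split it produced, so an s3chunkinfolist with many overlapping ranges could nest deeply enough to exhaust the thread stack. The new BuildValidList is purely iterative: it walks the chunks from newest to oldest and lets each one claim whatever part of the still-free byte range it overlaps. Below is a minimal standalone sketch of that interval-fill idea, assuming simplified types (SimpleChunk and Fill are illustrative names, not from the repository):

#include <cstdint>
#include <cstdio>
#include <list>
#include <map>
#include <utility>
#include <vector>

// Illustrative stand-in for S3ChunkInfo; not the real protobuf type.
struct SimpleChunk {
    uint64_t offset;
    uint64_t len;
};

// Walk chunks newest-first; each chunk claims the parts of the still-free
// byte range [rangeBegin, rangeEnd] it overlaps. Returns a map of
// begin -> (end, chunk index), i.e. which chunk owns each disjoint range.
std::map<uint64_t, std::pair<uint64_t, size_t>> Fill(
    const std::vector<SimpleChunk>& chunks,  // ordered old -> new
    uint64_t rangeBegin, uint64_t rangeEnd) {
    std::list<std::pair<uint64_t, uint64_t>> freeList{{rangeBegin, rangeEnd}};
    std::map<uint64_t, std::pair<uint64_t, size_t>> used;
    for (size_t k = chunks.size(); k > 0 && !freeList.empty(); --k) {
        const uint64_t begin = chunks[k - 1].offset;
        const uint64_t end = begin + chunks[k - 1].len - 1;
        for (auto it = freeList.begin(); it != freeList.end();) {
            auto nxt = std::next(it);
            const uint64_t b = it->first, e = it->second;
            if (begin <= b) {
                if (end < b) break;  // free intervals are sorted: no overlap
                if (end < e) {       // claim [b, end], keep [end+1, e] free
                    used[b] = {end, k - 1};
                    *it = {end + 1, e};
                } else {             // claim all of [b, e]; may span further
                    used[b] = {e, k - 1};
                    freeList.erase(it);
                }
            } else if (begin <= e) {
                if (end < e) {       // claim [begin, end], split the interval
                    used[begin] = {end, k - 1};
                    *it = {end + 1, e};
                    freeList.insert(it, {b, begin - 1});
                } else {             // claim the tail [begin, e]
                    used[begin] = {e, k - 1};
                    *it = {b, begin - 1};
                }
            }                        // else begin > e: nothing to claim here
            it = nxt;
        }
    }
    return used;
}

int main() {
    // Chunk 0 writes [0, 2], then chunk 1 overwrites [1, 1]:
    // expect three ranges: [0,0]->0, [1,1]->1, [2,2]->0.
    auto used = Fill({{0, 3}, {1, 1}}, 0, 99);
    for (const auto& v : used)
        std::printf("[%llu, %llu] -> chunk %zu\n",
                    (unsigned long long)v.first,
                    (unsigned long long)v.second.first, v.second.second);
    return 0;
}

Stack depth now stays constant: each chunk costs one linear scan over a short free list instead of one recursive call per split.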
3 changes: 2 additions & 1 deletion curvefs/src/metaserver/s3compact_wq_impl.h
@@ -182,7 +182,8 @@ class S3CompactWorkQueueImpl : public TaskThreadPool<> {
     void DeleteObjs(const std::vector<std::string>& objsAdded,
                     S3Adapter* s3adapter);
     std::list<struct Node> BuildValidList(
-        const S3ChunkInfoList& s3chunkinfolist, uint64_t inodeLen);
+        const S3ChunkInfoList& s3chunkinfolist, uint64_t inodeLen,
+        uint64_t index, uint64_t chunkSize);
     void GenS3ReadRequests(const struct S3CompactCtx& ctx,
                            const std::list<struct Node>& validList,
                            std::vector<struct S3Request>* reqs,
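For context on the two new parameters: index selects which chunk of the inode this compaction covers, and chunkSize converts that index into byte offsets, so BuildValidList only ever fills the range [index * chunkSize, min((index + 1) * chunkSize - 1, inodeLen - 1)]. A quick worked example, assuming a hypothetical 150 MiB inode with 64 MiB chunks and index = 2: the starting free interval is [128 MiB, 150 MiB - 1], i.e. the last chunk is clipped by the inode's length.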
31 changes: 16 additions & 15 deletions curvefs/test/metaserver/s3compactwq_test.cpp
@@ -25,12 +25,12 @@
 
 #include "curvefs/src/metaserver/s3compact_manager.h"
 #include "curvefs/src/metaserver/s3compact_wq_impl.h"
+#include "curvefs/src/metaserver/storage/rocksdb_storage.h"
+#include "curvefs/src/metaserver/storage/storage.h"
 #include "curvefs/test/metaserver/copyset/mock/mock_copyset_node_manager.h"
 #include "curvefs/test/metaserver/mock_s3_adapter.h"
 #include "curvefs/test/metaserver/mock_s3compactwq_impl.h"
 #include "curvefs/test/metaserver/mock_s3infocache.h"
-#include "curvefs/src/metaserver/storage/storage.h"
-#include "curvefs/src/metaserver/storage/rocksdb_storage.h"
 #include "curvefs/test/metaserver/storage/utils.h"
 
 using ::curvefs::metaserver::copyset::CopysetNode;
@@ -47,9 +47,9 @@ using ::testing::SetArgPointee;
 using ::testing::StrEq;
 
 using ::curvefs::metaserver::storage::KVStorage;
-using ::curvefs::metaserver::storage::StorageOptions;
-using ::curvefs::metaserver::storage::RocksDBStorage;
 using ::curvefs::metaserver::storage::RandomStoragePath;
+using ::curvefs::metaserver::storage::RocksDBStorage;
+using ::curvefs::metaserver::storage::StorageOptions;
 
 namespace curvefs {
 namespace metaserver {
@@ -80,8 +80,8 @@ class S3CompactWorkQueueImplTest : public ::testing::Test {
         kvStorage_ = std::make_shared<RocksDBStorage>(options);
         ASSERT_TRUE(kvStorage_->Open());
 
-        inodeStorage_ = std::make_shared<InodeStorage>(
-            kvStorage_, "partition:1");
+        inodeStorage_ =
+            std::make_shared<InodeStorage>(kvStorage_, "partition:1");
         trash_ = std::make_shared<TrashImpl>(inodeStorage_);
         inodeManager_ = std::make_shared<InodeManager>(inodeStorage_, trash_);
         impl_ = std::make_shared<S3CompactWorkQueueImpl>(
@@ -258,6 +258,7 @@ TEST_F(S3CompactWorkQueueImplTest, test_DeleteObjs) {
 TEST_F(S3CompactWorkQueueImplTest, test_BuildValidList) {
     std::list<struct S3CompactWorkQueueImpl::Node> validList;
     uint64_t inodeLen = 100;
+    uint64_t chunkSize = 64 * 1024 * 1024;
     S3ChunkInfo tmpl;
     tmpl.set_compaction(0);
     tmpl.set_zero(false);
@@ -271,7 +272,7 @@
     c1.set_offset(0);
     c1.set_len(1);
     *l.add_s3chunks() = c1;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 1);
     auto first = validList.begin();
     ASSERT_EQ(first->chunkid, 0);
@@ -282,7 +283,7 @@
     c1.set_offset(0);
    c1.set_len(200);  // bigger than inodeLen
     *l.add_s3chunks() = c1;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 1);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 0);
@@ -302,7 +303,7 @@
     c3.set_offset(0);
     c3.set_len(2);
     *l.add_s3chunks() = c3;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 1);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 1);
@@ -318,7 +319,7 @@
     c3.set_offset(0);
     c3.set_len(2);
     *l.add_s3chunks() = c3;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 1);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 1);
@@ -338,7 +339,7 @@
     c5.set_offset(1);
     c5.set_len(1);
     *l.add_s3chunks() = c5;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 3);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 0);
@@ -366,7 +367,7 @@
     c7.set_offset(1);
     c7.set_len(1);
     *l.add_s3chunks() = c7;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 2);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 0);
@@ -390,7 +391,7 @@
     c9.set_offset(0);
     c9.set_len(1);
     *l.add_s3chunks() = c9;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 2);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 1);
@@ -414,7 +415,7 @@
     c11.set_offset(1);
     c11.set_len(2);
     *l.add_s3chunks() = c11;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 2);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 0);
@@ -438,7 +439,7 @@
     c13.set_offset(0);
     c13.set_len(2);
     *l.add_s3chunks() = c13;
-    validList = impl_->BuildValidList(l, inodeLen);
+    validList = impl_->BuildValidList(l, inodeLen, 0, chunkSize);
     ASSERT_EQ(validList.size(), 2);
     first = validList.begin();
     ASSERT_EQ(first->chunkid, 1);
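Note that the updated tests pass index 0 and a 64 MiB chunkSize everywhere, so with inodeLen = 100 the starting free interval is [0, min(64 MiB - 1, 99)] = [0, 99]: the byte range and the expected valid lists are unchanged from the old tests, only the call signature differs.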
