Skip to content

Commit

Permalink
cls/fifo: Remove all use of part tags
Browse files Browse the repository at this point in the history
Part tags make part creation and setting the head non-idempotent,
leading to issues where racing RGWs may get confused about the correct
tag for a part. (Or worse, potentially have the metadata header hold
a different value for a part than the part's header.)

Consistency only requires that all nodes agree on the part number.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
  • Loading branch information
adamemerson committed Jan 9, 2023
1 parent dae8e78 commit f89a6d1
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 3,566 deletions.
35 changes: 1 addition & 34 deletions src/cls/fifo/cls_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ int read_part_header(cls_method_context_t hctx,
std::ostringstream ss;
ss << part_header->max_time;
CLS_LOG(5, "%s:%d read part_header:\n"
"\ttag=%s\n"
"\tmagic=0x%" PRIx64 "\n"
"\tmin_ofs=%" PRId64 "\n"
"\tlast_ofs=%" PRId64 "\n"
Expand All @@ -126,7 +125,6 @@ int read_part_header(cls_method_context_t hctx,
"\tmax_index=%" PRId64 "\n"
"\tmax_time=%s\n",
__PRETTY_FUNCTION__, __LINE__,
part_header->tag.c_str(),
part_header->magic,
part_header->min_ofs,
part_header->last_ofs,
Expand Down Expand Up @@ -406,11 +404,6 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in,

std::uint64_t size;

if (op.tag.empty()) {
CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
return -EINVAL;
}

int r = cls_cxx_stat2(hctx, &size, nullptr);
if (r < 0 && r != -ENOENT) {
CLS_ERR("ERROR: %s: cls_cxx_stat2() on obj returned %d", __PRETTY_FUNCTION__, r);
Expand All @@ -424,8 +417,7 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in,
return r;
}

if (!(part_header.tag == op.tag &&
part_header.params == op.params)) {
if (!(part_header.params == op.params)) {
CLS_ERR("%s: failed to re-create existing part with different "
"params", __PRETTY_FUNCTION__);
return -EEXIST;
Expand All @@ -436,7 +428,6 @@ int init_part(cls_method_context_t hctx, ceph::buffer::list* in,

part_header part_header;

part_header.tag = op.tag;
part_header.params = op.params;

part_header.min_ofs = CLS_FIFO_MAX_PART_HEADER_SIZE;
Expand Down Expand Up @@ -475,23 +466,13 @@ int push_part(cls_method_context_t hctx, ceph::buffer::list* in,
return -EINVAL;
}

if (op.tag.empty()) {
CLS_ERR("%s: tag required", __PRETTY_FUNCTION__);
return -EINVAL;
}

part_header part_header;
int r = read_part_header(hctx, &part_header);
if (r < 0) {
CLS_ERR("%s: failed to read part header", __PRETTY_FUNCTION__);
return r;
}

if (!(part_header.tag == op.tag)) {
CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
return -EINVAL;
}

std::uint64_t effective_len = op.total_len + op.data_bufs.size() *
part_entry_overhead;

Expand Down Expand Up @@ -782,12 +763,6 @@ int trim_part(cls_method_context_t hctx,
return r;
}

if (op.tag &&
!(part_header.tag == *op.tag)) {
CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
return -EINVAL;
}

if (op.ofs < part_header.min_ofs) {
return 0;
}
Expand Down Expand Up @@ -866,12 +841,6 @@ int list_part(cls_method_context_t hctx, ceph::buffer::list* in,
return r;
}

if (op.tag &&
!(part_header.tag == *op.tag)) {
CLS_ERR("%s: bad tag", __PRETTY_FUNCTION__);
return -EINVAL;
}

EntryReader reader(hctx, part_header, op.ofs);

if (op.ofs >= part_header.min_ofs &&
Expand All @@ -885,8 +854,6 @@ int list_part(cls_method_context_t hctx, ceph::buffer::list* in,

op::list_part_reply reply;

reply.tag = part_header.tag;

auto max_entries = std::min(op.max_entries, op::MAX_LIST_ENTRIES);

for (int i = 0; i < max_entries && !reader.end(); ++i) {
Expand Down
15 changes: 10 additions & 5 deletions src/cls/fifo/cls_fifo_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,18 @@ WRITE_CLASS_ENCODER(update_meta)

struct init_part
{
std::string tag;
data_params params;

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::string tag;
encode(tag, bl);
encode(params, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::string tag;
decode(tag, bl);
decode(params, bl);
DECODE_FINISH(bl);
Expand All @@ -169,19 +170,20 @@ WRITE_CLASS_ENCODER(init_part)

struct push_part
{
std::string tag;
std::deque<ceph::buffer::list> data_bufs;
std::uint64_t total_len{0};

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::string tag;
encode(tag, bl);
encode(data_bufs, bl);
encode(total_len, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::string tag;
decode(tag, bl);
decode(data_bufs, bl);
decode(total_len, bl);
Expand All @@ -192,19 +194,20 @@ WRITE_CLASS_ENCODER(push_part)

struct trim_part
{
std::optional<std::string> tag;
std::uint64_t ofs{0};
bool exclusive = false;

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::optional<std::string> tag;
encode(tag, bl);
encode(ofs, bl);
encode(exclusive, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::optional<std::string> tag;
decode(tag, bl);
decode(ofs, bl);
decode(exclusive, bl);
Expand All @@ -215,19 +218,20 @@ WRITE_CLASS_ENCODER(trim_part)

struct list_part
{
std::optional<std::string> tag;
std::uint64_t ofs{0};
int max_entries{100};

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::optional<std::string> tag;
encode(tag, bl);
encode(ofs, bl);
encode(max_entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::optional<std::string> tag;
decode(tag, bl);
decode(ofs, bl);
decode(max_entries, bl);
Expand All @@ -239,14 +243,14 @@ inline constexpr int MAX_LIST_ENTRIES = 512;

struct list_part_reply
{
std::string tag;
std::vector<part_list_entry> entries;
bool more{false};
bool full_part{false}; /* whether part is full or still can be written to.
A non full part is by definition head part */

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::string tag;
encode(tag, bl);
encode(entries, bl);
encode(more, bl);
Expand All @@ -255,6 +259,7 @@ struct list_part_reply
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::string tag;
decode(tag, bl);
decode(entries, bl);
decode(more, bl);
Expand Down
50 changes: 16 additions & 34 deletions src/cls/fifo/cls_fifo_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,17 @@ struct journal_entry {
remove = 3,
} op{Op::unknown};

std::int64_t part_num{0};
std::string part_tag;
std::int64_t part_num{-1};

journal_entry() = default;
journal_entry(Op op, std::int64_t part_num)
: op(op), part_num(part_num) {}

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode((int)op, bl);
encode(part_num, bl);
std::string part_tag;
encode(part_tag, bl);
ENCODE_FINISH(bl);
}
Expand All @@ -142,15 +146,15 @@ struct journal_entry {
decode(i, bl);
op = static_cast<Op>(i);
decode(part_num, bl);
std::string part_tag;
decode(part_tag, bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter* f) const;

friend bool operator ==(const journal_entry& lhs, const journal_entry& rhs) {
return (lhs.op == rhs.op &&
lhs.part_num == rhs.part_num &&
lhs.part_tag == rhs.part_tag);
lhs.part_num == rhs.part_num);
}
};
WRITE_CLASS_ENCODER(journal_entry)
Expand All @@ -169,8 +173,7 @@ inline std::ostream& operator <<(std::ostream& m, const journal_entry::Op& o) {
}
inline std::ostream& operator <<(std::ostream& m, const journal_entry& j) {
return m << "op: " << j.op << ", "
<< "part_num: " << j.part_num << ", "
<< "part_tag: " << j.part_tag;
<< "part_num: " << j.part_num;
}

// This is actually a useful builder, since otherwise we end up with
Expand Down Expand Up @@ -311,9 +314,6 @@ struct info {
std::int64_t min_push_part_num{0};
std::int64_t max_push_part_num{-1};

std::string head_tag;
std::map<int64_t, std::string> tags;

std::multimap<int64_t, journal_entry> journal;

bool need_new_head() const {
Expand All @@ -334,6 +334,8 @@ struct info {
encode(head_part_num, bl);
encode(min_push_part_num, bl);
encode(max_push_part_num, bl);
std::string head_tag;
std::map<int64_t, std::string> tags;
encode(tags, bl);
encode(head_tag, bl);
encode(journal, bl);
Expand All @@ -349,6 +351,8 @@ struct info {
decode(head_part_num, bl);
decode(min_push_part_num, bl);
decode(max_push_part_num, bl);
std::string head_tag;
std::map<int64_t, std::string> tags;
decode(tags, bl);
decode(head_tag, bl);
decode(journal, bl);
Expand All @@ -361,14 +365,6 @@ struct info {
return fmt::format("{}.{}", oid_prefix, part_num);
}

// Build (but do not record) a journal entry describing the creation of
// the next part: the operation is `create`, the part number is one past
// the current max_push_part_num, and the entry carries the supplied tag.
// The method is const; it only returns the entry and does not modify
// this object.
journal_entry next_journal_entry(std::string tag) const {
journal_entry entry;
entry.op = journal_entry::Op::create;
// The new part is numbered immediately after the highest push part.
entry.part_num = max_push_part_num + 1;
// `tag` was taken by value, so it can be moved into the entry.
entry.part_tag = std::move(tag);
return entry;
}

std::optional<std::string>
apply_update(const update& update) {
if (update.tail_part_num()) {
Expand All @@ -393,10 +389,6 @@ struct info {
"allowed, part num={}", entry.part_num);
}

if (entry.op == journal_entry::Op::create) {
tags[entry.part_num] = entry.part_tag;
}

journal.emplace(entry.part_num, entry);
}

Expand All @@ -405,14 +397,7 @@ struct info {
}

if (update.head_part_num()) {
tags.erase(head_part_num);
head_part_num = *update.head_part_num();
auto iter = tags.find(head_part_num);
if (iter != tags.end()) {
head_tag = iter->second;
} else {
head_tag.erase();
}
}

return std::nullopt;
Expand All @@ -428,8 +413,6 @@ inline std::ostream& operator <<(std::ostream& m, const info& i) {
<< "head_part_num: " << i.head_part_num << ", "
<< "min_push_part_num: " << i.min_push_part_num << ", "
<< "max_push_part_num: " << i.max_push_part_num << ", "
<< "head_tag: " << i.head_tag << ", "
<< "tags: {" << i.tags << "}, "
<< "journal: {" << i.journal;
}

Expand Down Expand Up @@ -470,8 +453,6 @@ inline std::ostream& operator <<(std::ostream& m,
}

struct part_header {
std::string tag;

data_params params;

std::uint64_t magic{0};
Expand All @@ -485,6 +466,7 @@ struct part_header {

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
std::string tag;
encode(tag, bl);
encode(params, bl);
encode(magic, bl);
Expand All @@ -498,6 +480,7 @@ struct part_header {
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
std::string tag;
decode(tag, bl);
decode(params, bl);
decode(magic, bl);
Expand All @@ -513,8 +496,7 @@ struct part_header {
WRITE_CLASS_ENCODER(part_header)
inline std::ostream& operator <<(std::ostream& m, const part_header& p) {
using ceph::operator <<;
return m << "tag: " << p.tag << ", "
<< "params: {" << p.params << "}, "
return m << "params: {" << p.params << "}, "
<< "magic: " << p.magic << ", "
<< "min_ofs: " << p.min_ofs << ", "
<< "last_ofs: " << p.last_ofs << ", "
Expand Down
3 changes: 0 additions & 3 deletions src/neorados/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,3 @@ target_link_libraries(libneorados PRIVATE
# ${BLKID_LIBRARIES} ${CRYPTO_LIBS} ${EXTRALIBS})
# target_link_libraries(libneorados ${rados_libs})
# install(TARGETS libneorados DESTINATION ${CMAKE_INSTALL_LIBDIR})
add_library(neorados_cls_fifo STATIC cls/fifo.cc)
target_link_libraries(neorados_cls_fifo PRIVATE
libneorados ceph-common fmt::fmt)
Loading

0 comments on commit f89a6d1

Please sign in to comment.