Skip to content

Commit

Permalink
cls/fifo: Journal is flat_set, not multimap
Browse files Browse the repository at this point in the history
We don't really need the overhead and complexity of a multimap.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <[email protected]>
  • Loading branch information
adamemerson committed Jan 9, 2023
1 parent 32b514c commit fcaa45d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 35 deletions.
69 changes: 57 additions & 12 deletions src/cls/fifo/cls_fifo_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <string>
#include <vector>

#include <boost/container/flat_set.hpp>

#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/format.h>
Expand Down Expand Up @@ -117,19 +119,33 @@ inline std::ostream& operator <<(std::ostream& m, const data_params& d) {

struct journal_entry {
enum class Op {
unknown = 0,
unknown = -1,
create = 1,
set_head = 2,
remove = 3,
} op{Op::unknown};

std::int64_t part_num{-1};

bool valid() const {
using enum Op;
switch (op) {
case create: [[fallthrough]];
case set_head: [[fallthrough]];
case remove:
return part_num >= 0;

default:
return false;
}
}

journal_entry() = default;
journal_entry(Op op, std::int64_t part_num)
: op(op), part_num(part_num) {}

void encode(ceph::buffer::list& bl) const {
ceph_assert(valid());
ENCODE_START(1, 1, bl);
encode((int)op, bl);
encode(part_num, bl);
Expand Down Expand Up @@ -308,8 +324,39 @@ struct info {
std::int64_t min_push_part_num{0};
std::int64_t max_push_part_num{-1};

std::multimap<int64_t, journal_entry> journal;
boost::container::flat_set<journal_entry> journal;
static_assert(journal_entry::Op::create < journal_entry::Op::set_head);

// So we can get rid of the multimap without breaking compatibility
void encode_journal(bufferlist& bl) const {
using ceph::encode;
assert(journal.size() <= std::numeric_limits<uint32_t>::max());
uint32_t n = static_cast<uint32_t>(journal.size());
encode(n, bl);
for (const auto& entry : journal) {
encode(entry.part_num, bl);
encode(entry, bl);
}
}

void decode_journal( bufferlist::const_iterator& p) {
using enum journal_entry::Op;
using ceph::decode;
uint32_t n;
decode(n, p);
journal.clear();
while (n--) {
decltype(journal_entry::part_num) dummy;
decode(dummy, p);
journal_entry e;
decode(e, p);
if (!e.valid()) {
throw ceph::buffer::malformed_input();
} else {
journal.insert(std::move(e));
}
}
}
bool need_new_head() const {
return (head_part_num < min_push_part_num);
}
Expand All @@ -332,7 +379,7 @@ struct info {
std::map<int64_t, std::string> tags;
encode(tags, bl);
encode(head_tag, bl);
encode(journal, bl);
encode_journal(bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
Expand All @@ -349,7 +396,7 @@ struct info {
std::map<int64_t, std::string> tags;
decode(tags, bl);
decode(head_tag, bl);
decode(journal, bl);
decode_journal(bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter* f) const;
Expand Down Expand Up @@ -379,19 +426,17 @@ struct info {
}

for (const auto& entry : update.journal_entries_add()) {
if (std::find_if(journal.begin(), journal.end(),
[&entry](const auto &x) { return x.second == entry; })
!= journal.end()) {
continue;
} else {
journal.emplace(entry.part_num, entry);
auto [iter, inserted] = journal.insert(entry);
if (inserted) {
changed = true;
}
}

for (const auto& entry : update.journal_entries_rm()) {
journal.erase(entry.part_num);
changed = true;
auto count = journal.erase(entry);
if (count > 0) {
changed = true;
}
}

if (update.head_part_num() && (head_part_num != *update.head_part_num())) {
Expand Down
41 changes: 18 additions & 23 deletions src/rgw/driver/rados/cls_fifo_legacy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*
*/

#include <algorithm>
#include <cstdint>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -602,7 +603,7 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
l.unlock();

int r = 0;
for (auto& [n, entry] : tmpjournal) {
for (auto& entry : tmpjournal) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << entry << " tid=" << tid
<< dendl;
Expand Down Expand Up @@ -689,13 +690,9 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
<< " update canceled, retrying: i=" << i << " tid="
<< tid << dendl;
for (auto& e : processed) {
auto jiter = info.journal.find(e.part_num);
/* journal entry was already processed */
if (jiter == info.journal.end() ||
!(jiter->second == e)) {
continue;
if (info.journal.contains(e)) {
new_processed.push_back(e);
}
new_processed.push_back(e);
}
processed = std::move(new_processed);
}
Expand All @@ -721,7 +718,7 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
if (info.journal.contains(jentries.front())) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
Expand Down Expand Up @@ -753,8 +750,8 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
r = _update_meta(dpp, u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
auto found = (info.journal.find(jentries.front().part_num) !=
info.journal.end());
auto found = (info.journal.contains({create, jentries.front().part_num}) ||
info.journal.contains({set_head, jentries.front().part_num}));
if ((info.max_push_part_num >= jentries.front().part_num &&
info.head_part_num >= new_head_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
Expand Down Expand Up @@ -880,12 +877,13 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
}

if (canceled) {
using enum fifo::journal_entry::Op;
std::unique_lock l(f->m);
auto iter = f->info.journal.find(jentries.front().part_num);
auto found = (f->info.journal.contains({create, jentries.front().part_num}) ||
f->info.journal.contains({set_head, jentries.front().part_num}));
auto max_push_part_num = f->info.max_push_part_num;
auto head_part_num = f->info.head_part_num;
auto version = f->info.version;
auto found = (iter != f->info.journal.end());
l.unlock();
if ((max_push_part_num >= jentries.front().part_num &&
head_part_num >= new_head_part_num)) {
Expand Down Expand Up @@ -926,7 +924,8 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
if (info.journal.contains({create, jentries.front().part_num}) &&
(!is_head || info.journal.contains({set_head, jentries.front().part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
Expand Down Expand Up @@ -2014,8 +2013,8 @@ struct JournalProcessor : public Completion<JournalProcessor> {
FIFO* const fifo;

std::vector<fifo::journal_entry> processed;
std::multimap<std::int64_t, fifo::journal_entry> journal;
std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
decltype(fifo->info.journal) journal;
decltype(journal)::iterator iter;
std::int64_t new_tail;
std::int64_t new_head;
std::int64_t new_max;
Expand Down Expand Up @@ -2173,13 +2172,9 @@ struct JournalProcessor : public Completion<JournalProcessor> {
std::vector<fifo::journal_entry> new_processed;
std::unique_lock l(fifo->m);
for (auto& e : processed) {
auto jiter = fifo->info.journal.find(e.part_num);
/* journal entry was already processed */
if (jiter == fifo->info.journal.end() ||
!(jiter->second == e)) {
continue;
if (fifo->info.journal.contains(e)) {
new_processed.push_back(e);
}
new_processed.push_back(e);
}
processed = std::move(new_processed);
}
Expand Down Expand Up @@ -2231,7 +2226,7 @@ struct JournalProcessor : public Completion<JournalProcessor> {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << *iter
<< " tid=" << tid << dendl;
const auto entry = iter->second;
const auto entry = *iter;
switch (entry.op) {
using enum fifo::journal_entry::Op;
case create:
Expand Down Expand Up @@ -2264,7 +2259,7 @@ struct JournalProcessor : public Completion<JournalProcessor> {
<< " entering: tid=" << tid << dendl;
switch (state) {
case entry_callback:
finish_je(dpp, std::move(p), r, iter->second);
finish_je(dpp, std::move(p), r, *iter);
return;
case pp_callback:
auto c = canceled;
Expand Down

0 comments on commit fcaa45d

Please sign in to comment.