Skip to content

Commit

Permalink
osd/: add projected_log to do client dup detection on not yet committ…
Browse files Browse the repository at this point in the history
…ed log entries

Log entries don't get added to the log for ECBackend until reads are
done, yet we still want any other requests with the same id to wait.

ReplicatedPG::update_range should consider the projected log as well.

Signed-off-by: Samuel Just <[email protected]>
  • Loading branch information
athanatos committed Nov 17, 2016
1 parent 907b357 commit ccbc90d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 39 deletions.
43 changes: 37 additions & 6 deletions src/osd/PG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ void PG::clear_primary_state()
min_last_complete_ondisk = eversion_t();
pg_trim_to = eversion_t();
might_have_unfound.clear();
projected_log = PGLog::IndexedLog();

last_update_ondisk = eversion_t();

Expand Down Expand Up @@ -2470,6 +2471,18 @@ void PG::update_heartbeat_peers()
osd->need_heartbeat_peer_update();
}


bool PG::check_in_progress_op(
const osd_reqid_t &r,
eversion_t *replay_version,
version_t *user_version,
int *return_code) const
{
return (
projected_log.get_request(r, replay_version, user_version, return_code) ||
pg_log.get_log().get_request(r, replay_version, user_version, return_code));
}

void PG::_update_calc_stats()
{
info.stats.version = info.last_update;
Expand Down Expand Up @@ -3062,6 +3075,12 @@ void PG::append_log(
pg_log.roll_forward(&handler);
}
}
auto last = logv.rbegin();
if (is_primary() && last != logv.rend()) {
projected_log.skip_can_rollback_to_to_head();
projected_log.trim(last->version, nullptr);
}

if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) {
pg_log.roll_forward_to(
roll_forward_to,
Expand Down Expand Up @@ -4194,16 +4213,28 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
}

// walk the log to find the latest update that affects our chunk
scrubber.subset_last_update = pg_log.get_tail();
for (list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
p != pg_log.get_log().log.rend();
++p) {
scrubber.subset_last_update = eversion_t();
for (auto p = projected_log.log.rbegin();
p != projected_log.log.rend();
++p) {
if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 &&
cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) {
scrubber.subset_last_update = p->version;
break;
}
}
}
}
if (scrubber.subset_last_update == eversion_t()) {
for (list<pg_log_entry_t>::const_reverse_iterator p =
pg_log.get_log().log.rbegin();
p != pg_log.get_log().log.rend();
++p) {
if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 &&
cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) {
scrubber.subset_last_update = p->version;
break;
}
}
}

// ask replicas to wait until
// last_update_applied >= scrubber.subset_last_update and then scan
Expand Down
6 changes: 6 additions & 0 deletions src/osd/PG.h
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,12 @@ class PG : protected DoutPrefixProvider {
PerfCounters *logger = NULL);
void write_if_dirty(ObjectStore::Transaction& t);

PGLog::IndexedLog projected_log;
bool check_in_progress_op(
const osd_reqid_t &r,
eversion_t *replay_version,
version_t *user_version,
int *return_code) const;
eversion_t projected_last_update;
eversion_t get_next_version() const {
eversion_t at_version(
Expand Down
15 changes: 15 additions & 0 deletions src/osd/PGLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ struct PGLog : DoutPrefixProvider {
return divergent;
}

template <typename T>
void scan_log_after(
const eversion_t &bound, ///< [in] scan entries > bound
T &&f) const {
auto iter = log.rbegin();
while (iter != log.rend() && iter->version > bound)
++iter;

while (true) {
if (iter == log.rbegin())
break;
f(*(--iter));
}
}

/****/
void claim_log_and_clear_rollback_info(const pg_log_t& o) {
// we must have already trimmed the old entries
Expand Down
64 changes: 31 additions & 33 deletions src/osd/ReplicatedPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,7 @@ void ReplicatedPG::do_op(OpRequestRef& op)
eversion_t replay_version;
version_t user_version;
int return_code = 0;
bool got = pg_log.get_log().get_request(
bool got = check_in_progress_op(
m->get_reqid(), &replay_version, &user_version, &return_code);
if (got) {
dout(3) << __func__ << " dup " << m->get_reqid()
Expand Down Expand Up @@ -8573,6 +8573,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
assert(ctx->at_version >= projected_last_update);
projected_last_update = ctx->at_version;
}
for (auto &&entry: ctx->log) {
projected_log.add(entry);
}
pgbackend->submit_transaction(
soid,
ctx->delta_stats,
Expand Down Expand Up @@ -11358,11 +11361,11 @@ void ReplicatedPG::update_range(
scan_range(local_min, local_max, bi, handle);
}

if (bi->version >= info.last_update) {
if (bi->version >= projected_last_update) {
dout(10) << __func__<< ": bi is current " << dendl;
assert(bi->version == info.last_update);
assert(bi->version == projected_last_update);
} else if (bi->version >= info.log_tail) {
if (pg_log.get_log().empty()) {
if (pg_log.get_log().empty() && projected_log.empty()) {
/* Because we don't move log_tail on split, the log might be
* empty even if log_tail != last_update. However, the only
* way to get here with an empty log is if log_tail is actually
Expand All @@ -11372,41 +11375,36 @@ void ReplicatedPG::update_range(
assert(bi->version == eversion_t());
return;
}
assert(!pg_log.get_log().empty());

dout(10) << __func__<< ": bi is old, (" << bi->version
<< ") can be updated with log" << dendl;
list<pg_log_entry_t>::const_iterator i =
pg_log.get_log().log.end();
--i;
while (i != pg_log.get_log().log.begin() &&
i->version > bi->version) {
--i;
}
if (i->version == bi->version)
++i;
<< ") can be updated with log to projected_last_update "
<< projected_last_update << dendl;

assert(i != pg_log.get_log().log.end());
dout(10) << __func__ << ": updating from version " << i->version
<< dendl;
for (; i != pg_log.get_log().log.end(); ++i) {
const hobject_t &soid = i->soid;
auto func = [&](const pg_log_entry_t &e) {
dout(10) << __func__ << ": updating from version " << e.version
<< dendl;
const hobject_t &soid = e.soid;
if (cmp(soid, bi->begin, get_sort_bitwise()) >= 0 &&
cmp(soid, bi->end, get_sort_bitwise()) < 0) {
if (i->is_update()) {
dout(10) << __func__ << ": " << i->soid << " updated to version "
<< i->version << dendl;
bi->objects.erase(i->soid);
if (e.is_update()) {
dout(10) << __func__ << ": " << e.soid << " updated to version "
<< e.version << dendl;
bi->objects.erase(e.soid);
bi->objects.insert(
make_pair(
i->soid,
i->version));
} else if (i->is_delete()) {
dout(10) << __func__ << ": " << i->soid << " removed" << dendl;
bi->objects.erase(i->soid);
}
}
}
bi->version = info.last_update;
e.soid,
e.version));
} else if (e.is_delete()) {
dout(10) << __func__ << ": " << e.soid << " removed" << dendl;
bi->objects.erase(e.soid);
}
}
};
dout(10) << "scanning pg log first" << dendl;
pg_log.get_log().scan_log_after(bi->version, func);
dout(10) << "scanning projected log" << dendl;
projected_log.scan_log_after(bi->version, func);
bi->version = projected_last_update;
} else {
assert(0 == "scan_range should have raised bi->version past log_tail");
}
Expand Down

0 comments on commit ccbc90d

Please sign in to comment.