Skip to content

Commit

Permalink
Merge PR ceph#24804 into nautilus
Browse files Browse the repository at this point in the history
* refs/pull/24804/head:
	include/types: fixed compile warning for signed/unsigned comparison
	osd/PrimaryLogPG: uncommitted dup ops should respond with logged return code
	osd/PrimaryLogPG: propagate error return codes on object copy_get ops
	osd/PGLog: optionally record error return codes for extra_reqids
	osd/osd_types: include PG log return codes in object copy data
  • Loading branch information
liewegas committed Oct 30, 2018
2 parents bc7cfe0 + abba091 commit 5f51cd2
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ inline std::ostream& operator<<(std::ostream& out, const std::deque<A,Alloc>& v)

template<typename... Ts>
inline std::ostream& operator<<(std::ostream& out, const std::tuple<Ts...> &t) {
auto f = [n = sizeof...(Ts), i = 0, &out](const auto& e) mutable {
auto f = [n = sizeof...(Ts), i = 0U, &out](const auto& e) mutable {
out << e;
if (++i != n)
out << ",";
Expand Down
2 changes: 1 addition & 1 deletion src/osd/PG.h
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,7 @@ class PG : public DoutPrefixProvider {
map<hobject_t, list<Context*>> callbacks_for_degraded_object;

map<eversion_t,
list<pair<OpRequestRef, version_t> > > waiting_for_ondisk;
list<tuple<OpRequestRef, version_t, int> > > waiting_for_ondisk;

void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
void requeue_op(OpRequestRef op);
Expand Down
12 changes: 11 additions & 1 deletion src/osd/PGLog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,20 @@ void PGLog::IndexedLog::trim(
}
dups.push_back(pg_log_dup_t(e));
index(dups.back());
uint32_t idx = 0;
for (const auto& extra : e.extra_reqids) {
int return_code = e.return_code;
if (return_code >= 0) {
auto it = e.extra_reqid_return_codes.find(idx);
if (it != e.extra_reqid_return_codes.end()) {
return_code = it->second;
}
}
++idx;

// note: extras have the same version as outer op
dups.push_back(pg_log_dup_t(e.version, extra.second,
extra.first, e.return_code));
extra.first, return_code));
index(dups.back());
}
}
Expand Down
21 changes: 18 additions & 3 deletions src/osd/PGLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,20 @@ struct PGLog : DoutPrefixProvider {
}
p = extra_caller_ops.find(r);
if (p != extra_caller_ops.end()) {
uint32_t idx = 0;
for (auto i = p->second->extra_reqids.begin();
i != p->second->extra_reqids.end();
++i) {
++idx, ++i) {
if (i->first == r) {
*version = p->second->version;
*user_version = i->second;
*return_code = p->second->return_code;
if (*return_code >= 0) {
auto it = p->second->extra_reqid_return_codes.find(idx);
if (it != p->second->extra_reqid_return_codes.end()) {
*return_code = it->second;
}
}
return true;
}
}
Expand All @@ -318,20 +325,28 @@ struct PGLog : DoutPrefixProvider {

/// get a (bounded) list of recent reqids for the given object
void get_object_reqids(const hobject_t& oid, unsigned max,
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls) const {
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls,
mempool::osd_pglog::map<uint32_t, int> *return_codes) const {
// make sure object is present at least once before we do an
// O(n) search.
if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
index_objects();
}
if (objects.count(oid) == 0)
return;

for (list<pg_log_entry_t>::const_reverse_iterator i = log.rbegin();
i != log.rend();
++i) {
if (i->soid == oid) {
if (i->reqid_is_indexed())
if (i->reqid_is_indexed()) {
if (i->op == pg_log_entry_t::ERROR) {
// propagate op errors to the cache tier's PG log
return_codes->emplace(pls->size(), i->return_code);
}
pls->push_back(make_pair(i->reqid, i->user_version));
}

pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
if (pls->size() >= max) {
if (pls->size() > max) {
Expand Down
55 changes: 27 additions & 28 deletions src/osd/PrimaryLogPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
} else {
dout(10) << " waiting for " << version << " to commit" << dendl;
// always queue ondisk waiters, so that we can requeue if needed
waiting_for_ondisk[version].push_back(make_pair(op, user_version));
waiting_for_ondisk[version].emplace_back(op, user_version, return_code);
op->mark_delayed("waiting for ondisk");
}
return;
Expand Down Expand Up @@ -8485,8 +8485,10 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type)
}

if (!ctx->extra_reqids.empty()) {
dout(20) << __func__ << " extra_reqids " << ctx->extra_reqids << dendl;
dout(20) << __func__ << " extra_reqids " << ctx->extra_reqids << " "
<< ctx->extra_reqid_return_codes << dendl;
ctx->log.back().extra_reqids.swap(ctx->extra_reqids);
ctx->log.back().extra_reqid_return_codes.swap(ctx->extra_reqid_return_codes);
}

// apply new object state.
Expand Down Expand Up @@ -8784,7 +8786,9 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::const_iterator& bp,
if (cursor.is_complete()) {
// include reqids only in the final step. this is a bit fragile
// but it works...
pg_log.get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10, &reply_obj.reqids);
pg_log.get_log().get_object_reqids(ctx->obc->obs.oi.soid, 10,
&reply_obj.reqids,
&reply_obj.reqid_return_codes);
dout(20) << " got reqids" << dendl;
}

Expand Down Expand Up @@ -8820,7 +8824,8 @@ void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
uint64_t features = m->get_features();
object_copy_data_t reply_obj;

pg_log.get_log().get_object_reqids(oid, 10, &reply_obj.reqids);
pg_log.get_log().get_object_reqids(oid, 10, &reply_obj.reqids,
&reply_obj.reqid_return_codes);
dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
encode(reply_obj, osd_op.outdata, features);
osd_op.rval = -ENOENT;
Expand Down Expand Up @@ -8922,6 +8927,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
&cop->results.source_data_digest,
&cop->results.source_omap_digest,
&cop->results.reqids,
&cop->results.reqid_return_codes,
&cop->results.truncate_seq,
&cop->results.truncate_size,
&cop->rval);
Expand Down Expand Up @@ -9464,6 +9470,7 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
obs.oi.truncate_size = cb->results->truncate_size;

ctx->extra_reqids = cb->results->reqids;
ctx->extra_reqid_return_codes = cb->results->reqid_return_codes;

// cache: clear whiteout?
if (obs.oi.is_whiteout()) {
Expand Down Expand Up @@ -9627,6 +9634,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
tctx->new_obs.exists = true;

tctx->extra_reqids = results->reqids;
tctx->extra_reqid_return_codes = results->reqid_return_codes;

if (whiteout) {
// create a whiteout
Expand Down Expand Up @@ -10363,12 +10371,13 @@ void PrimaryLogPG::eval_repop(RepGather *repop)
auto it = waiting_for_ondisk.find(repop->v);
if (it != waiting_for_ondisk.end()) {
ceph_assert(waiting_for_ondisk.begin()->first == repop->v);
for (list<pair<OpRequestRef, version_t> >::iterator i =
it->second.begin();
i != it->second.end();
++i) {
osd->reply_op_error(i->first, repop->r, repop->v,
i->second);
for (auto& i : it->second) {
int return_code = repop->r;
if (return_code >= 0) {
return_code = std::get<2>(i);
}
osd->reply_op_error(std::get<0>(i), return_code, repop->v,
std::get<1>(i));
}
waiting_for_ondisk.erase(it);
}
Expand Down Expand Up @@ -11877,15 +11886,11 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue)
}

// also requeue any dups, interleaved into position
map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator p =
waiting_for_ondisk.find(repop->v);
auto p = waiting_for_ondisk.find(repop->v);
if (p != waiting_for_ondisk.end()) {
dout(10) << " also requeuing ondisk waiters " << p->second << dendl;
for (list<pair<OpRequestRef, version_t> >::iterator i =
p->second.begin();
i != p->second.end();
++i) {
rq.push_back(i->first);
for (auto& i : p->second) {
rq.push_back(std::get<0>(i));
}
waiting_for_ondisk.erase(p);
}
Expand All @@ -11899,17 +11904,11 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue)
if (requeue) {
requeue_ops(rq);
if (!waiting_for_ondisk.empty()) {
for (map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator i =
waiting_for_ondisk.begin();
i != waiting_for_ondisk.end();
++i) {
for (list<pair<OpRequestRef, version_t> >::iterator j =
i->second.begin();
j != i->second.end();
++j) {
derr << __func__ << ": op " << *(j->first->get_req()) << " waiting on "
<< i->first << dendl;
}
for (auto& i : waiting_for_ondisk) {
for (auto& j : i.second) {
derr << __func__ << ": op " << *(std::get<0>(j)->get_req())
<< " waiting on " << i.first << dendl;
}
}
ceph_assert(waiting_for_ondisk.empty());
}
Expand Down
2 changes: 2 additions & 0 deletions src/osd/PrimaryLogPG.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
mempool::osd_pglog::map<uint32_t, int> reqid_return_codes; // map reqids by index to error code
map<string, bufferlist> attrs; // xattrs
uint64_t truncate_seq;
uint64_t truncate_size;
Expand Down Expand Up @@ -576,6 +577,7 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {
int num_write; ///< count update ops

mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;

hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking

Expand Down
28 changes: 23 additions & 5 deletions src/osd/osd_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4022,7 +4022,7 @@ void pg_log_entry_t::decode_with_checksum(bufferlist::const_iterator& p)

void pg_log_entry_t::encode(bufferlist &bl) const
{
ENCODE_START(11, 4, bl);
ENCODE_START(12, 4, bl);
encode(op, bl);
encode(soid, bl);
encode(version, bl);
Expand All @@ -4049,12 +4049,14 @@ void pg_log_entry_t::encode(bufferlist &bl) const
encode(extra_reqids, bl);
if (op == ERROR)
encode(return_code, bl);
if (!extra_reqids.empty())
encode(extra_reqid_return_codes, bl);
ENCODE_FINISH(bl);
}

void pg_log_entry_t::decode(bufferlist::const_iterator &bl)
{
DECODE_START_LEGACY_COMPAT_LEN(11, 4, 4, bl);
DECODE_START_LEGACY_COMPAT_LEN(12, 4, 4, bl);
decode(op, bl);
if (struct_v < 2) {
sobject_t old_soid;
Expand Down Expand Up @@ -4108,6 +4110,8 @@ void pg_log_entry_t::decode(bufferlist::const_iterator &bl)
decode(extra_reqids, bl);
if (struct_v >= 11 && op == ERROR)
decode(return_code, bl);
if (struct_v >= 12 && !extra_reqids.empty())
decode(extra_reqid_return_codes, bl);
DECODE_FINISH(bl);
}

Expand All @@ -4119,12 +4123,17 @@ void pg_log_entry_t::dump(Formatter *f) const
f->dump_stream("prior_version") << prior_version;
f->dump_stream("reqid") << reqid;
f->open_array_section("extra_reqids");
uint32_t idx = 0;
for (auto p = extra_reqids.begin();
p != extra_reqids.end();
++p) {
++idx, ++p) {
f->open_object_section("extra_reqid");
f->dump_stream("reqid") << p->first;
f->dump_stream("user_version") << p->second;
auto it = extra_reqid_return_codes.find(idx);
if (it != extra_reqid_return_codes.end()) {
f->dump_int("return_code", it->second);
}
f->close_section();
}
f->close_section();
Expand Down Expand Up @@ -4495,7 +4504,7 @@ void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>&

void object_copy_data_t::encode(bufferlist& bl, uint64_t features) const
{
ENCODE_START(7, 5, bl);
ENCODE_START(8, 5, bl);
encode(size, bl);
encode(mtime, bl);
encode(attrs, bl);
Expand All @@ -4511,6 +4520,7 @@ void object_copy_data_t::encode(bufferlist& bl, uint64_t features) const
encode(reqids, bl);
encode(truncate_seq, bl);
encode(truncate_size, bl);
encode(reqid_return_codes, bl);
ENCODE_FINISH(bl);
}

Expand Down Expand Up @@ -4574,6 +4584,9 @@ void object_copy_data_t::decode(bufferlist::const_iterator& bl)
decode(truncate_seq, bl);
decode(truncate_size, bl);
}
if (struct_v >= 8) {
decode(reqid_return_codes, bl);
}
}
DECODE_FINISH(bl);
}
Expand Down Expand Up @@ -4633,12 +4646,17 @@ void object_copy_data_t::dump(Formatter *f) const
f->dump_unsigned("snap", *p);
f->close_section();
f->open_array_section("reqids");
uint32_t idx = 0;
for (auto p = reqids.begin();
p != reqids.end();
++p) {
++idx, ++p) {
f->open_object_section("extra_reqid");
f->dump_stream("reqid") << p->first;
f->dump_stream("user_version") << p->second;
auto it = reqid_return_codes.find(idx);
if (it != reqid_return_codes.end()) {
f->dump_int("return_code", it->second);
}
f->close_section();
}
f->close_section();
Expand Down
7 changes: 7 additions & 0 deletions src/osd/osd_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3485,6 +3485,10 @@ struct pg_log_entry_t {
hobject_t soid;
osd_reqid_t reqid; // caller+tid to uniquely identify request
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;

/// map extra_reqids by index to error return code (if any)
mempool::osd_pglog::map<uint32_t, int> extra_reqid_return_codes;

eversion_t version, prior_version, reverting_to;
version_t user_version; // the user version for this entry
utime_t mtime; // this is the _user_ mtime, mind you
Expand Down Expand Up @@ -4484,6 +4488,9 @@ struct object_copy_data_t {
/// recent reqids on this object
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids;

/// map reqids by index to error return code (if any)
mempool::osd_pglog::map<uint32_t, int> reqid_return_codes;

uint64_t truncate_seq;
uint64_t truncate_size;

Expand Down
Loading

0 comments on commit 5f51cd2

Please sign in to comment.