Skip to content

Commit

Permalink
Merge pull request ceph#17839 from liewegas/wip-recovery-preemption
Browse files Browse the repository at this point in the history
osd: allow PG recovery scheduling preemption

Reviewed-by: Greg Farnum <[email protected]>
  • Loading branch information
liewegas authored Sep 28, 2017
2 parents f1c01db + d7b29ac commit c59efe0
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 100 deletions.
51 changes: 51 additions & 0 deletions qa/suites/rados/singleton/all/recovery-preemption.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
roles:
- - mon.a
- mon.b
- mon.c
- mgr.x
- osd.0
- osd.1
- osd.2
- osd.3
openstack:
- volumes: # attached to each instance
count: 3
size: 20 # GB
tasks:
- install:
- ceph:
conf:
osd:
osd recovery sleep: .1
osd min pg log entries: 100
osd max pg log entries: 1000
log-whitelist:
- \(POOL_APP_NOT_ENABLED\)
- \(OSDMAP_FLAGS\)
- \(OSD_
- \(OBJECT_
- \(PG_
- overall HEALTH
- exec:
osd.0:
- ceph osd pool create foo 128
- ceph osd pool application enable foo foo
- rados -p foo bench 30 write -b 4096 --no-cleanup
- ceph osd out 0
- sleep 5
- ceph osd set noup
- ceph.restart:
daemons: [osd.1]
wait-for-up: false
wait-for-healthy: false
- exec:
osd.0:
- rados -p foo bench 3 write -b 4096 --no-cleanup
- ceph osd unset noup
- sleep 10
- ceph tell osd.* config set osd_recovery_sleep 0
- ceph tell osd.* config set osd_recovery_max_active 20
- ceph.healthy:
- exec:
osd.0:
- egrep '(defer backfill|defer recovery)' /var/log/ceph/ceph-osd.*.log
162 changes: 126 additions & 36 deletions src/common/AsyncReserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "common/Finisher.h"
#include "common/Formatter.h"

#define rdout(x) lgeneric_subdout(cct,reserver,x)

/**
 * Manages a configurable number of asynchronous reservations.
*
Expand All @@ -27,38 +29,104 @@
*/
template <typename T>
class AsyncReserver {
CephContext *cct;
Finisher *f;
unsigned max_allowed;
unsigned min_priority;
Mutex lock;

map<unsigned, list<pair<T, Context*> > > queues;
map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
set<T> in_progress;
struct Reservation {
T item;
unsigned prio = 0;
Context *grant = 0;
Context *preempt = 0;
Reservation() {}
Reservation(T i, unsigned pr, Context *g, Context *p = 0)
: item(i), prio(pr), grant(g), preempt(p) {}
void dump(Formatter *f) const {
f->dump_stream("item") << item;
f->dump_unsigned("prio", prio);
f->dump_bool("can_preempt", !!preempt);
}
friend ostream& operator<<(ostream& out, const Reservation& r) {
return out << r.item << "(prio " << r.prio << " grant " << r.grant
<< " preempt " << r.preempt << ")";
}
};

map<unsigned, list<Reservation>> queues;
map<T, pair<unsigned, typename list<Reservation>::iterator>> queue_pointers;
map<T,Reservation> in_progress;
set<pair<unsigned,T>> preempt_by_prio; ///< in_progress that can be preempted

void preempt_one() {
assert(!preempt_by_prio.empty());
auto q = in_progress.find(preempt_by_prio.begin()->second);
assert(q != in_progress.end());
Reservation victim = q->second;
rdout(10) << __func__ << " preempt " << victim << dendl;
f->queue(victim.preempt);
victim.preempt = nullptr;
in_progress.erase(q);
preempt_by_prio.erase(preempt_by_prio.begin());
}

void do_queues() {
typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
for (it = queues.rbegin();
it != queues.rend() &&
in_progress.size() < max_allowed &&
it->first >= min_priority;
++it) {
while (in_progress.size() < max_allowed &&
!it->second.empty()) {
pair<T, Context*> p = it->second.front();
queue_pointers.erase(p.first);
it->second.pop_front();
f->queue(p.second);
in_progress.insert(p.first);
rdout(20) << __func__ << ":\n";
JSONFormatter jf(true);
jf.open_object_section("queue");
_dump(&jf);
jf.close_section();
jf.flush(*_dout);
*_dout << dendl;

// in case min_priority was adjusted up or max_allowed was adjusted down
while (!preempt_by_prio.empty() &&
(in_progress.size() > max_allowed ||
preempt_by_prio.begin()->first < min_priority)) {
preempt_one();
}

while (!queues.empty()) {
// choose highest priority queue
auto it = queues.end();
--it;
assert(!it->second.empty());
if (it->first < min_priority) {
break;
}
if (in_progress.size() >= max_allowed &&
!preempt_by_prio.empty() &&
it->first > preempt_by_prio.begin()->first) {
preempt_one();
}
if (in_progress.size() >= max_allowed) {
break; // no room
}
// grant
Reservation p = it->second.front();
rdout(10) << __func__ << " grant " << p << dendl;
queue_pointers.erase(p.item);
it->second.pop_front();
if (it->second.empty()) {
queues.erase(it);
}
f->queue(p.grant);
p.grant = nullptr;
in_progress[p.item] = p;
if (p.preempt) {
preempt_by_prio.insert(make_pair(p.prio, p.item));
}
}
}
public:
AsyncReserver(
CephContext *cct,
Finisher *f,
unsigned max_allowed,
unsigned min_priority = 0)
: f(f),
: cct(cct),
f(f),
max_allowed(max_allowed),
min_priority(min_priority),
lock("AsyncReserver::lock") {}
Expand All @@ -77,27 +145,26 @@ class AsyncReserver {

void dump(Formatter *f) {
Mutex::Locker l(lock);
_dump(f);
}
void _dump(Formatter *f) {
f->dump_unsigned("max_allowed", max_allowed);
f->dump_unsigned("min_priority", min_priority);
f->open_array_section("queues");
for (typename map<unsigned, list<pair<T, Context*> > > ::const_iterator p =
queues.begin(); p != queues.end(); ++p) {
for (auto& p : queues) {
f->open_object_section("queue");
f->dump_unsigned("priority", p->first);
f->dump_unsigned("priority", p.first);
f->open_array_section("items");
for (typename list<pair<T, Context*> >::const_iterator q =
p->second.begin(); q != p->second.end(); ++q) {
f->dump_stream("item") << q->first;
for (auto& q : p.second) {
f->dump_object("item", q);
}
f->close_section();
f->close_section();
}
f->close_section();
f->open_array_section("in_progress");
for (typename set<T>::const_iterator p = in_progress.begin();
p != in_progress.end();
++p) {
f->dump_stream("item") << *p;
for (auto& p : in_progress) {
f->dump_object("item", p.second);
}
f->close_section();
}
Expand All @@ -113,13 +180,17 @@ class AsyncReserver {
void request_reservation(
T item, ///< [in] reservation key
Context *on_reserved, ///< [in] callback to be called on reservation
unsigned prio
unsigned prio, ///< [in] priority
Context *on_preempt = 0 ///< [in] callback to be called if we are preempted (optional)
) {
Mutex::Locker l(lock);
Reservation r(item, prio, on_reserved, on_preempt);
rdout(10) << __func__ << " queue " << r << dendl;
assert(!queue_pointers.count(item) &&
!in_progress.count(item));
queues[prio].push_back(make_pair(item, on_reserved));
queue_pointers.insert(make_pair(item, make_pair(prio,--(queues[prio]).end())));
queues[prio].push_back(r);
queue_pointers.insert(make_pair(item,
make_pair(prio,--(queues[prio]).end())));
do_queues();
}

Expand All @@ -134,13 +205,31 @@ class AsyncReserver {
T item ///< [in] key for reservation to cancel
) {
Mutex::Locker l(lock);
if (queue_pointers.count(item)) {
unsigned prio = queue_pointers[item].first;
delete queue_pointers[item].second->second;
queues[prio].erase(queue_pointers[item].second);
queue_pointers.erase(item);
auto i = queue_pointers.find(item);
if (i != queue_pointers.end()) {
unsigned prio = i->second.first;
const Reservation& r = *i->second.second;
rdout(10) << __func__ << " cancel " << r << " (was queued)" << dendl;
delete r.grant;
delete r.preempt;
queues[prio].erase(i->second.second);
if (queues[prio].empty()) {
queues.erase(prio);
}
queue_pointers.erase(i);
} else {
in_progress.erase(item);
auto p = in_progress.find(item);
if (p != in_progress.end()) {
rdout(10) << __func__ << " cancel " << p->second
<< " (was in progress)" << dendl;
if (p->second.preempt) {
preempt_by_prio.erase(make_pair(p->second.prio, p->second.item));
delete p->second.preempt;
}
in_progress.erase(p);
} else {
rdout(10) << __func__ << " cancel " << item << " (not found)" << dendl;
}
}
do_queues();
}
Expand All @@ -157,4 +246,5 @@ class AsyncReserver {
static const unsigned MAX_PRIORITY = (unsigned)-1;
};

#undef rdout
#endif
1 change: 1 addition & 0 deletions src/common/subsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ SUBSYS(tp, 0, 5)
SUBSYS(auth, 1, 5)
SUBSYS(crypto, 1, 5)
SUBSYS(finisher, 1, 1)
SUBSYS(reserver, 1, 1)
SUBSYS(heartbeatmap, 1, 5)
SUBSYS(perfcounter, 1, 5)
SUBSYS(rgw, 1, 5) // log level for the Rados gateway
Expand Down
2 changes: 1 addition & 1 deletion src/mgr/DaemonServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ bool DaemonServer::handle_command(MCommand *m)
}
break;
case OFR_BACKFILL:
if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
ss << "pg " << pstr << " doesn't require backfilling; ";
continue;
} else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
Expand Down
14 changes: 7 additions & 7 deletions src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ OSDService::OSDService(OSD *osd) :
recovery_sleep_lock("OSDService::recovery_sleep_lock"),
recovery_sleep_timer(cct, recovery_sleep_lock, false),
reserver_finisher(cct),
local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
local_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills,
remote_reserver(cct, &reserver_finisher, cct->_conf->osd_max_backfills,
cct->_conf->osd_min_recovery_priority),
pg_temp_lock("OSDService::pg_temp_lock"),
snap_sleep_lock("OSDService::snap_sleep_lock"),
Expand All @@ -258,7 +258,7 @@ OSDService::OSDService(OSD *osd) :
scrub_sleep_lock("OSDService::scrub_sleep_lock"),
scrub_sleep_timer(
osd->client_messenger->cct, scrub_sleep_lock, false /* relax locking */),
snap_reserver(&reserver_finisher,
snap_reserver(cct, &reserver_finisher,
cct->_conf->osd_max_trimming_pgs),
recovery_lock("OSDService::recovery_lock"),
recovery_ops_active(0),
Expand Down Expand Up @@ -9069,7 +9069,7 @@ void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
i->lock();
int pgstate = i->get_state();
if ( ((newstate == PG_STATE_FORCED_RECOVERY) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING))) ||
((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL))) )
((newstate == PG_STATE_FORCED_BACKFILL) && (pgstate & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING))) )
i->_change_recovery_force_mode(newstate, false);
i->unlock();
}
Expand Down Expand Up @@ -9154,18 +9154,18 @@ void OSD::do_recovery(
pg->discover_all_missing(*rctx.query_map);
if (rctx.query_map->empty()) {
string action;
if (pg->state_test(PG_STATE_BACKFILL)) {
if (pg->state_test(PG_STATE_BACKFILLING)) {
auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
queued,
queued,
PG::CancelBackfill()));
PG::DeferBackfill(cct->_conf->osd_recovery_retry_interval)));
pg->queue_peering_event(evt);
action = "in backfill";
} else if (pg->state_test(PG_STATE_RECOVERING)) {
auto evt = PG::CephPeeringEvtRef(new PG::CephPeeringEvt(
queued,
queued,
PG::CancelRecovery()));
PG::DeferRecovery(cct->_conf->osd_recovery_retry_interval)));
pg->queue_peering_event(evt);
action = "in recovery";
} else {
Expand Down
Loading

0 comments on commit c59efe0

Please sign in to comment.