Merge pull request ceph#2081 from ceph/wip-osd-leaks
osd: fix several memory leaks from fast dispatch

Reviewed-by: Samuel Just <[email protected]>
Sage Weil committed Jul 8, 2014
2 parents 50a2183 + 0bf4f65 commit 59379a4
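
The leaks this merge fixes mostly trace back to Ceph's intrusive reference counting: Connection::get_priv() hands back the attached Session with an extra reference that the caller must release with put(), and priv pointers left set at shutdown pin their Sessions forever. Below is a minimal sketch of that contract, not the real classes (which live in common/RefCountedObj.h and msg/Connection.h); the simplified bodies are illustrative assumptions.

#include <atomic>

// Minimal sketch of the refcounting contract behind these leaks.
struct RefCountedObject {
  std::atomic<int> nref{1};
  RefCountedObject *get() { ++nref; return this; }   // take a reference
  void put() { if (--nref == 0) delete this; }       // drop a reference
  virtual ~RefCountedObject() {}
};

struct Connection;  // forward declaration for the sketch

struct Session : RefCountedObject {
  Connection *con = nullptr;  // Sessions point back at their Connection
};

struct Connection : RefCountedObject {
  RefCountedObject *priv = nullptr;
  void set_priv(RefCountedObject *p) {  // replaces (and releases) old priv
    if (priv)
      priv->put();
    priv = p;
  }
  RefCountedObject *get_priv() {        // returns a *referenced* pointer
    return priv ? priv->get() : nullptr;
  }
};

// Caller-side discipline: every get_priv() needs a matching put(),
// or the Session (and whatever it pins) can never be freed.
void use_session(Connection *con) {
  Session *s = static_cast<Session*>(con->get_priv());
  if (!s)
    return;
  // ... use s ...
  s->put();
}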
Showing 2 changed files with 87 additions and 214 deletions.
195 changes: 61 additions & 134 deletions src/osd/OSD.cc
@@ -933,7 +933,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", 1),
paused_recovery(false),
session_waiting_lock("OSD::session_waiting_lock"),
session_waiting_for_map_lock("OSD::session_waiting_for_map_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
heartbeat_need_update(true), heartbeat_epoch(0),
@@ -1626,7 +1626,9 @@ int OSD::shutdown()
cct->_conf->set_val("debug_filestore", "100");
cct->_conf->set_val("debug_ms", "100");
cct->_conf->apply_changes(NULL);


dispatch_sessions_waiting_on_map();

// Shutdown PGs
{
RWLock::RLocker l(pg_map_lock);
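
The new dispatch_sessions_waiting_on_map() call drains any Sessions still parked waiting for a newer map before the PGs shut down; otherwise their queued OpRequests, and the Session references held by the waiting set, leak at exit. The helper's body is outside this diff; a plausible shape, inferred from the consume_map() block it replaces further down, would be:

// Inferred sketch, not the verbatim patch: pop each parked Session,
// let it dispatch against the current map, and drop the set's ref.
void OSD::dispatch_sessions_waiting_on_map()
{
  set<Session*> sessions_to_check;
  get_sessions_waiting_for_map(&sessions_to_check);
  for (set<Session*>::iterator i = sessions_to_check.begin();
       i != sessions_to_check.end();
       sessions_to_check.erase(i++)) {
    (*i)->session_dispatch_lock.Lock();
    dispatch_session_waiting(*i, osdmap);
    (*i)->session_dispatch_lock.Unlock();
    (*i)->put();   // release the reference held by the waiting set
  }
}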
@@ -1764,6 +1766,10 @@ int OSD::shutdown()
service.shutdown();
op_tracker.on_shutdown();

// zap the Sessions for any loopback Connections
client_messenger->get_loopback_connection()->set_priv(NULL);
cluster_messenger->get_loopback_connection()->set_priv(NULL);

class_handler->shutdown();
client_messenger->shutdown();
cluster_messenger->shutdown();
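
The two set_priv(NULL) calls added above break a reference cycle that otherwise leaks both objects: the loopback Connection holds a reference on its Session through priv, and the Session's con field holds a reference back on the Connection, so neither refcount can reach zero on its own. A sketch of the cycle, reusing the simplified types from the first sketch above:

// Why loopback Sessions leak: Connection -> Session (priv) and
// Session -> Connection (con) form a refcount cycle that no put()
// from outside can collapse.
void demo_loopback_cycle(Connection *loopback) {
  Session *s = new Session;                            // nref(s) == 1
  s->con = static_cast<Connection*>(loopback->get());  // Session -> Connection ref
  loopback->set_priv(s->get());                        // Connection -> Session; nref(s) == 2
  s->put();                                            // local ref gone; nref(s) stuck at 1
  // Neither object can now die: each holds the other's last reference.
  // Shutdown must unlink one edge explicitly, as the patch does
  // (a real Session would also drop its con ref in its destructor):
  loopback->set_priv(NULL);                            // drops the Connection's ref on s
}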
@@ -1882,6 +1888,7 @@ PG *OSD::_open_lock_pg(
pg_map[pgid] = pg;
pg->get("PGMap"); // because it's in pg_map
service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
wake_pg_waiters(pg, pgid);
}
wake_pg_waiters(pg, pgid);
return pg;
@@ -1914,6 +1921,7 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
pg->get("PGMap"); // For pg_map
pg_map[pg->info.pgid] = pg;
service.pg_add_epoch(pg->info.pgid, pg->get_osdmap()->get_epoch());
wake_pg_waiters(pg, pg->info.pgid);

dout(10) << "Adding newly split pg " << *pg << dendl;
vector<int> up, acting;
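
Both _open_lock_pg and add_newly_split_pg now call wake_pg_waiters() at the moment the PG is registered in pg_map, so ops parked in the OSD-level waiting_for_pg map (introduced below) are requeued as soon as the PG becomes visible. The wake-up itself is not shown in this diff; roughly, and this is a guess at its shape rather than the patch's code, it splices out and requeues the parked list:

// Speculative sketch of the wake-up: move the parked ops for pgid out
// of the OSD-wide map and re-inject them. enqueue_op() is a stand-in
// for whatever re-injection path the real code uses.
void OSD::wake_pg_waiters(PG *pg, spg_t pgid)
{
  // assumption: caller holds pg_map_lock, which also guards waiting_for_pg
  map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(pgid);
  if (p == waiting_for_pg.end())
    return;
  list<OpRequestRef> ls;
  ls.swap(p->second);
  waiting_for_pg.erase(p);
  for (list<OpRequestRef>::iterator i = ls.begin(); i != ls.end(); ++i)
    enqueue_op(pg, *i);   // hypothetical re-injection point
}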
@@ -2034,26 +2042,20 @@ PG *OSD::_create_lock_pg(

PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
{
RWLock::RLocker l(pg_map_lock);
Session *session = static_cast<Session*>(
op->get_req()->get_connection()->get_priv());

{
RWLock::RLocker l(pg_map_lock);
ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
if (i != pg_map.end())
return i->second;
}
RWLock::WLocker l(pg_map_lock);
ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
if (i == pg_map.end())
session->waiting_for_pg[pgid];

map<spg_t, list<OpRequestRef> >::iterator wlistiter =
session->waiting_for_pg.find(pgid);

PG *out = NULL;
if (wlistiter == session->waiting_for_pg.end()) {
out = i->second;
if (i != pg_map.end()) {
return i->second;
} else {
wlistiter->second.push_back(op);
register_session_waiting_on_pg(session, pgid);
waiting_for_pg[pgid].push_back(op);
return NULL;
}
session->put();
return out;
}

bool OSD::_have_pg(spg_t pgid)
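
Because the diff above interleaves removed and added lines without +/- markers, the rewritten get_pg_or_queue_for_pg is hard to read in place. Reassembled from the added lines, the new shape appears to be: a fast path under the read lock, then a recheck under the write lock before parking the op in the OSD-level waiting_for_pg map, so no Session reference is taken (or leaked) at all. Treat this as a reading of the diff, not a quotation:

PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
{
  {
    RWLock::RLocker l(pg_map_lock);           // fast path: read lock only
    ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
    if (i != pg_map.end())
      return i->second;
  }
  RWLock::WLocker l(pg_map_lock);             // slow path: may mutate the map
  ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
  if (i != pg_map.end()) {
    return i->second;                         // PG appeared between the locks
  } else {
    waiting_for_pg[pgid].push_back(op);       // park the op at OSD scope
    return NULL;
  }
}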
@@ -3694,7 +3696,7 @@ void OSD::ms_handle_fast_connect(Connection *con)
s = new Session;
con->set_priv(s->get());
s->con = con;
dout(10) << " new session (outgoing)" << s << " con=" << s->con
dout(10) << " new session (outgoing) " << s << " con=" << s->con
<< " addr=" << s->con->get_peer_addr() << dendl;
// we don't connect to clients
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
@@ -4968,8 +4970,6 @@ bool OSD::ms_dispatch(Message *m)

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
session->waiting_on_map.erase(i++));
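
The surviving loop erases from session->waiting_on_map while iterating over it; erase(i++) is the classic std::list idiom for that, since the post-increment hands erase() the current node after the iterator has already stepped past it. A standalone illustration (the predicate stands in for dispatch_op_fast):

#include <cstdio>
#include <list>

int main() {
  std::list<int> pending = {1, 2, 3, 4, 5};
  // Erase entries while the "dispatch" predicate keeps succeeding;
  // i++ advances past the node before erase() unlinks it, so the
  // loop's iterator is never invalidated.
  for (std::list<int>::iterator i = pending.begin();
       i != pending.end() && *i < 4;          // stand-in for dispatch_op_fast()
       pending.erase(i++))
    ;                                         // all work happens in the clauses
  std::printf("%zu left\n", pending.size());  // prints "2 left" (4 and 5 remain)
  return 0;
}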
@@ -4981,93 +4981,18 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
}
}


void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
void OSD::ms_fast_dispatch(Message *m)
{
assert(session->session_dispatch_lock.is_locked());
if (!session->osdmap) {
session->osdmap = newmap;
if (service.is_stopping()) {
m->put();
return;
}

if (newmap->get_epoch() == session->osdmap->get_epoch())
return;

assert(newmap->get_epoch() > session->osdmap->get_epoch());

map<spg_t, list<OpRequestRef> > from;
from.swap(session->waiting_for_pg);

for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();
i != from.end();
from.erase(i++)) {
set<spg_t> children;
if (!newmap->have_pg_pool(i->first.pool())) {
// drop this wait list on the ground
continue;
}
assert(session->osdmap->have_pg_pool(i->first.pool()));
if (i->first.is_split(
session->osdmap->get_pg_num(i->first.pool()),
newmap->get_pg_num(i->first.pool()),
&children)) {
for (set<spg_t>::iterator child = children.begin();
child != children.end();
++child) {
unsigned split_bits = child->get_split_bits(
newmap->get_pg_num(child->pool()));
list<OpRequestRef> child_ops;
OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
if (!child_ops.empty()) {
session->waiting_for_pg[*child].swap(child_ops);
register_session_waiting_on_pg(session, *child);
}
}
}
if (i->second.empty()) {
clear_session_waiting_on_pg(session, i->first);
} else {
session->waiting_for_pg[i->first].swap(i->second);
}
}

session->osdmap = newmap;
}

void OSD::session_notify_pg_create(
Session *session, OSDMapRef osdmap, spg_t pgid)
{
assert(session->session_dispatch_lock.is_locked());
update_waiting_for_pg(session, osdmap);
map<spg_t, list<OpRequestRef> >::iterator i =
session->waiting_for_pg.find(pgid);
if (i != session->waiting_for_pg.end()) {
session->waiting_on_map.splice(
session->waiting_on_map.end(),
i->second);
session->waiting_for_pg.erase(i);
}
clear_session_waiting_on_pg(session, pgid);
}

void OSD::session_notify_pg_cleared(
Session *session, OSDMapRef osdmap, spg_t pgid)
{
assert(session->session_dispatch_lock.is_locked());
update_waiting_for_pg(session, osdmap);
session->waiting_for_pg.erase(pgid);
clear_session_waiting_on_pg(session, pgid);
}

void OSD::ms_fast_dispatch(Message *m)
{
OpRequestRef op = op_tracker.create_request<OpRequest>(m);
OSDMapRef nextmap = service.get_nextmap_reserved();
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
assert(session);
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
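
The reshaped ms_fast_dispatch now bails out early when the OSD is stopping, putting the message instead of leaking it, and no longer routes ops through the removed per-Session waiting_for_pg machinery. The collapsed diff cuts the function off after the dispatch block; given the commit title, the tail presumably balances the get_priv() reference with session->put() and releases the reserved map. A sketch of that balanced shape, where the last two lines are inference, not quotation:

void OSD::ms_fast_dispatch(Message *m)
{
  if (service.is_stopping()) {
    m->put();                    // drop the message ref rather than leak it
    return;
  }
  OpRequestRef op = op_tracker.create_request<OpRequest>(m);
  OSDMapRef nextmap = service.get_nextmap_reserved();
  Session *session = static_cast<Session*>(m->get_connection()->get_priv());
  assert(session);
  {
    Mutex::Locker l(session->session_dispatch_lock);
    session->waiting_on_map.push_back(op);
    dispatch_session_waiting(session, nextmap);
  }
  session->put();                // inferred: balance the get_priv() ref
  service.release_map(nextmap);  // inferred: return the reserved map
}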
@@ -6330,42 +6255,45 @@ void OSD::consume_map()
service.await_reserved_maps();
service.publish_map(osdmap);

set<Session*> sessions_to_check;
get_sessions_waiting_for_map(&sessions_to_check);
for (set<Session*>::iterator i = sessions_to_check.begin();
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
(*i)->session_dispatch_lock.Lock();
update_waiting_for_pg(*i, osdmap);
dispatch_session_waiting(*i, osdmap);
(*i)->session_dispatch_lock.Unlock();
(*i)->put();
}

// remove any PGs which we no longer host from the session waiting_for_pg lists
set<spg_t> pgs_to_check;
get_pgs_with_waiting_sessions(&pgs_to_check);
for (set<spg_t>::iterator p = pgs_to_check.begin();
p != pgs_to_check.end();
++p) {
vector<int> acting;
int nrep = osdmap->pg_to_acting_osds(p->pgid, acting);
int role = osdmap->calc_pg_role(whoami, acting, nrep);

if (role < 0) {
set<Session*> concerned_sessions;
get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
for (set<Session*>::iterator i = concerned_sessions.begin();
i != concerned_sessions.end();
++i) {
{
Mutex::Locker l((*i)->session_dispatch_lock);
session_notify_pg_cleared(*i, osdmap, *p);
}
(*i)->put();
dispatch_sessions_waiting_on_map();

// remove any PGs which we no longer host from the waiting_for_pg list
set<spg_t> pgs_to_delete;
{
RWLock::RLocker l(pg_map_lock);
map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
while (p != waiting_for_pg.end()) {
spg_t pgid = p->first;

vector<int> acting;
int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
int role = osdmap->calc_pg_role(whoami, acting, nrep);

if (role < 0) {
pgs_to_delete.insert(p->first);
/* we can delete list contents under the read lock because
* nobody will be adding to them -- everybody is now using a map
* new enough that they will simply drop ops instead of adding
* them to the list. */
dout(10) << " discarding waiting ops for " << pgid << dendl;
while (!p->second.empty()) {
p->second.pop_front();
}
}
++p;
}
}
{
RWLock::WLocker l(pg_map_lock);
for (set<spg_t>::iterator i = pgs_to_delete.begin();
i != pgs_to_delete.end();
++i) {
map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(*i);
assert(p->second.empty());
waiting_for_pg.erase(p);
}
}


// scan pg's
{
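
The consume_map() rewrite prunes stale waiters in two passes: op lists are drained under the read lock, which is safe only because of the invariant stated in the in-diff comment (every dispatcher now holds a map new enough to drop ops rather than enqueue them), and the now-empty keys are erased afterwards under the write lock, since erasing mutates the map structure itself. A generic sketch of the pattern with hypothetical names (std::shared_timed_mutex stands in for Ceph's RWLock):

#include <list>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex>

std::shared_timed_mutex table_lock;      // stand-in for pg_map_lock
std::map<int, std::list<int> > waiting;  // stand-in for waiting_for_pg

void prune(const std::set<int> &dead_keys) {
  {
    // Phase 1: shared lock. Emptying the lists is safe only under the
    // invariant above -- no concurrent reader touches a dead key's list.
    std::shared_lock<std::shared_timed_mutex> l(table_lock);
    for (std::set<int>::const_iterator k = dead_keys.begin();
         k != dead_keys.end(); ++k) {
      std::map<int, std::list<int> >::iterator p = waiting.find(*k);
      if (p != waiting.end())
        p->second.clear();               // list contents only
    }
  }
  // Phase 2: exclusive lock, because erase() mutates the map structure.
  std::unique_lock<std::shared_timed_mutex> l(table_lock);
  for (std::set<int>::const_iterator k = dead_keys.begin();
       k != dead_keys.end(); ++k)
    waiting.erase(*k);
}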
@@ -8282,7 +8210,6 @@ struct C_CompleteSplits : public Context {
osd->service.complete_split(to_complete);
}
osd->pg_map_lock.put_write();
osd->wake_pg_waiters(&**i, (*i)->info.pgid);
osd->dispatch_context_transaction(rctx, &**i);
to_complete.insert((*i)->info.pgid);
(*i)->unlock();