Skip to content

Commit

Permalink
mds: drop purge step when closing client sessions
Browse files Browse the repository at this point in the history
The purge step when closing client sessions is pointless.  If the
MDS is alive, we know which inos were used and which weren't.  It
is only the _journal_ that doesn't necessarily know which have
been recently allocated: any cleanup needs to happen on journal
replay during mds recovery.

Also clean up session states some more.
  • Loading branch information
liewegas committed Mar 23, 2010
1 parent b107e5b commit 205e883
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 65 deletions.
80 changes: 35 additions & 45 deletions src/mds/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,15 @@ void Server::handle_client_session(MClientSession *m)
__u64 sseq = 0;
switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
if (session->is_opening() || session->is_open()) {
dout(10) << "already open|opening, dropping this req" << dendl;
if (session->is_opening() ||
session->is_open() ||
session->is_stale() ||
session->is_killing()) {
dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
return;
}
assert(session->is_closed() ||
session->is_closing());
sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
pv = ++mds->sessionmap.projected;
Expand All @@ -202,8 +207,8 @@ void Server::handle_client_session(MClientSession *m)
break;

case CEPH_SESSION_REQUEST_RENEWCAPS:
if (session->is_open() || session->is_stale()) {
assert(session->is_stale() || session->is_open());
if (session->is_open() ||
session->is_stale()) {
mds->sessionmap.touch_session(session);
if (session->is_stale()) {
mds->sessionmap.set_state(session, Session::STATE_OPEN);
Expand All @@ -218,18 +223,23 @@ void Server::handle_client_session(MClientSession *m)

case CEPH_SESSION_REQUEST_CLOSE:
{
if (session->is_closed() || session->is_closing()) {
dout(10) << "already closed|closing, dropping this req" << dendl;
if (session->is_closed() ||
session->is_closing() ||
session->is_killing()) {
dout(10) << "already closed|closing|killing, dropping this req" << dendl;
return;
}
assert(session->is_open() ||
session->is_stale() ||
session->is_opening());
if (m->get_seq() < session->get_push_seq()) {
dout(10) << "old push seq " << m->get_seq() << " < " << session->get_push_seq()
<< ", dropping" << dendl;
return;
}
if (m->get_seq() != session->get_push_seq()) {
dout(0) << "old push seq " << m->get_seq() << " != " << session->get_push_seq()
<< ", BUGGY!" << dendl;
<< ", BUGGY!" << dendl;
assert(0);
}
sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
Expand All @@ -254,7 +264,8 @@ void Server::handle_client_session(MClientSession *m)
}
}

void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv,
interval_set<inodeno_t>& inos, version_t piv)
{
dout(10) << "_session_logged " << session->inst << " state_seq " << state_seq << " " << (open ? "open":"close")
<< " " << pv << dendl;
Expand All @@ -268,7 +279,8 @@ void Server::_session_logged(Session *session, __u64 state_seq, bool open, versi
assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
} else if (session->is_closing() || session->is_stale_closing()) {
} else if (session->is_closing() ||
session->is_killing()) {
// kill any lingering capabilities, leases, requests
while (!session->caps.empty()) {
Capability *cap = session->caps.front();
Expand Down Expand Up @@ -298,10 +310,12 @@ void Server::_session_logged(Session *session, __u64 state_seq, bool open, versi
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), session->inst);
mds->sessionmap.set_state(session, Session::STATE_CLOSED);
session->clear();
} else if (session->is_stale_closing()) {
} else if (session->is_killing()) {
// destroy session, close connection
mds->messenger->mark_down(session->inst.addr);
mds->sessionmap.remove_session(session);
} else {
assert(0);
}
} else {
assert(0);
Expand All @@ -317,7 +331,9 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm)
<< dendl;
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
Session *session = mds->sessionmap.get_or_add_session(p->second);
if (session->is_closed() || session->is_closing())
if (session->is_closed() ||
session->is_closing() ||
session->is_killing())
mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
}
Expand Down Expand Up @@ -357,7 +373,9 @@ void Server::terminate_sessions()
p != sessions.end();
++p) {
Session *session = *p;
if (session->is_closing() || session->is_closed())
if (session->is_closing() ||
session->is_killing() ||
session->is_closed())
continue;
__u64 sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
version_t pv = ++mds->sessionmap.projected;
Expand All @@ -368,15 +386,6 @@ void Server::terminate_sessions()
}


struct C_MDS_session_purged : public Context {
Server *server;
Session *session;
C_MDS_session_purged(Server *s, Session *ss) : server(s), session(ss) {}
void finish(int r) {
server->_finish_session_purge(session);
}
};

void Server::find_idle_sessions()
{
dout(10) << "find_idle_sessions" << dendl;
Expand Down Expand Up @@ -439,33 +448,13 @@ void Server::find_idle_sessions()
void Server::end_session(Session *session)
{
assert(session);
mds->sessionmap.set_state(session, Session::STATE_STALE_PURGING);
if (session->prealloc_inos.empty()) {
_finish_session_purge(session);
} else {
C_Gather *fin = new C_Gather(new C_MDS_session_purged(this, session));
for (map<inodeno_t,inodeno_t>::iterator p = session->prealloc_inos.m.begin();
p != session->prealloc_inos.m.end();
p++) {
inodeno_t last = p->first + p->second;
for (inodeno_t i = p->first; i < last; i = i + 1)
mds->mdcache->purge_prealloc_ino(i, fin->new_sub());
}
}
}

void Server::_finish_session_purge(Session *session)
{
dout(10) << "_finish_session_purge " << session->inst << dendl;
assert(session->is_stale_purging());
__u64 sseq = mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
__u64 sseq = mds->sessionmap.set_state(session, Session::STATE_KILLING);
version_t pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(session->inst, false, pv),
new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
}


void Server::reconnect_clients()
{
mds->sessionmap.get_client_set(client_reconnect_gather);
Expand Down Expand Up @@ -993,9 +982,10 @@ void Server::handle_client_request(MClientRequest *req)
delete req;
return;
}
if (session->is_closed() || session->is_closing() || session->is_stale_purging() ||
session->is_stale_closing()) {
dout(5) << "session closed|closing|stale_purging|stale_closing, dropping" << dendl;
if (session->is_closed() ||
session->is_closing() ||
session->is_killing()) {
dout(5) << "session closed|closing|killing, dropping" << dendl;
delete req;
return;
}
Expand Down
1 change: 0 additions & 1 deletion src/mds/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class Server {
void handle_client_session(class MClientSession *m);
void _session_logged(Session *session, __u64 state_seq,
bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
void _finish_session_purge(Session *);
version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm);
void finish_force_open_sessions(map<client_t,entity_inst_t> &cm);
void terminate_sessions();
Expand Down
3 changes: 1 addition & 2 deletions src/mds/SessionMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ void SessionMap::encode(bufferlist& bl)
if (p->second->is_open() ||
p->second->is_closing() ||
p->second->is_stale() ||
p->second->is_stale_purging() ||
p->second->is_stale_closing()) {
p->second->is_killing()) {
::encode(p->first, bl);
p->second->encode(bl);
}
Expand Down
34 changes: 17 additions & 17 deletions src/mds/SessionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,28 @@ class Session : public RefCountedObject {
// -- state etc --
public:
/*
closed <---------------------+
| |
v |
opening <---+ |
| | |
v | |
open --> closing ------------+
|^
v|
stale -> stale_purging --> stale_closing --> <deleted>
+-----------------------------------+
| |
closed <---------------------+ |
| | |
v | |
opening <---+ | |
| | | |
v | | |
open --> closing ------------+ |
|^ |
v| v
stale -> killing --------------> <deleted>
(closed) -> importing -> (opening or open or <deleted>)
*/
static const int STATE_CLOSED = 0;
static const int STATE_OPENING = 1; // journaling open
static const int STATE_OPEN = 2;
static const int STATE_CLOSING = 3; // journaling close
static const int STATE_STALE = 4;
static const int STATE_STALE_PURGING = 5;
static const int STATE_STALE_CLOSING = 6;
static const int STATE_KILLING = 5;

const char *get_state_name(int s) {
switch (s) {
Expand All @@ -71,8 +73,7 @@ class Session : public RefCountedObject {
case STATE_OPEN: return "open";
case STATE_CLOSING: return "closing";
case STATE_STALE: return "stale";
case STATE_STALE_PURGING: return "stale_purging";
case STATE_STALE_CLOSING: return "stale_closing";
case STATE_KILLING: return "killing";
default: return "???";
}
}
Expand Down Expand Up @@ -126,8 +127,7 @@ class Session : public RefCountedObject {
bool is_open() { return state == STATE_OPEN; }
bool is_closing() { return state == STATE_CLOSING; }
bool is_stale() { return state == STATE_STALE; }
bool is_stale_purging() { return state == STATE_STALE_PURGING; }
bool is_stale_closing() { return state == STATE_STALE_CLOSING; }
bool is_killing() { return state == STATE_KILLING; }

// -- caps --
private:
Expand Down

0 comments on commit 205e883

Please sign in to comment.