Skip to content

Commit

Permalink
Merge pull request ceph#653 from ceph/wip-mon-auth
Browse files Browse the repository at this point in the history
mon: Monitor: dissociate msg handling from session and connection logic

Reviewed-by: Sage Weil <[email protected]>
  • Loading branch information
Sage Weil committed Oct 3, 2013
2 parents 58d0a1f + f1e2393 commit de788bd
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 90 deletions.
206 changes: 116 additions & 90 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2561,67 +2561,98 @@ bool Monitor::_ms_dispatch(Message *m)
EntityName entity_name;
bool src_is_mon;

src_is_mon = !connection || (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);

if (connection) {
bool reuse_caps = false;
dout(20) << "have connection" << dendl;
s = static_cast<MonSession *>(connection->get_priv());
if (s && s->closed) {
caps = s->caps;
reuse_caps = true;
s->put();
s = NULL;
// regardless of who we are or who the sender is, the message must
// have a connection associated. If it doesn't then something fishy
// is going on.
assert(connection);

src_is_mon = (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);

bool reuse_caps = false;
dout(20) << "have connection" << dendl;
s = static_cast<MonSession *>(connection->get_priv());
if (s && s->closed) {
caps = s->caps;
reuse_caps = true;
s->put();
s = NULL;
}
if (!s) {
// if the sender is not a monitor, make sure their first message for a
// session is an MAuth. If it is not, assume it's a stray message,
// and considering that we are creating a new session it is safe to
// assume that the sender hasn't authenticated yet, so we have no way
// of assessing whether we should handle it or not.
if (!src_is_mon && m->get_type() != CEPH_MSG_AUTH) {
dout(1) << __func__ << " dropping stray message " << *m
<< " from " << m->get_source_inst() << dendl;
m->put();
return false;
}
if (!s) {
if (!exited_quorum.is_zero() && !src_is_mon) {
waitlist_or_zap_client(m);
return true;
}
dout(10) << "do not have session, making new one" << dendl;
s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
m->get_connection()->set_priv(s->get());
dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;

if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
dout(10) << "setting timeout on session" << dendl;
// set an initial timeout here, so we will trim this session even if they don't
// do anything.
s->until = ceph_clock_now(g_ceph_context);
s->until += g_conf->mon_subscribe_interval;
} else {
//give it monitor caps; the peer type has been authenticated
reuse_caps = false;
dout(5) << "setting monitor caps on this connection" << dendl;
if (!s->caps.is_allow_all()) //but no need to repeatedly copy
s->caps = *mon_caps;
}
if (reuse_caps)
s->caps = caps;

if (!exited_quorum.is_zero() && !src_is_mon) {
waitlist_or_zap_client(m);
return true;
}

dout(10) << "do not have session, making new one" << dendl;
s = session_map.new_session(m->get_source_inst(), m->get_connection().get());
m->get_connection()->set_priv(s->get());
dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;

if (!src_is_mon) {
dout(10) << "setting timeout on session" << dendl;
// set an initial timeout here, so we will trim this session even if they don't
// do anything.
s->until = ceph_clock_now(g_ceph_context);
s->until += g_conf->mon_subscribe_interval;
} else {
dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
//give it monitor caps; the peer type has been authenticated
reuse_caps = false;
dout(5) << "setting monitor caps on this connection" << dendl;
if (!s->caps.is_allow_all()) //but no need to repeatedly copy
s->caps = *mon_caps;
}
if (reuse_caps)
s->caps = caps;
} else {
dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
}

if (s) {
if (s->auth_handler) {
entity_name = s->auth_handler->get_entity_name();
}
}

if (s)
dout(20) << " caps " << s->caps.get_str() << dendl;
}

if (is_synchronizing() && !src_is_mon) {
waitlist_or_zap_client(m);
return true;
}

{
switch (m->get_type()) {

ret = dispatch(s, m, src_is_mon);

if (s) {
s->put();
}

return ret;
}

bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
{
bool ret = true;

assert(m != NULL);

switch (m->get_type()) {

case MSG_ROUTE:
handle_route(static_cast<MRoute*>(m));
break;

// misc
// misc
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map(static_cast<MMonGetMap*>(m));
break;
Expand All @@ -2647,12 +2678,11 @@ bool Monitor::_ms_dispatch(Message *m)
case MSG_MON_SYNC:
handle_sync(static_cast<MMonSync*>(m));
break;

case MSG_MON_SCRUB:
handle_scrub(static_cast<MMonScrub*>(m));
break;

// OSDs
// OSDs
case MSG_OSD_MARK_ME_DOWN:
case MSG_OSD_FAILURE:
case MSG_OSD_BOOT:
Expand All @@ -2665,20 +2695,20 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;

// MDSs
// MDSs
case MSG_MDS_BEACON:
case MSG_MDS_OFFLOAD_TARGETS:
paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m);
break;

// auth
// auth
case MSG_MON_GLOBAL_ID:
case CEPH_MSG_AUTH:
/* no need to check caps here */
paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
break;

// pg
// pg
case CEPH_MSG_STATFS:
case MSG_PGSTATS:
case MSG_GETPOOLSTATS:
Expand All @@ -2689,7 +2719,7 @@ bool Monitor::_ms_dispatch(Message *m)
paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
break;

// log
// log
case MSG_LOG:
paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
break;
Expand All @@ -2698,60 +2728,60 @@ bool Monitor::_ms_dispatch(Message *m)
clog.handle_log_ack((MLogAck*)m);
break;

// monmap
// monmap
case MSG_MON_JOIN:
paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
break;

// paxos
// paxos
case MSG_MON_PAXOS:
{
MMonPaxos *pm = static_cast<MMonPaxos*>(m);
if (!src_is_mon &&
!s->is_capable("mon", MON_CAP_X)) {
//can't send these!
pm->put();
break;
}
MMonPaxos *pm = static_cast<MMonPaxos*>(m);
if (!src_is_mon ||
!s->is_capable("mon", MON_CAP_X)) {
//can't send these!
pm->put();
break;
}

if (state == STATE_SYNCHRONIZING) {
// we are synchronizing. These messages would do us no
// good, thus just drop them and ignore them.
dout(10) << __func__ << " ignore paxos msg from "
<< pm->get_source_inst() << dendl;
pm->put();
break;
}
if (state == STATE_SYNCHRONIZING) {
// we are synchronizing. These messages would do us no
// good, thus just drop them and ignore them.
dout(10) << __func__ << " ignore paxos msg from "
<< pm->get_source_inst() << dendl;
pm->put();
break;
}

// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
pm->put();
break;
}
if (pm->epoch != get_epoch()) {
pm->put();
break;
}
// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
pm->put();
break;
}
if (pm->epoch != get_epoch()) {
pm->put();
break;
}

paxos->dispatch((PaxosServiceMessage*)m);
paxos->dispatch((PaxosServiceMessage*)m);
}
break;

// elector messages
// elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
if (s &&
!s->is_capable("mon", MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
<< s->caps << dendl;
m->put();
break;
!s->is_capable("mon", MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
<< s->caps << dendl;
m->put();
break;
}
if (!is_probing() && !is_synchronizing()) {
elector.dispatch(m);
elector.dispatch(m);
} else {
m->put();
m->put();
}
break;

Expand All @@ -2769,10 +2799,6 @@ bool Monitor::_ms_dispatch(Message *m)

default:
ret = false;
}
}
if (s) {
s->put();
}

return ret;
Expand Down
2 changes: 2 additions & 0 deletions src/mon/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,8 @@ class Monitor : public Dispatcher {
lock.Unlock();
return ret;
}
// dissociate message handling from session and connection logic
bool dispatch(MonSession *s, Message *m, const bool src_is_mon);
//mon_caps is used for un-connected messages from monitors
MonCap * mon_caps;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
Expand Down

0 comments on commit de788bd

Please sign in to comment.