Skip to content

Commit

Permalink
src: get rid of the Observers throughout the code base.
Browse files Browse the repository at this point in the history
This is a big patch that will remove all references to the observers
throughout the code, including a complete removal of the Observer-related
messages' source files.

Signed-off-by: Joao Eduardo Luis <[email protected]>
  • Loading branch information
Joao Eduardo Luis committed May 17, 2012
1 parent 8413724 commit 035bb12
Show file tree
Hide file tree
Showing 11 changed files with 2 additions and 245 deletions.
1 change: 0 additions & 1 deletion src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
OPTION(paxos_observer_timeout, OPT_DOUBLE, 5*60) // gather updates for this long before proposing a map update
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_supported, OPT_STR, "none")
OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12)
Expand Down
57 changes: 0 additions & 57 deletions src/messages/MMonObserve.h

This file was deleted.

62 changes: 0 additions & 62 deletions src/messages/MMonObserveNotify.h

This file was deleted.

25 changes: 0 additions & 25 deletions src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonObserve.h"
#include "messages/MMonObserveNotify.h"
#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
#include "messages/MMonPaxos.h"
Expand Down Expand Up @@ -1328,25 +1326,6 @@ void Monitor::remove_session(MonSession *s)
}


void Monitor::handle_observe(MMonObserve *m)
{
dout(10) << "handle_observe " << *m << " from " << m->get_source_inst() << dendl;
// check that there are perms. Send a response back if they aren't sufficient,
// and delete the message (if it's not deleted for us, which happens when
// we own the connection to the requested observer).
MonSession *session = m->get_session();
if (!session || !session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
send_reply(m, m);
return;
}
if (m->machine_id >= PAXOS_NUM) {
dout(0) << "register_observer: bad monitor id: " << m->machine_id << dendl;
} else {
paxos[m->machine_id]->register_observer(m->get_orig_source_inst(), m->ver);
}
messenger->send_message(m, m->get_orig_source_inst());
}

void Monitor::send_command(const entity_inst_t& inst,
const vector<string>& com, version_t version)
{
Expand Down Expand Up @@ -1563,10 +1542,6 @@ bool Monitor::_ms_dispatch(Message *m)
}
break;

case MSG_MON_OBSERVE:
handle_observe((MMonObserve *)m);
break;

// elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
Expand Down
2 changes: 0 additions & 2 deletions src/mon/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class AdminSocketHook;
class MMonGetMap;
class MMonGetVersion;
class MMonProbe;
class MMonObserve;
class MMonSubscribe;
class MAuthRotating;
class MRoute;
Expand Down Expand Up @@ -266,7 +265,6 @@ class Monitor : public Dispatcher {
void _mon_status(ostream& ss);
void _quorum_status(ostream& ss);
void handle_command(class MMonCommand *m);
void handle_observe(MMonObserve *m);
void handle_route(MRoute *m);

void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
Expand Down
66 changes: 0 additions & 66 deletions src/mon/Paxos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "MonitorStore.h"

#include "messages/MMonPaxos.h"
#include "messages/MMonObserveNotify.h"

#include "common/config.h"

Expand Down Expand Up @@ -381,7 +380,6 @@ void Paxos::begin(bufferlist& v)
finish_contexts(g_ceph_context, waiting_for_commit);
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
update_observers();
return;
}

Expand Down Expand Up @@ -891,72 +889,8 @@ void Paxos::dispatch(PaxosServiceMessage *m)
default:
assert(0);
}

if (is_readable())
update_observers();
}

void Paxos::register_observer(entity_inst_t inst, version_t v)
{
dout(10) << "register_observer " << inst << " v" << v << dendl;

Observer *observer;
if (observers.count(inst))
observer = observers[inst];
else {
observers[inst] = observer = new Observer(inst, v);
}

utime_t timeout = ceph_clock_now(g_ceph_context);
timeout += g_conf->paxos_observer_timeout;
observer->timeout = timeout;

if (is_readable())
update_observers();
}


void Paxos::update_observers()
{
dout(10) << "update_observers" << dendl;

bufferlist bl;
version_t ver;

map<entity_inst_t, Observer *>::iterator iter = observers.begin();
while (iter != observers.end()) {
Observer *observer = iter->second;

// timed out?
if (ceph_clock_now(g_ceph_context) > observer->timeout) {
delete observer;
observers.erase(iter++);
continue;
}
++iter;

if (observer->last_version == 0 ||
observer->last_version < first_committed) {
ver = get_stashed(bl);
if (ver) {
dout(10) << " sending summary state v" << ver << " to " << observer->inst << dendl;
mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, true),
observer->inst);
observer->last_version = ver;
continue;
}
}

for (ver = observer->last_version + 1; ver <= last_committed; ver++) {
if (read(ver, bl)) {
dout(10) << " sending state v" << ver << " to " << observer->inst << dendl;
mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, false),
observer->inst);
observer->last_version = ver;
}
}
}
}

// -----------------
// service interface
Expand Down
13 changes: 0 additions & 13 deletions src/mon/Paxos.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class Paxos {

friend class Monitor;
friend class PaxosService;
friend class PaxosObserver;

list<std::string> extra_state_dirs;

Expand Down Expand Up @@ -145,15 +144,6 @@ class Paxos {
list<Context*> waiting_for_writeable;
list<Context*> waiting_for_commit;

// observers
struct Observer {
entity_inst_t inst;
version_t last_version;
utime_t timeout;
Observer(entity_inst_t& ei, version_t v) : inst(ei), last_version(v) { }
};
map<entity_inst_t, Observer *> observers;

//synchronization warnings
utime_t last_clock_drift_warn;
int clock_drift_warned;
Expand Down Expand Up @@ -320,9 +310,6 @@ class Paxos {
version_t get_stashed_version() { return latest_stashed; }

version_t get_first_committed() { return first_committed; }

void register_observer(entity_inst_t inst, version_t v);
void update_observers();
};


Expand Down
8 changes: 0 additions & 8 deletions src/msg/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ using namespace std;
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
#include "messages/MMonObserve.h"
#include "messages/MMonObserveNotify.h"

#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
Expand Down Expand Up @@ -298,12 +296,6 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
m = new MMonElection;
break;

case MSG_MON_OBSERVE:
m = new MMonObserve;
break;
case MSG_MON_OBSERVE_NOTIFY:
m = new MMonObserveNotify;
break;
case MSG_LOG:
m = new MLog;
break;
Expand Down
4 changes: 2 additions & 2 deletions src/msg/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
#define MSG_MON_COMMAND_ACK 51
#define MSG_LOG 52
#define MSG_LOGACK 53
#define MSG_MON_OBSERVE 54
#define MSG_MON_OBSERVE_NOTIFY 55
//#define MSG_MON_OBSERVE 54
//#define MSG_MON_OBSERVE_NOTIFY 55
#define MSG_CLASS 56
#define MSG_CLASS_ACK 57

Expand Down
4 changes: 0 additions & 4 deletions src/test/encoding/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,6 @@ MESSAGE(MMonGlobalID)
MESSAGE(MMonJoin)
#include "messages/MMonMap.h"
MESSAGE(MMonMap)
#include "messages/MMonObserve.h"
MESSAGE(MMonObserve)
#include "messages/MMonObserveNotify.h"
MESSAGE(MMonObserveNotify)
#include "messages/MMonPaxos.h"
MESSAGE(MMonPaxos)
#include "messages/MMonProbe.h"
Expand Down
5 changes: 0 additions & 5 deletions src/tools/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ OSDMap *osdmap = 0;

#include "mon/mon_types.h"

#include "messages/MMonObserve.h"
#include "messages/MMonObserveNotify.h"
#include "messages/MOSDMap.h"

#include "messages/MCommand.h"
Expand Down Expand Up @@ -525,9 +523,6 @@ bool Admin::ms_dispatch(Message *m) {
void Admin::ms_handle_connect(Connection *con) {
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
ctx->lock.Lock();
if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
// send_observe_requests(ctx);
}
if (pending_cmd.size())
send_command(ctx);
ctx->lock.Unlock();
Expand Down

0 comments on commit 035bb12

Please sign in to comment.