Skip to content

Commit

Permalink
Merge pull request ceph#14414 from Adirl/is_ready
Browse files Browse the repository at this point in the history
msg/async: Postpone bind if network stack is not ready

Reviewed-by: Haomai Wang <[email protected]>
  • Loading branch information
liewegas authored Apr 19, 2017
2 parents 4e8dd17 + 972c741 commit abd52cb
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 31 deletions.
23 changes: 21 additions & 2 deletions src/msg/async/AsyncMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,15 @@ void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;

stack->start();
stack->ready();
if (pending_bind) {
int err = bind(pending_bind_addr);
if (err) {
lderr(cct) << __func__ << " postponed bind failed" << dendl;
ceph_abort();
}
}

Mutex::Locker l(lock);
for (auto &&p : processors)
p->start();
Expand Down Expand Up @@ -321,12 +329,23 @@ int AsyncMessenger::shutdown()
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
lock.Lock();
if (started) {

if (!pending_bind && started) {
ldout(cct,10) << __func__ << " already started" << dendl;
lock.Unlock();
return -1;
}

ldout(cct,10) << __func__ << " bind " << bind_addr << dendl;

if (!stack->is_ready()) {
ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
pending_bind_addr = bind_addr;
pending_bind = true;
lock.Unlock();
return 0;
}

lock.Unlock();

// bind to a socket
Expand Down
11 changes: 11 additions & 0 deletions src/msg/async/AsyncMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ class AsyncMessenger : public SimplePolicyMessenger {
// maybe this should be protected by the lock?
bool need_addr;

/**
* set to bind address if bind was called before NetworkStack was ready to
* bind
*/
entity_addr_t pending_bind_addr;

/**
* false; set to true if a pending bind exists
*/
bool pending_bind = false;

/**
* The following aren't lock-protected since you shouldn't be able to race
* the only writers.
Expand Down
3 changes: 2 additions & 1 deletion src/msg/async/Stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ class NetworkStack : public CephContext::ForkWatcher {
protected:
CephContext *cct;
vector<Worker*> workers;
// Used to indicate whether thread started

explicit NetworkStack(CephContext *c, const string &t);
public:
Expand Down Expand Up @@ -337,6 +336,8 @@ class NetworkStack : public CephContext::ForkWatcher {
start();
}

virtual bool is_ready() { return true; };
virtual void ready() { };
};

#endif //CEPH_MSG_ASYNC_STACK_H
15 changes: 14 additions & 1 deletion src/msg/async/rdma/Infiniband.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,


Infiniband::Infiniband(CephContext *cct)
: device_list(new DeviceList(cct, this))
: cct(cct), lock("IB lock")
{
}

Expand All @@ -600,6 +600,19 @@ Infiniband::~Infiniband()
delete device_list;
}

void Infiniband::init()
{
Mutex::Locker l(lock);

if (initialized)
return;

device_list = new DeviceList(cct, this);
initialized = true;

dispatcher->polling_start();
}

void Infiniband::set_dispatcher(RDMADispatcher *d)
{
assert(!d ^ !dispatcher);
Expand Down
6 changes: 5 additions & 1 deletion src/msg/async/rdma/Infiniband.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ class Infiniband {
};

private:
DeviceList *device_list;
CephContext *cct;
Mutex lock;
bool initialized = false;
DeviceList *device_list = nullptr;
RDMADispatcher *dispatcher = nullptr;

public:
explicit Infiniband(CephContext *c);
~Infiniband();
void init();

void set_dispatcher(RDMADispatcher *d);

Expand Down
27 changes: 4 additions & 23 deletions src/msg/async/rdma/RDMAStack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);

cct->register_fork_watcher(this);
}

void RDMADispatcher::polling_start()
Expand Down Expand Up @@ -283,26 +281,6 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
erase_qpn_lockless(qpn);
}

void RDMADispatcher::handle_pre_fork()
{
polling_stop();
done = false;

global_infiniband->handle_pre_fork();

global_infiniband.destroy();
}

void RDMADispatcher::handle_post_fork()
{
if (!global_infiniband) {
global_infiniband.construct(cct);
global_infiniband->set_dispatcher(this);
}

polling_start();
}

void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
Expand Down Expand Up @@ -410,6 +388,8 @@ void RDMAWorker::initialize()

int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
global_infiniband->init();

auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
Expand All @@ -423,6 +403,8 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket

int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
global_infiniband->init();

RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);

Expand Down Expand Up @@ -507,7 +489,6 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
dispatcher->polling_start();

unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
Expand Down
9 changes: 6 additions & 3 deletions src/msg/async/rdma/RDMAStack.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ enum {
};


class RDMADispatcher : public CephContext::ForkWatcher {
class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;

Expand Down Expand Up @@ -125,8 +125,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
void notify_pending_workers();
virtual void handle_pre_fork() override;
virtual void handle_post_fork() override;
void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);

Expand Down Expand Up @@ -197,6 +195,8 @@ class RDMAStack : public NetworkStack {
RDMADispatcher *dispatcher;
PerfCounters *perf_counter;

std::atomic<bool> fork_finished = {false};

public:
explicit RDMAStack(CephContext *cct, const string &t);
virtual ~RDMAStack();
Expand All @@ -206,5 +206,8 @@ class RDMAStack : public NetworkStack {
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
RDMADispatcher *get_dispatcher() { return dispatcher; }

virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};
#endif

0 comments on commit abd52cb

Please sign in to comment.