From 8fa761f7d4659ef226a21c4b909143ac0f6764b9 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Mon, 26 Jun 2017 11:44:39 +0000 Subject: [PATCH 1/2] msg/async/rdma: refactor rx buffer pool allocator Pool allocator now has a context. Each context can have a different configuration. It allows to use buffer pool safely when there are multiple RDMAStack running in parrallel. Do not fail on assertion when out of memory for rx buffers. Instead log warning and try to recover. Signed-off-by: Alex Mikheev Conflicts: src/msg/async/rdma/Infiniband.h --- src/msg/async/rdma/Infiniband.cc | 124 +++++++++++++++++++++++-------- src/msg/async/rdma/Infiniband.h | 96 ++++++++++++++++-------- src/msg/async/rdma/RDMAStack.cc | 12 ++- src/msg/async/rdma/RDMAStack.h | 1 + 4 files changed, 171 insertions(+), 62 deletions(-) diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index a6842f0bf6801..e155a44888e4a 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -633,45 +633,103 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector &chunks, return r; } -Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr; -PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr; +bool Infiniband::MemoryManager::pool_context::can_alloc(unsigned nbufs) +{ + /* unimited */ + if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0) + return true; + + if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) { + lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " << + n_bufs_allocated << " requested: " << nbufs << + " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl; + return false; + } + + return true; +} + +void Infiniband::MemoryManager::pool_context::set_stat_logger(PerfCounters *logger) { + perf_logger = logger; + if (perf_logger != nullptr) + perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated); +} -unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0; -unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0; +void Infiniband::MemoryManager::pool_context::update_stats(int nbufs) +{ + n_bufs_allocated += nbufs; + + if (!perf_logger) + return; + if (nbufs > 0) { + perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs); + } else { + perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs); + } +} -char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes) +void *Infiniband::MemoryManager::mem_pool::slow_malloc() +{ + void *p; + + Mutex::Locker l(PoolAllocator::lock); + PoolAllocator::g_ctx = ctx; + // this will trigger pool expansion via PoolAllocator::malloc() + p = boost::pool::malloc(); + PoolAllocator::g_ctx = nullptr; + return p; +} + +Infiniband::MemoryManager::pool_context *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr; +Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock"); + +// lock is taken by mem_pool::slow_malloc() +char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes) { mem_info *m; Chunk *ch; size_t rx_buf_size; unsigned nbufs; + MemoryManager *manager; + CephContext *cct; - rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size; + assert(g_ctx); + manager = g_ctx->manager; + cct = manager->cct; + rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size; nbufs = bytes/rx_buf_size; - if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) { + if (!g_ctx->can_alloc(nbufs)) return NULL; - } m = static_cast(manager->malloc(bytes + sizeof(*m))); - if (!m) + if (!m) { + lderr(cct) << __func__ << "failed to allocate " << + bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl; return NULL; + } + + m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); + if (m->mr == NULL) { + lderr(cct) << __func__ << "failed to register " << + bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl; + manager->free(m); + return NULL; + } - m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); - assert(m->mr); m->nbufs = nbufs; + // save this chunk context + m->ctx = g_ctx; - n_bufs_allocated += nbufs; // note that the memory can be allocated before perf logger is set - if (perf_logger) - perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs); + g_ctx->update_stats(nbufs); /* initialize chunks */ ch = m->chunks; for (unsigned i = 0; i < nbufs; i++) { ch->lkey = m->mr->lkey; - ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size; + ch->bytes = cct->_conf->ms_async_rdma_buffer_size; ch->offset = 0; ch->buffer = ch->data; // TODO: refactor tx and remove buffer ch = reinterpret_cast(reinterpret_cast(ch) + rx_buf_size); @@ -681,21 +739,21 @@ char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes) } -void Infiniband::MemoryManager::RxAllocator::free(char * const block) +void Infiniband::MemoryManager::PoolAllocator::free(char * const block) { mem_info *m; + Mutex::Locker l(lock); m = reinterpret_cast(block) - 1; - n_bufs_allocated -= m->nbufs; - if (perf_logger) - perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs); + m->ctx->update_stats(-m->nbufs); ibv_dereg_mr(m->mr); - manager->free(m); + m->ctx->manager->free(m); } Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p) : cct(c), device(d), pd(p), - rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size, + rxbuf_pool_ctx(this), + rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size, c->_conf->ms_async_rdma_receive_buffers > 0 ? // if possible make initial pool size 2 * receive_queue_len // that way there will be no pool expansion upon receive of the @@ -705,10 +763,6 @@ Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDo // rx pool is infinite, we can set any initial size that we want 2 * c->_conf->ms_async_rdma_receive_queue_len) { - RxAllocator::set_memory_manager(this); - // remember the setting because cct may not be available when - // global infiniband is destroyed - hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage; } Infiniband::MemoryManager::~MemoryManager() @@ -745,7 +799,7 @@ void Infiniband::MemoryManager::huge_pages_free(void *ptr) void* Infiniband::MemoryManager::malloc(size_t size) { - if (hp_enabled) + if (cct->_conf->ms_async_rdma_enable_hugepage) return huge_pages_malloc(size); else return std::malloc(size); @@ -753,7 +807,7 @@ void* Infiniband::MemoryManager::malloc(size_t size) void Infiniband::MemoryManager::free(void *ptr) { - if (hp_enabled) + if (cct->_conf->ms_async_rdma_enable_hugepage) huge_pages_free(ptr); else std::free(ptr); @@ -855,8 +909,6 @@ void Infiniband::init() ceph_abort(); } - MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers); - tx_queue_len = device->device_attr->max_qp_wr; if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) { tx_queue_len = cct->_conf->ms_async_rdma_send_buffers; @@ -932,7 +984,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio return qp; } -void Infiniband::post_chunks_to_srq(int num) +int Infiniband::post_chunks_to_srq(int num) { int ret, i = 0; ibv_sge isge[num]; @@ -941,8 +993,15 @@ void Infiniband::post_chunks_to_srq(int num) while (i < num) { chunk = get_memory_manager()->get_rx_buffer(); - - assert (chunk != NULL); + if (chunk == NULL) { + lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num << + " rx buffers. Got " << i << dendl; + if (i == 0) + return 0; + // if we got some buffers post them and hope for the best + rx_work_request[i-1].next = 0; + break; + } isge[i].addr = reinterpret_cast(chunk->data); isge[i].length = chunk->bytes; @@ -962,6 +1021,7 @@ void Infiniband::post_chunks_to_srq(int num) ibv_recv_wr *badworkrequest; ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest); assert(ret == 0); + return i; } Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index d9b196ba5570e..8a7d1f70ca866 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -241,34 +241,67 @@ class Infiniband { Chunk* chunk_base = nullptr; }; - class RxAllocator { - struct mem_info { - ibv_mr *mr; - unsigned nbufs; - Chunk chunks[0]; - }; - static MemoryManager *manager; - static unsigned n_bufs_allocated; - static unsigned max_bufs; - static PerfCounters *perf_logger; - public: - typedef std::size_t size_type; - typedef std::ptrdiff_t difference_type; - - static char * malloc(const size_type bytes); - static void free(char * const block); + class pool_context { + PerfCounters *perf_logger; + + public: + MemoryManager *manager; + unsigned n_bufs_allocated; + // true if it is possible to alloc + // more memory for the pool + pool_context(MemoryManager *m) : + perf_logger(nullptr), + manager(m), + n_bufs_allocated(0) {} + bool can_alloc(unsigned nbufs); + void update_stats(int val); + void set_stat_logger(PerfCounters *logger); + }; - static void set_memory_manager(MemoryManager *m) { - manager = m; - } - static void set_max_bufs(int n) { - max_bufs = n; - } + class PoolAllocator { + struct mem_info { + ibv_mr *mr; + pool_context *ctx; + unsigned nbufs; + Chunk chunks[0]; + }; + public: + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + + static char * malloc(const size_type bytes); + static void free(char * const block); + + static pool_context *g_ctx; + static Mutex lock; + }; - static void set_perf_logger(PerfCounters *logger) { - perf_logger = logger; - perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated); - } + // modify boost pool so that it is possible to + // have a thread safe 'context' when allocating/freeing + // the memory. It is needed to allow a different pool + // configurations and bookkeeping per CephContext and + // also to be able // to use same allocator to deal with + // RX and TX pool. + // TODO: use boost pool to allocate TX chunks too + class mem_pool : public boost::pool { + private: + pool_context *ctx; + void *slow_malloc(); + + public: + explicit mem_pool(pool_context *ctx, const size_type nrequested_size, + const size_type nnext_size = 32, + const size_type nmax_size = 0) : + pool(nrequested_size, nnext_size, nmax_size), + ctx(ctx) { } + + void *malloc() { + if (!store().empty()) + return (store().malloc)(); + // need to alloc more memory... + // slow path code + return slow_malloc(); + } }; MemoryManager(CephContext *c, Device *d, ProtectionDomain *p); @@ -296,6 +329,10 @@ class Infiniband { rxbuf_pool.free(chunk); } + void set_rx_stat_logger(PerfCounters *logger) { + rxbuf_pool_ctx.set_stat_logger(logger); + } + CephContext *cct; private: // TODO: Cluster -> TxPool txbuf_pool @@ -304,8 +341,8 @@ class Infiniband { Cluster* send;// SEND Device *device; ProtectionDomain *pd; - boost::pool rxbuf_pool; - bool hp_enabled; + pool_context rxbuf_pool_ctx; + mem_pool rxbuf_pool; void* huge_pages_malloc(size_t size); void huge_pages_free(void *ptr); @@ -454,7 +491,8 @@ class Infiniband { typedef MemoryManager::Chunk Chunk; QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type); ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); - void post_chunks_to_srq(int); + // post rx buffers to srq, return number of buffers actually posted + int post_chunks_to_srq(int num); void post_chunk_to_pool(Chunk* chunk) { get_memory_manager()->release_rx_buffer(chunk); } diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index dcc6e1a8fb3db..115ff4d3c5e67 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -84,6 +84,8 @@ void RDMADispatcher::polling_start() if (t.joinable()) return; // dispatcher thread already running + get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger); + tx_cc = get_stack()->get_infiniband().create_comp_channel(cct); assert(tx_cc); rx_cc = get_stack()->get_infiniband().create_comp_channel(cct); @@ -155,6 +157,12 @@ void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) { Mutex::Locker l(lock); get_stack()->get_infiniband().post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); + // handle a case when we have a limited number of + // rx buffers and we could not post a required amount when polling + if (post_backlog > 0) { + ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl; + post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog); + } } void RDMADispatcher::polling() @@ -186,7 +194,9 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret); Mutex::Locker l(lock);//make sure connected socket alive when pass wc - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret); + + post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret); + for (int i = 0; i < rx_ret; ++i) { ibv_wc* response = &wc[i]; Chunk* chunk = reinterpret_cast(response->wr_id); diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 764ea33f39e82..6869d32ef0b18 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -47,6 +47,7 @@ class RDMADispatcher { bool done = false; std::atomic num_dead_queue_pair = {0}; std::atomic num_qp_conn = {0}; + int post_backlog = 0; Mutex lock; // protect `qp_conns`, `dead_queue_pairs` // qp_num -> InfRcConnection // The main usage of `qp_conns` is looking up connection by qp_num, From 2e44fde693c945afbd399838272b62bd8b3bc614 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Sun, 20 Aug 2017 08:52:15 +0000 Subject: [PATCH 2/2] msg/async/rdma: refactor rx buffer pool allocator: cr fixes Signed-off-by: Alex Mikheev --- src/msg/async/rdma/Infiniband.cc | 24 +++---- src/msg/async/rdma/Infiniband.h | 114 ++++++++++++++++--------------- 2 files changed, 70 insertions(+), 68 deletions(-) diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index e155a44888e4a..db2245dd1e1dd 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -633,9 +633,9 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector &chunks, return r; } -bool Infiniband::MemoryManager::pool_context::can_alloc(unsigned nbufs) +bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs) { - /* unimited */ + /* unlimited */ if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0) return true; @@ -649,13 +649,13 @@ bool Infiniband::MemoryManager::pool_context::can_alloc(unsigned nbufs) return true; } -void Infiniband::MemoryManager::pool_context::set_stat_logger(PerfCounters *logger) { +void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) { perf_logger = logger; if (perf_logger != nullptr) perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated); } -void Infiniband::MemoryManager::pool_context::update_stats(int nbufs) +void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs) { n_bufs_allocated += nbufs; @@ -671,17 +671,17 @@ void Infiniband::MemoryManager::pool_context::update_stats(int nbufs) void *Infiniband::MemoryManager::mem_pool::slow_malloc() { - void *p; + void *p; - Mutex::Locker l(PoolAllocator::lock); - PoolAllocator::g_ctx = ctx; - // this will trigger pool expansion via PoolAllocator::malloc() - p = boost::pool::malloc(); - PoolAllocator::g_ctx = nullptr; - return p; + Mutex::Locker l(PoolAllocator::lock); + PoolAllocator::g_ctx = ctx; + // this will trigger pool expansion via PoolAllocator::malloc() + p = boost::pool::malloc(); + PoolAllocator::g_ctx = nullptr; + return p; } -Infiniband::MemoryManager::pool_context *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr; +Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr; Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock"); // lock is taken by mem_pool::slow_malloc() diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 8a7d1f70ca866..00ee99b8c8ad6 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -241,67 +241,69 @@ class Infiniband { Chunk* chunk_base = nullptr; }; - class pool_context { - PerfCounters *perf_logger; - - public: - MemoryManager *manager; - unsigned n_bufs_allocated; - // true if it is possible to alloc - // more memory for the pool - pool_context(MemoryManager *m) : - perf_logger(nullptr), - manager(m), - n_bufs_allocated(0) {} - bool can_alloc(unsigned nbufs); - void update_stats(int val); - void set_stat_logger(PerfCounters *logger); + class MemPoolContext { + PerfCounters *perf_logger; + + public: + MemoryManager *manager; + unsigned n_bufs_allocated; + // true if it is possible to alloc + // more memory for the pool + MemPoolContext(MemoryManager *m) : + perf_logger(nullptr), + manager(m), + n_bufs_allocated(0) {} + bool can_alloc(unsigned nbufs); + void update_stats(int val); + void set_stat_logger(PerfCounters *logger); }; class PoolAllocator { - struct mem_info { - ibv_mr *mr; - pool_context *ctx; - unsigned nbufs; - Chunk chunks[0]; - }; - public: - typedef std::size_t size_type; - typedef std::ptrdiff_t difference_type; - - static char * malloc(const size_type bytes); - static void free(char * const block); - - static pool_context *g_ctx; - static Mutex lock; + struct mem_info { + ibv_mr *mr; + MemPoolContext *ctx; + unsigned nbufs; + Chunk chunks[0]; + }; + public: + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + + static char * malloc(const size_type bytes); + static void free(char * const block); + + static MemPoolContext *g_ctx; + static Mutex lock; }; - // modify boost pool so that it is possible to - // have a thread safe 'context' when allocating/freeing - // the memory. It is needed to allow a different pool - // configurations and bookkeeping per CephContext and - // also to be able // to use same allocator to deal with - // RX and TX pool. - // TODO: use boost pool to allocate TX chunks too + /** + * modify boost pool so that it is possible to + * have a thread safe 'context' when allocating/freeing + * the memory. It is needed to allow a different pool + * configurations and bookkeeping per CephContext and + * also to be able to use same allocator to deal with + * RX and TX pool. + * TODO: use boost pool to allocate TX chunks too + */ class mem_pool : public boost::pool { - private: - pool_context *ctx; - void *slow_malloc(); - - public: - explicit mem_pool(pool_context *ctx, const size_type nrequested_size, - const size_type nnext_size = 32, - const size_type nmax_size = 0) : - pool(nrequested_size, nnext_size, nmax_size), - ctx(ctx) { } - - void *malloc() { - if (!store().empty()) - return (store().malloc)(); - // need to alloc more memory... - // slow path code - return slow_malloc(); - } + private: + MemPoolContext *ctx; + void *slow_malloc(); + + public: + explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size, + const size_type nnext_size = 32, + const size_type nmax_size = 0) : + pool(nrequested_size, nnext_size, nmax_size), + ctx(ctx) { } + + void *malloc() { + if (!store().empty()) + return (store().malloc)(); + // need to alloc more memory... + // slow path code + return slow_malloc(); + } }; MemoryManager(CephContext *c, Device *d, ProtectionDomain *p); @@ -341,7 +343,7 @@ class Infiniband { Cluster* send;// SEND Device *device; ProtectionDomain *pd; - pool_context rxbuf_pool_ctx; + MemPoolContext rxbuf_pool_ctx; mem_pool rxbuf_pool; void* huge_pages_malloc(size_t size);