Merge pull request ceph#17018 from alex-mikheev/wip_rdma_rxpool_2
msg/async/rdma: refactor rx buffer pool allocator

Reviewed-by: Haomai Wang <[email protected]>
yuyuyu101 authored Sep 7, 2017
2 parents f618508 + 2e44fde commit f3bb91e
Showing 4 changed files with 161 additions and 50 deletions.
124 changes: 92 additions & 32 deletions src/msg/async/rdma/Infiniband.cc
@@ -633,45 +633,103 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
return r;
}

Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
{
/* unlimited */
if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
return true;

if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
n_bufs_allocated << " requested: " << nbufs <<
" limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
return false;
}

return true;
}

void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
perf_logger = logger;
if (perf_logger != nullptr)
perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
}

unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0;
void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
{
n_bufs_allocated += nbufs;

if (!perf_logger)
return;

if (nbufs > 0) {
perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
} else {
perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
}
}

char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
void *Infiniband::MemoryManager::mem_pool::slow_malloc()
{
void *p;

Mutex::Locker l(PoolAllocator::lock);
PoolAllocator::g_ctx = ctx;
// this will trigger pool expansion via PoolAllocator::malloc()
p = boost::pool<PoolAllocator>::malloc();
PoolAllocator::g_ctx = nullptr;
return p;
}

Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");

// lock is taken by mem_pool::slow_malloc()
char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
{
mem_info *m;
Chunk *ch;
size_t rx_buf_size;
unsigned nbufs;
MemoryManager *manager;
CephContext *cct;

rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
assert(g_ctx);
manager = g_ctx->manager;
cct = manager->cct;
rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
nbufs = bytes/rx_buf_size;

if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
if (!g_ctx->can_alloc(nbufs))
return NULL;
}

m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
if (!m)
if (!m) {
lderr(cct) << __func__ << "failed to allocate " <<
bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
return NULL;
}

m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
if (m->mr == NULL) {
lderr(cct) << __func__ << "failed to register " <<
bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
manager->free(m);
return NULL;
}

m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
assert(m->mr);
m->nbufs = nbufs;
// save this chunk context
m->ctx = g_ctx;

n_bufs_allocated += nbufs;
// note that the memory can be allocated before perf logger is set
if (perf_logger)
perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
g_ctx->update_stats(nbufs);

/* initialize chunks */
ch = m->chunks;
for (unsigned i = 0; i < nbufs; i++) {
ch->lkey = m->mr->lkey;
ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size;
ch->bytes = cct->_conf->ms_async_rdma_buffer_size;
ch->offset = 0;
ch->buffer = ch->data; // TODO: refactor tx and remove buffer
ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
@@ -681,21 +681,21 @@ char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
}


void Infiniband::MemoryManager::RxAllocator::free(char * const block)
void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
mem_info *m;
Mutex::Locker l(lock);

m = reinterpret_cast<mem_info *>(block) - 1;
n_bufs_allocated -= m->nbufs;
if (perf_logger)
perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
m->ctx->update_stats(-m->nbufs);
ibv_dereg_mr(m->mr);
manager->free(m);
m->ctx->manager->free(m);
}

Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
: cct(c), device(d), pd(p),
rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
rxbuf_pool_ctx(this),
rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
c->_conf->ms_async_rdma_receive_buffers > 0 ?
// if possible make initial pool size 2 * receive_queue_len
// that way there will be no pool expansion upon receive of the
@@ -705,10 +763,6 @@ Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDo
// rx pool is infinite, we can set any initial size that we want
2 * c->_conf->ms_async_rdma_receive_queue_len)
{
RxAllocator::set_memory_manager(this);
// remember the setting because cct may not be available when
// global infiniband is destroyed
hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
}

Infiniband::MemoryManager::~MemoryManager()
@@ -745,15 +799,15 @@ void Infiniband::MemoryManager::huge_pages_free(void *ptr)

void* Infiniband::MemoryManager::malloc(size_t size)
{
if (hp_enabled)
if (cct->_conf->ms_async_rdma_enable_hugepage)
return huge_pages_malloc(size);
else
return std::malloc(size);
}

void Infiniband::MemoryManager::free(void *ptr)
{
if (hp_enabled)
if (cct->_conf->ms_async_rdma_enable_hugepage)
huge_pages_free(ptr);
else
std::free(ptr);
@@ -855,8 +909,6 @@ void Infiniband::init()
ceph_abort();
}

MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);

tx_queue_len = device->device_attr->max_qp_wr;
if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
@@ -932,7 +984,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
return qp;
}

void Infiniband::post_chunks_to_srq(int num)
int Infiniband::post_chunks_to_srq(int num)
{
int ret, i = 0;
ibv_sge isge[num];
@@ -941,8 +993,15 @@ void Infiniband::post_chunks_to_srq(int num)

while (i < num) {
chunk = get_memory_manager()->get_rx_buffer();

assert (chunk != NULL);
if (chunk == NULL) {
lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num <<
" rx buffers. Got " << i << dendl;
if (i == 0)
return 0;
// if we got some buffers post them and hope for the best
rx_work_request[i-1].next = 0;
break;
}

isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
isge[i].length = chunk->bytes;
@@ -962,6 +1021,7 @@ void Infiniband::post_chunks_to_srq(int num)
ibv_recv_wr *badworkrequest;
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
return i;
}

Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
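The pattern behind mem_pool::slow_malloc() above, shown as a stand-alone sketch: boost::pool only ever calls the allocator's static malloc()/free(), so the per-pool MemPoolContext has to be handed to PoolAllocator through a static pointer that is only valid while the pool lock is held. The types below are illustrative stand-ins, not the Ceph classes.

#include <cstdlib>
#include <mutex>

struct Context {                        // stand-in for MemPoolContext
  unsigned n_bufs_allocated = 0;
};

struct StaticAllocator {                // stand-in for PoolAllocator
  static Context *g_ctx;                // only valid while `lock` is held
  static std::mutex lock;

  static char *malloc(std::size_t bytes) {
    // called by the pool during expansion; g_ctx tells us which pool's
    // bookkeeping to update
    g_ctx->n_bufs_allocated++;
    return static_cast<char *>(std::malloc(bytes));
  }
  static void free(char *block) { std::free(block); }
};

Context *StaticAllocator::g_ctx = nullptr;
std::mutex StaticAllocator::lock;

// mirrors mem_pool::slow_malloc(): publish the context, expand, clear it
void *slow_malloc(Context &ctx, std::size_t bytes) {
  std::lock_guard<std::mutex> l(StaticAllocator::lock);
  StaticAllocator::g_ctx = &ctx;
  void *p = StaticAllocator::malloc(bytes);  // real code: boost::pool<>::malloc()
  StaticAllocator::g_ctx = nullptr;
  return p;
}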
74 changes: 57 additions & 17 deletions src/msg/async/rdma/Infiniband.h
@@ -241,33 +241,68 @@ class Infiniband {
Chunk* chunk_base = nullptr;
};

class RxAllocator {
class MemPoolContext {
PerfCounters *perf_logger;

public:
MemoryManager *manager;
unsigned n_bufs_allocated;
// true if it is possible to alloc
// more memory for the pool
MemPoolContext(MemoryManager *m) :
perf_logger(nullptr),
manager(m),
n_bufs_allocated(0) {}
bool can_alloc(unsigned nbufs);
void update_stats(int val);
void set_stat_logger(PerfCounters *logger);
};

class PoolAllocator {
struct mem_info {
ibv_mr *mr;
MemPoolContext *ctx;
unsigned nbufs;
Chunk chunks[0];
};
static MemoryManager *manager;
static unsigned n_bufs_allocated;
static unsigned max_bufs;
static PerfCounters *perf_logger;
public:
typedef std::size_t size_type;
typedef std::ptrdiff_t difference_type;

static char * malloc(const size_type bytes);
static void free(char * const block);

static void set_memory_manager(MemoryManager *m) {
manager = m;
}
static void set_max_bufs(int n) {
max_bufs = n;
}
static MemPoolContext *g_ctx;
static Mutex lock;
};

static void set_perf_logger(PerfCounters *logger) {
perf_logger = logger;
perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
/**
* modify boost pool so that it is possible to
* have a thread safe 'context' when allocating/freeing
* the memory. It is needed to allow different pool
* configurations and bookkeeping per CephContext, and
* also to be able to use the same allocator to deal with
* RX and TX pools.
* TODO: use boost pool to allocate TX chunks too
*/
class mem_pool : public boost::pool<PoolAllocator> {
private:
MemPoolContext *ctx;
void *slow_malloc();

public:
explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
const size_type nnext_size = 32,
const size_type nmax_size = 0) :
pool(nrequested_size, nnext_size, nmax_size),
ctx(ctx) { }

void *malloc() {
if (!store().empty())
return (store().malloc)();
// need to alloc more memory...
// slow path code
return slow_malloc();
}
};

Expand Down Expand Up @@ -296,6 +331,10 @@ class Infiniband {
rxbuf_pool.free(chunk);
}

void set_rx_stat_logger(PerfCounters *logger) {
rxbuf_pool_ctx.set_stat_logger(logger);
}

CephContext *cct;
private:
// TODO: Cluster -> TxPool txbuf_pool
@@ -304,8 +343,8 @@ class Infiniband {
Cluster* send;// SEND
Device *device;
ProtectionDomain *pd;
boost::pool<RxAllocator> rxbuf_pool;
bool hp_enabled;
MemPoolContext rxbuf_pool_ctx;
mem_pool rxbuf_pool;

void* huge_pages_malloc(size_t size);
void huge_pages_free(void *ptr);
@@ -454,7 +493,8 @@ class Infiniband {
typedef MemoryManager::Chunk Chunk;
QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
void post_chunks_to_srq(int);
// post rx buffers to srq, return number of buffers actually posted
int post_chunks_to_srq(int num);
void post_chunk_to_pool(Chunk* chunk) {
get_memory_manager()->release_rx_buffer(chunk);
}
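A self-contained sketch of how the MemPoolContext bookkeeping declared above fits together: can_alloc() gates pool expansion against a buffer cap, update_stats() tracks the running count, and set_stat_logger() backfills a counter that may be attached after buffers were already allocated. The field names, the plain Counter type, and the limit value are assumptions for illustration, not the Ceph PerfCounters API or config.

#include <cassert>

struct Counter { long rx_bufs_total = 0; };   // stand-in for a perf counter

struct PoolContext {                          // stand-in for MemPoolContext
  Counter *perf = nullptr;
  unsigned n_bufs_allocated = 0;
  int limit;                                  // <= 0 means "unlimited"

  explicit PoolContext(int l) : limit(l) {}

  bool can_alloc(unsigned nbufs) const {
    return limit <= 0 ||
           n_bufs_allocated + nbufs <= static_cast<unsigned>(limit);
  }

  void update_stats(int nbufs) {              // +n on expansion, -n on release
    n_bufs_allocated += nbufs;
    if (perf)
      perf->rx_bufs_total += nbufs;
  }

  void set_stat_logger(Counter *c) {
    perf = c;
    if (perf)
      perf->rx_bufs_total = n_bufs_allocated; // buffers may predate the logger
  }
};

int main() {
  PoolContext ctx(4);
  assert(ctx.can_alloc(4));
  ctx.update_stats(4);
  assert(!ctx.can_alloc(1));                  // cap reached, expansion refused
  ctx.update_stats(-4);
  assert(ctx.can_alloc(1));
}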
12 changes: 11 additions & 1 deletion src/msg/async/rdma/RDMAStack.cc
@@ -84,6 +84,8 @@ void RDMADispatcher::polling_start()
if (t.joinable())
return; // dispatcher thread already running

get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);

tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
assert(tx_cc);
rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
@@ -158,6 +160,12 @@ void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
Mutex::Locker l(lock);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
// handle a case when we have a limited number of
// rx buffers and we could not post a required amount when polling
if (post_backlog > 0) {
ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
}
}

void RDMADispatcher::polling()
@@ -189,7 +197,9 @@ void RDMADispatcher::polling()
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);

Mutex::Locker l(lock);//make sure connected socket alive when pass wc
get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);

post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);

for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
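The post_backlog mechanism added above, reduced to a small model: when post_chunks_to_srq() cannot post the full amount (limited rx buffers), the shortfall is remembered in post_backlog and drained the next time a chunk is returned to the pool. post_to_srq() below is a stub standing in for the Infiniband call; the buffer counts are made up for the example.

#include <algorithm>
#include <iostream>

struct Dispatcher {
  int free_bufs;                        // rx buffers currently available
  int post_backlog = 0;                 // shortfall carried between calls

  // posts up to `num` buffers, returns how many were actually posted
  int post_to_srq(int num) {
    int posted = std::min(num, free_bufs);
    free_bufs -= posted;
    return posted;
  }

  // polling(): try to repost everything just consumed, remember the rest
  void on_completions(int consumed) {
    post_backlog += consumed - post_to_srq(consumed);
  }

  // post_chunk_to_pool(): a buffer came back; drain the backlog first
  void on_chunk_released() {
    ++free_bufs;
    if (post_backlog > 0)
      post_backlog -= post_to_srq(post_backlog);
  }
};

int main() {
  Dispatcher d{/*free_bufs=*/2};
  d.on_completions(5);                  // only 2 can be posted, 3 go to backlog
  std::cout << d.post_backlog << "\n";  // 3
  d.on_chunk_released();                // one buffer back -> backlog shrinks
  std::cout << d.post_backlog << "\n";  // 2
}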
1 change: 1 addition & 0 deletions src/msg/async/rdma/RDMAStack.h
@@ -47,6 +47,7 @@ class RDMADispatcher {
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
int post_backlog = 0;
Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
