Skip to content

Commit

Permalink
os/bluestore: introduce new io_uring IO engine
Browse files Browse the repository at this point in the history
This implements a low-level IO engine that uses the brand-new
io_uring IO interface: https://lwn.net/Articles/776428/

By default libaio is used.  If bluestore_ioring=true is set but the kernel
does not support io_uring or the architecture is not x86-64, libaio will be
used instead.

The current patch uses the liburing library in order not to open-code
everything.

To compile with liburing, WITH_LIBURING=ON must be specified.

Signed-off-by: Roman Penyaev <[email protected]>
  • Loading branch information
Roman Penyaev authored and tchaikov committed Dec 10, 2019
1 parent c05c1bd commit 2268be9
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ if(WITH_BLUESTORE)
endif()

# Provides the CMAKE_DEPENDENT_OPTION() command used below.
include(CMakeDependentOption)
# liburing support defaults to OFF and is only offered when both
# WITH_BLUESTORE and HAVE_LIBAIO are true; otherwise it is forced OFF.
CMAKE_DEPENDENT_OPTION(WITH_LIBURING "Build with liburing library support" OFF
"WITH_BLUESTORE;HAVE_LIBAIO" OFF)
# Mirror the option into HAVE_LIBURING, the name consumed by the
# "#cmakedefine HAVE_LIBURING" line in config-h.in.cmake.
set(HAVE_LIBURING ${WITH_LIBURING})

CMAKE_DEPENDENT_OPTION(WITH_BLUESTORE_PMEM "Enable PMDK libraries" OFF
"WITH_BLUESTORE" OFF)

Expand Down
4 changes: 4 additions & 0 deletions src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4624,6 +4624,10 @@ std::vector<Option> get_global_options() {
.set_default(0)
.set_description("Space reserved at DB device and not allowed for 'use some extra' policy usage. Overrides 'bluestore_volume_selection_reserved_factor' setting and introduces straightforward limit."),

Option("bluestore_ioring", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(false)
.set_description("Enables Linux io_uring API instead of libaio"),

// -----------------------------------------
// kstore

Expand Down
3 changes: 3 additions & 0 deletions src/include/config-h.in.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
/* Defined if you have libaio */
#cmakedefine HAVE_LIBAIO

/* Defined if you have liburing */
#cmakedefine HAVE_LIBURING

/* Defined if you have POSIX AIO */
#cmakedefine HAVE_POSIXAIO

Expand Down
27 changes: 27 additions & 0 deletions src/os/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ if(WITH_BLUESTORE)
bluestore/StupidAllocator.cc
bluestore/BitmapAllocator.cc
bluestore/AvlAllocator.cc
bluestore/io_uring.cc
)
endif(WITH_BLUESTORE)

Expand Down Expand Up @@ -130,3 +131,29 @@ endif()
if(WITH_EVENTTRACE)
add_dependencies(os eventtrace_tp)
endif()

if(WITH_LIBURING)
  # Fetch a pinned liburing revision and build it as a static archive that is
  # linked into the "os" library.  Nothing is installed; the archive and its
  # headers are consumed straight from the build tree.
  include(ExternalProject)
  # Under Makefile generators use $(MAKE) so the sub-build joins the parent
  # jobserver; other generators have no such variable, so call plain "make".
  if("${CMAKE_GENERATOR}" MATCHES "Make")
    set(make_cmd "$(MAKE)")
  else()
    set(make_cmd "make")
  endif()
  ExternalProject_Add(liburing_ext
    DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/
    # Fetch over TLS rather than plain http so the source cannot be tampered
    # with in transit.
    GIT_REPOSITORY https://git.kernel.dk/liburing
    GIT_TAG "4e360f71131918c36774f51688e5c65dea8d43f2"
    SOURCE_DIR ${CMAKE_BINARY_DIR}/src/liburing
    CONFIGURE_COMMAND <SOURCE_DIR>/configure
    BUILD_COMMAND env CC=${CMAKE_C_COMPILER} ${make_cmd} -C src -s
    BUILD_IN_SOURCE 1
    # Declare the archive as an output of this step.  Without it Ninja has no
    # rule that produces the imported library's IMPORTED_LOCATION and refuses
    # to build anything that links against it.
    BUILD_BYPRODUCTS "${CMAKE_BINARY_DIR}/src/liburing/src/liburing.a"
    INSTALL_COMMAND "")
  unset(make_cmd)
  add_library(liburing STATIC IMPORTED GLOBAL)
  add_dependencies(liburing liburing_ext)
  set_target_properties(liburing PROPERTIES
    IMPORTED_LINK_INTERFACE_LANGUAGES "C"
    IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/liburing/src/liburing.a")
  target_link_libraries(os liburing)
  target_include_directories(os SYSTEM PRIVATE "${CMAKE_BINARY_DIR}/src/liburing/src/include")
endif(WITH_LIBURING)
15 changes: 14 additions & 1 deletion src/os/bluestore/KernelDevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "common/numa.h"

#include "global/global_context.h"
#include "ceph_io_uring.h"

#define dout_context cct
#define dout_subsys ceph_subsys_bdev
Expand All @@ -54,8 +55,20 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
fd_directs.resize(WRITE_LIFE_MAX, -1);
fd_buffereds.resize(WRITE_LIFE_MAX, -1);

bool use_ioring = g_ceph_context->_conf.get_val<bool>("bluestore_ioring");
unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
io_queue = std::unique_ptr<io_queue_t>(new aio_queue_t(iodepth));

if (use_ioring && ioring_queue_t::supported()) {
io_queue = std::make_unique<ioring_queue_t>(iodepth);
} else {
static bool once;
if (use_ioring && !once) {
derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
<< dendl;
once = true;
}
io_queue = std::make_unique<aio_queue_t>(iodepth);
}
}

int KernelDevice::_lock()
Expand Down
31 changes: 31 additions & 0 deletions src/os/bluestore/ceph_io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include "acconfig.h"

#include "include/types.h"
#include "ceph_aio.h"

struct ioring_data;

/// io_queue_t implementation that drives I/O through Linux io_uring.
///
/// The liburing-specific state lives in the opaque ioring_data so that this
/// header does not need liburing headers.
struct ioring_queue_t final : public io_queue_t {
  /// Opaque implementation state, defined in io_uring.cc.
  std::unique_ptr<ioring_data> d;
  /// Queue depth requested at construction; used when the ring is created.
  unsigned iodepth = 0;

  using aio_iter = std::list<aio_t>::iterator;

  /// True only when io_uring support was compiled in (liburing enabled and an
  /// x86-64 target) and the running kernel accepts an io_uring setup request.
  static bool supported();

  ioring_queue_t(unsigned iodepth_);
  ~ioring_queue_t() final;

  /// Creates the ring and registers @p fds with it.
  /// @return 0 on success, a negative errno value on failure.
  int init(std::vector<int> &fds) final;
  /// Releases what init() set up.
  void shutdown() final;

  /// Queues the aios in [begin, end) and submits them to the kernel.
  int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
                   void *priv, int *retries) final;
  /// Stores up to @p max completed aios into @p paio, waiting up to
  /// @p timeout_ms when none are ready yet.
  int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
};
267 changes: 267 additions & 0 deletions src/os/bluestore/io_uring.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ceph_io_uring.h"

#if defined(HAVE_LIBURING) && defined(__x86_64__)

#include "liburing.h"
#include <sys/epoll.h>

/* Options */

// File-local switches consulted by ioring_queue_t::init() when it builds the
// ring setup flags (IORING_SETUP_IOPOLL / IORING_SETUP_SQPOLL).  Nothing in
// the visible code changes them from their default of false.
static bool hipri = false; /* use IO polling */
static bool sq_thread = false; /* use kernel submission/poller thread */

// Private state behind ioring_queue_t, reached through its `d` pointer.
struct ioring_data {
// liburing ring handle; set up by io_uring_queue_init() in init().
struct io_uring io_uring;
// Held while completions are reaped in get_next_completed().
pthread_mutex_t cq_mutex;
// Held while requests are queued and submitted in submit_batch().
pthread_mutex_t sq_mutex;
// epoll instance that watches the ring fd; -1 while not open.
int epoll_fd = -1;
// Maps a real file descriptor to its index in the file set registered with
// the ring (see build_fixed_fds_map()).
std::map<int, int> fixed_fds_map;
};

/*
 * Reap up to `max` entries from the completion queue.
 *
 * Each completion carries the aio_t pointer that was attached at submission
 * time; its rval is filled with the completion result and the pointer is
 * stored in paio[].  The consumed entries are then released back to the ring.
 *
 * Returns the number of aios written to paio[] (0 when the queue is empty).
 */
static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
                          struct aio_t **paio)
{
  struct io_uring *ring = &d->io_uring;
  struct io_uring_cqe *cqe;
  unsigned head;
  unsigned reaped = 0;

  io_uring_for_each_cqe(ring, head, cqe) {
    struct aio_t *aio = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);

    aio->rval = cqe->res;
    paio[reaped] = aio;

    if (++reaped == max)
      break;
  }
  /* Tell the ring how many entries were consumed above. */
  io_uring_cq_advance(ring, reaped);

  return reaped;
}

/*
 * Translate a real file descriptor into its registered ("fixed") index.
 * Returns -1 when the descriptor was not registered.
 */
static int find_fixed_fd(struct ioring_data *d, int real_fd)
{
  const auto entry = d->fixed_fds_map.find(real_fd);

  return entry != d->fixed_fds_map.end() ? entry->second : -1;
}

/*
 * Fill one submission queue entry from an aio_t.
 *
 * The aio's fd must have been registered with the ring; the SQE refers to it
 * by registered index, hence IOSQE_FIXED_FILE.  Only vectored reads and
 * writes are supported; any other opcode trips an assertion.  The aio_t
 * pointer is stored as the SQE's user data so the completion side can find
 * it again.
 */
static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
                     struct aio_t *io)
{
  const int fixed_fd = find_fixed_fd(d, io->fd);
  ceph_assert(fixed_fd != -1);

  switch (io->iocb.aio_lio_opcode) {
  case IO_CMD_PWRITEV:
    io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
                         io->iov.size(), io->offset);
    break;
  case IO_CMD_PREADV:
    io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
                        io->iov.size(), io->offset);
    break;
  default:
    ceph_assert(0);
  }

  io_uring_sqe_set_data(sqe, io);
  io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
}

/*
 * Prepare SQEs for the aios in [beg, end) and submit them.
 *
 * Stops early when the submission queue has no free entry.  Returns 0 when
 * not a single aio could be queued (the caller has to reap completions
 * first), otherwise whatever io_uring_submit() returns.
 *
 * The range must not be empty.
 */
static int ioring_queue(struct ioring_data *d, void *priv,
                        list<aio_t>::iterator beg, list<aio_t>::iterator end)
{
  struct io_uring *ring = &d->io_uring;
  struct aio_t *last_queued = nullptr;

  ceph_assert(beg != end);

  for (auto cur = beg; cur != end; ++cur) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
      break;

    last_queued = &*cur;
    last_queued->priv = priv;

    init_sqe(d, sqe, last_queued);
  }

  if (!last_queued) {
    /* Queue is full, go and reap something first */
    return 0;
  }

  return io_uring_submit(ring);
}

/*
 * Record, for every descriptor in fds, its position in the vector.  That
 * position is the index under which the descriptor is known to the ring once
 * the same vector has been registered with it.
 */
static void build_fixed_fds_map(struct ioring_data *d,
                                std::vector<int> &fds)
{
  for (size_t slot = 0; slot < fds.size(); ++slot)
    d->fixed_fds_map[fds[slot]] = static_cast<int>(slot);
}

// Allocates the private ioring_data and records the requested queue depth.
// The ring itself is not created here; that happens in init().
ioring_queue_t::ioring_queue_t(unsigned iodepth_) :
d(make_unique<ioring_data>()),
iodepth(iodepth_)
{
}

// Intentionally empty: the unique_ptr member frees ioring_data.  The ring and
// the epoll descriptor are released by shutdown(), not by the destructor.
ioring_queue_t::~ioring_queue_t()
{
}

/*
 * Create the ring, register `fds` as its fixed file set and set up an epoll
 * instance that signals when completions are available.
 *
 * @param fds  descriptors that later aios will refer to; they are registered
 *             in vector order and looked up through fixed_fds_map.
 * @return 0 on success, a negative errno value on failure.  On failure
 *         everything created here is torn down again and the object is left
 *         without a stale epoll descriptor or fd map.
 */
int ioring_queue_t::init(std::vector<int> &fds)
{
  unsigned flags = 0;
  /*
   * Zero-initialised so that no indeterminate bytes (ev.data) are handed to
   * the kernel.  Declared before the first goto because a jump may not cross
   * an initialised declaration.
   */
  struct epoll_event ev = {};

  pthread_mutex_init(&d->cq_mutex, nullptr);
  pthread_mutex_init(&d->sq_mutex, nullptr);

  if (hipri)
    flags |= IORING_SETUP_IOPOLL;
  if (sq_thread)
    flags |= IORING_SETUP_SQPOLL;

  int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
  if (ret < 0)
    return ret;

  /* data() instead of &fds[0]: indexing an empty vector is undefined. */
  ret = io_uring_register(d->io_uring.ring_fd, IORING_REGISTER_FILES,
                          fds.data(), fds.size());
  if (ret < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  build_fixed_fds_map(d.get(), fds);

  d->epoll_fd = epoll_create1(0);
  if (d->epoll_fd < 0) {
    ret = -errno;
    goto close_ring_fd;
  }

  ev.events = EPOLLIN;
  ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
  if (ret < 0) {
    ret = -errno;
    goto close_epoll_fd;
  }

  return 0;

close_epoll_fd:
  close(d->epoll_fd);
  /* Do not leave a closed descriptor number behind. */
  d->epoll_fd = -1;
close_ring_fd:
  /* The map describes a registration that no longer exists. */
  d->fixed_fds_map.clear();
  io_uring_queue_exit(&d->io_uring);

  return ret;
}

/*
 * Undo init(): close the epoll instance, tear down the ring and forget the
 * fd-to-index mapping that belonged to it.
 */
void ioring_queue_t::shutdown()
{
  close(d->epoll_fd);
  d->epoll_fd = -1;

  io_uring_queue_exit(&d->io_uring);

  d->fixed_fds_map.clear();
}

/*
 * Queue and submit the aios in [beg, end), tagging each with `priv`.
 * Submission is serialised by sq_mutex.  aios_size and retries belong to the
 * io_queue_t interface and are not used by this implementation.
 *
 * Returns the result of ioring_queue(): 0 when the submission queue was full
 * and nothing could be queued, otherwise what io_uring_submit() reported.
 */
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
                                 uint16_t aios_size, void *priv,
                                 int *retries)
{
  (void)retries;
  (void)aios_size;

  pthread_mutex_lock(&d->sq_mutex);
  const int submitted = ioring_queue(d.get(), priv, beg, end);
  pthread_mutex_unlock(&d->sq_mutex);

  return submitted;
}

/*
 * Collect completed aios.
 *
 * First reaps whatever is already in the completion queue.  If nothing is
 * there, waits on the epoll instance for the ring fd to become readable and
 * tries again.
 *
 * @param timeout_ms  epoll_wait() timeout for each wait, in milliseconds.
 * @param paio        receives pointers to the completed aios.
 * @param max         capacity of paio.
 * @return number of aios stored in paio, 0 on timeout, or a negative errno
 *         value when waiting failed.
 */
int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
  for (;;) {
    pthread_mutex_lock(&d->cq_mutex);
    const int events = ioring_get_cqe(d.get(), max, paio);
    pthread_mutex_unlock(&d->cq_mutex);

    if (events != 0)
      return events;

    struct epoll_event ev;
    int ret;
    /*
     * A signal delivered while waiting is not an I/O error: retry instead of
     * reporting -EINTR to the caller.  Each retry waits for the full timeout
     * again.
     */
    do {
      ret = epoll_wait(d->epoll_fd, &ev, 1, timeout_ms);
    } while (ret < 0 && errno == EINTR);

    if (ret < 0)
      return -errno;
    if (ret == 0)
      return 0; /* timed out with nothing completed */

    /* Ring fd is readable: time to reap. */
  }
}

bool ioring_queue_t::supported()
{
struct io_uring_params p;

memset(&p, 0, sizeof(p));
int fd = io_uring_setup(16, &p);
if (fd < 0)
return false;

close(fd);

return true;
}

#else // #if defined(HAVE_LIBURING) && defined(__x86_64__)

// Fallback used when HAVE_LIBURING is not defined or the target is not
// x86-64.  supported() reports false here; every other member asserts,
// because it must never be reached on such a build.

// Empty placeholder so that std::unique_ptr<ioring_data> has a complete type.
struct ioring_data {};

// Must not be constructed on an unsupported build.
ioring_queue_t::ioring_queue_t(unsigned iodepth_)
{
ceph_assert(0);
}

// Unreachable in practice: construction already asserts.
ioring_queue_t::~ioring_queue_t()
{
ceph_assert(0);
}

// Not available on this build; asserts.
int ioring_queue_t::init(std::vector<int> &fds)
{
ceph_assert(0);
}

// Not available on this build; asserts.
void ioring_queue_t::shutdown()
{
ceph_assert(0);
}

// Not available on this build; asserts.
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
uint16_t aios_size, void *priv,
int *retries)
{
ceph_assert(0);
}

// Not available on this build; asserts.
int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
{
ceph_assert(0);
}

// io_uring support was not compiled in, so it is never reported as usable.
bool ioring_queue_t::supported()
{
return false;
}

#endif // #if defined(HAVE_LIBURING) && defined(__x86_64__)

0 comments on commit 2268be9

Please sign in to comment.