Skip to content

Commit

Permalink
rgw/cache: implementation of put using yield context
Browse files Browse the repository at this point in the history
as completion token and adding throttling to `put_async`
in the cache driver api. Also added a test case to the
ssd driver unit test for `put_async`.

Signed-off-by: Pritha Srivastava <[email protected]>
  • Loading branch information
pritha-srivastava committed Apr 2, 2024
1 parent 68b8cb4 commit 1a44398
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/rgw/rgw_cache_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class CacheDriver {
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) = 0;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) = 0;
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) = 0;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) = 0;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/rgw/rgw_redis_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,10 @@ rgw::AioResultList RedisDriver::get_async(const DoutPrefixProvider* dpp, optiona
return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id);
}

int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) {
rgw::AioResultList RedisDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) {
// TODO: implement
return -1;
rgw::AioResultList aio_result_list;
return aio_result_list;
}

void RedisDriver::shutdown()
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_redis_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class RedisDriver : public CacheDriver {

virtual int initialize(const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override;
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
Expand Down
185 changes: 111 additions & 74 deletions src/rgw/rgw_ssd_driver.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common/async/completion.h"
#include "common/errno.h"
#include "common/async/blocked_completion.h"
#include "rgw_ssd_driver.h"
#if defined(__linux__)
#include <features.h>
Expand Down Expand Up @@ -54,43 +55,18 @@ int SSDDriver::initialize(const DoutPrefixProvider* dpp)

int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, optional_yield y)
{
bufferlist src = bl;
std::string location = partition_info.location + key;

ldpp_dout(dpp, 20) << __func__ << "(): location=" << location << dendl;
FILE *cache_file = nullptr;
int r = 0;
size_t nbytes = 0;

cache_file = fopen(location.c_str(), "w+");
if (cache_file == nullptr) {
ldpp_dout(dpp, 0) << "ERROR: put::fopen file has return error, errno=" << errno << dendl;
return -errno;
}

nbytes = fwrite(src.c_str(), 1, len, cache_file);
if (nbytes != len) {
ldpp_dout(dpp, 0) << "ERROR: put::io_write: fwrite has returned error: nbytes!=len, nbytes=" << nbytes << ", len=" << len << dendl;
return -EIO;
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
boost::system::error_code ec;
if (y) {
using namespace boost::asio;
spawn::yield_context yield = y.get_yield_context();
this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, yield[ec]);
} else {
this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, ceph::async::use_blocked[ec]);
}

r = fclose(cache_file);
if (r != 0) {
ldpp_dout(dpp, 0) << "ERROR: put::fclose file has return error, errno=" << errno << dendl;
return -errno;
if (ec) {
return ec.value();
}

if (attrs.size() > 0) {
r = set_attrs(dpp, key, attrs, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, r = " << r << dendl;
return r;
}
}

efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;

return 0;
}

Expand Down Expand Up @@ -177,6 +153,13 @@ auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& ha
return p;
}

template <typename Executor1, typename CompletionHandler>
auto SSDDriver::AsyncWriteRequest::create(const Executor1& ex1, CompletionHandler&& handler)
{
auto p = Completion::create(ex1, std::move(handler));
return p;
}

template <typename ExecutionContext, typename CompletionToken>
auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token)
Expand All @@ -190,7 +173,7 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx,
auto p = Op::create(ctx.get_executor(), init.completion_handler);
auto& op = p->user_data;

int ret = op.init(dpp, location, read_ofs, read_len, p.get());
int ret = op.prepare_libaio_read_op(dpp, location, read_ofs, read_len, p.get());
if(0 == ret) {
ret = ::aio_read(op.aio_cb.get());
}
Expand All @@ -204,6 +187,47 @@ auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx,
return init.result.get();
}

template <typename ExecutionContext, typename CompletionToken>
void SSDDriver::put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token)
{
std::string location = partition_info.location + key;
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;

using Op = AsyncWriteRequest;
using Signature = typename Op::Signature;
boost::asio::async_completion<CompletionToken, Signature> init(token);
auto p = Op::create(ctx.get_executor(), init.completion_handler);
auto& op = p->user_data;

int r = 0;
bufferlist src = bl;
r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location);
op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
op.cb->aio_sigevent.sigev_notify_attributes = nullptr;
op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get();
op.key = key;
op.dpp = dpp;
op.priv_data = this;
op.attrs = std::move(attrs);

if (r >= 0) {
r = ::aio_write(op.cb.get());
} else {
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::prepare_libaio_write_op(), r=" << r << dendl;
}

ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_write(), r=" << r << dendl;
if(r < 0) {
auto ec = boost::system::error_code{-r, boost::system::system_category()};
ceph::async::post(std::move(p), ec);
} else {
(void)p.release();
}
init.result.get();
}

rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
off_t read_ofs, off_t read_len, const std::string& key) {
return [this, dpp, y, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable {
Expand All @@ -216,7 +240,23 @@ rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, opt
auto ex = get_associated_executor(init.completion_handler);

ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_read_handler{aio, r}));
};
}

rgw::Aio::OpFunc SSDDriver::ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key) {
return [this, dpp, y, bl, len, attrs, key] (Aio* aio, AioResult& r) mutable {
ceph_assert(y);
ldpp_dout(dpp, 20) << "SSDCache: cache_write_op(): Write to Cache, oid=" << r.obj.oid << dendl;

using namespace boost::asio;
spawn::yield_context yield = y.get_yield_context();
async_completion<spawn::yield_context, void()> init(yield);
auto ex = get_associated_executor(init.completion_handler);

ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
this->put_async(dpp, y.get_io_context(), key, bl, len, attrs, bind_executor(ex, SSDDriver::libaio_write_handler{aio, r}));
};
}

Expand All @@ -227,35 +267,11 @@ rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_
return aio->get(r_obj, ssd_cache_read_op(dpp, y, this, ofs, len, key), cost, id);
}

void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
{
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;
}

int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs)
rgw::AioResultList SSDDriver::put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl;
bufferlist src = bl;
struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp);
int r = 0;
if ((r = wr->prepare_libaio_write_op(dpp, src, len, key, partition_info.location)) < 0) {
ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
return r;
}
wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
wr->key = key;
wr->priv_data = this;

if ((r = ::aio_write(wr->cb)) != 0) {
ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl;
delete wr;
return r;
}
return 0;
rgw_raw_obj r_obj;
r_obj.oid = key;
return aio->get(r_obj, ssd_cache_write_op(dpp, y, this, bl, len, attrs, key), cost, id);
}

int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
Expand All @@ -277,12 +293,11 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid
{
std::string location = cache_location + key;
int r = 0;

ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
cb = new struct aiocb;
cb.reset(new struct aiocb);
memset(cb.get(), 0, sizeof(struct aiocb));
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
memset(cb, 0, sizeof(struct aiocb));
r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
r = fd = TEMP_FAILURE_RETRY(::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode));
if (fd < 0) {
ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
return r;
Expand All @@ -303,13 +318,35 @@ int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvid
return r;
}

void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
{
SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
c->priv_data->libaio_write_completion_cb(c);
void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
auto op = std::move(p->user_data);
ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl;
int attr_ret = 0;
if (op.attrs.size() > 0) {
//TODO - fix yield_context
optional_yield y{null_yield};
attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y);
if (attr_ret < 0) {
ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
}
}

Partition partition_info = op.priv_data->get_current_partition_info(op.dpp);
efs::space_info space = efs::space(partition_info.location);
op.priv_data->set_free_space(op.dpp, space.available);

const int ret = -aio_error(op.cb.get());
boost::system::error_code ec;
if (ret < 0) {
ec.assign(-ret, boost::system::system_category());
} else if (attr_ret < 0) {
ec.assign(-attr_ret, boost::system::system_category());
}
ceph::async::dispatch(std::move(p), ec);
}

int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
int SSDDriver::AsyncReadOp::prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl;
aio_cb.reset(new struct aiocb);
Expand Down
48 changes: 32 additions & 16 deletions src/rgw/rgw_ssd_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SSDDriver : public CacheDriver {
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override { return -1; } // TODO: implement
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs) override;
virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
Expand All @@ -30,13 +30,14 @@ class SSDDriver : public CacheDriver {
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; }
void set_free_space(const DoutPrefixProvider* dpp, uint64_t free_space) { this->free_space = free_space; }

private:
Partition partition_info;
uint64_t free_space;
CephContext* cct;

struct libaio_handler {
struct libaio_read_handler {
rgw::Aio* throttle = nullptr;
rgw::AioResult& r;
// read callback
Expand All @@ -47,24 +48,41 @@ class SSDDriver : public CacheDriver {
}
};

struct libaio_write_handler {
rgw::Aio* throttle = nullptr;
rgw::AioResult& r;
// write callback
void operator()(boost::system::error_code ec) const {
r.result = -ec.value();
throttle->put(r);
}
};

// unique_ptr with custom deleter for struct aiocb
struct libaio_aiocb_deleter {
void operator()(struct aiocb* c) {
if(c->aio_fildes > 0) {
if( ::close(c->aio_fildes) != 0) {
}
TEMP_FAILURE_RETRY(::close(c->aio_fildes));
}
c->aio_buf = nullptr;
delete c;
}
};

template <typename ExecutionContext, typename CompletionToken>
auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token);


template <typename ExecutionContext, typename CompletionToken>
void put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, CompletionToken&& token);

rgw::Aio::OpFunc ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
off_t read_ofs, off_t read_len, const std::string& key);

rgw::Aio::OpFunc ssd_cache_write_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, const std::string& key);

using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;

struct AsyncReadOp {
Expand All @@ -73,8 +91,8 @@ class SSDDriver : public CacheDriver {
using Signature = void(boost::system::error_code, bufferlist);
using Completion = ceph::async::Completion<Signature, AsyncReadOp>;

int init(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
static void libaio_cb_aio_dispatch(sigval sigval);
int prepare_libaio_read_op(const DoutPrefixProvider *dpp, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
static void libaio_cb_aio_dispatch(sigval sigval);

template <typename Executor1, typename CompletionHandler>
static auto create(const Executor1& ex1, CompletionHandler&& handler);
Expand All @@ -85,21 +103,19 @@ class SSDDriver : public CacheDriver {
std::string key;
void *data;
int fd;
struct aiocb *cb;
unique_aio_cb_ptr cb;
SSDDriver *priv_data;
rgw::sal::Attrs attrs;

using Signature = void(boost::system::error_code);
using Completion = ceph::async::Completion<Signature, AsyncWriteRequest>;

AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {}
int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location);
static void libaio_write_cb(sigval sigval);

~AsyncWriteRequest() {
::close(fd);
cb->aio_buf = nullptr;
delete(cb);
}
template <typename Executor1, typename CompletionHandler>
static auto create(const Executor1& ex1, CompletionHandler&& handler);
};

void libaio_write_completion_cb(AsyncWriteRequest* c);
};

} } // namespace rgw::cache
Expand Down
Loading

0 comments on commit 1a44398

Please sign in to comment.