Skip to content

Commit

Permalink
rgw/cache: implementation of put_async method in ssd driver
Browse files Browse the repository at this point in the history
and redis driver and squashes the following commits.

rgw/cache: implementation of async put. the call
does not take into account throttling for now.
rgw/cache: dummy implementation of put_async in redis
driver to fix compilation error.

Signed-off-by: Pritha Srivastava <[email protected]>
  • Loading branch information
pritha-srivastava committed Apr 2, 2024
1 parent ab5df9c commit 9511574
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/rgw/rgw_cache_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CacheDriver {
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) = 0;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) = 0;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/rgw/rgw_redis_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -746,4 +746,8 @@ void RedisCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, option
{
}

int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
{
return 0;
}
} } // namespace rgw::cache
1 change: 1 addition & 0 deletions src/rgw/rgw_redis_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class RedisDriver : public CacheDriver {

virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;

struct libaio_handler { // should this be the same as SSDDriver? -Sam
rgw::Aio* throttle = nullptr;
Expand Down
71 changes: 70 additions & 1 deletion src/rgw/rgw_ssd_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ int SSDDriver::put(const DoutPrefixProvider* dpp, const std::string& key, buffer
return -errno;
}

efs::space_info space = efs::space(location);
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;

if (attrs.size() > 0) {
Expand Down Expand Up @@ -229,6 +229,38 @@ rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_
return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
}

void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
{
efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;

insert_entry(c->dpp, c->key, 0, c->cb->aio_nbytes);
}

int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl;
struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp);
int r = 0;
if ((r = wr->prepare_libaio_write_op(dpp, bl, len, key, partition_info.location)) < 0) {
ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
return r;
}
wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
wr->key = key;
wr->priv_data = this;

if ((r = ::aio_write(wr->cb)) != 0) {
ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl;
delete wr;
return r;
}
return 0;
}

int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key)
{
std::string location = partition_info.location + key;
Expand All @@ -244,6 +276,43 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke
return remove_entry(dpp, key);
}

int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
{
std::string location = cache_location + key;
int r = 0;

ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
cb = new struct aiocb;
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
memset(cb, 0, sizeof(struct aiocb));
r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
if (fd < 0) {
ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
return r;
}
if (dpp->get_cct()->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL)
posix_fadvise(fd, 0, 0, dpp->get_cct()->_conf->rgw_d3n_l1_fadvise);
cb->aio_fildes = fd;

data = malloc(len);
if (!data) {
ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: memory allocation failed" << dendl;
::close(fd);
return r;
}
cb->aio_buf = data;
memcpy((void*)data, bl.c_str(), len);
cb->aio_nbytes = len;
return r;
}

void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
{
SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
ldpp_dout(c->dpp, 20) << "SSDCache: " << __func__ << "()" << dendl;
c->priv_data->libaio_write_completion_cb(c);
}

int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl;
Expand Down
42 changes: 32 additions & 10 deletions src/rgw/rgw_ssd_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SSDDriver : public CacheDriver {
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
Expand Down Expand Up @@ -63,7 +64,6 @@ class SSDDriver : public CacheDriver {
template <typename ExecutionContext, typename CompletionToken>
auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token);

protected:
static std::unordered_map<std::string, Partition> partitions;
std::unordered_map<std::string, Entry> entries;
Expand All @@ -79,6 +79,7 @@ class SSDDriver : public CacheDriver {
std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);

private:

// unique_ptr with custom deleter for struct aiocb
struct libaio_aiocb_deleter {
void operator()(struct aiocb* c) {
Expand All @@ -93,17 +94,38 @@ struct libaio_aiocb_deleter {
using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;

struct AsyncReadOp {
bufferlist result;
unique_aio_cb_ptr aio_cb;
using Signature = void(boost::system::error_code, bufferlist);
using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
bufferlist result;
unique_aio_cb_ptr aio_cb;
using Signature = void(boost::system::error_code, bufferlist);
using Completion = ceph::async::Completion<Signature, AsyncReadOp>;

int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
static void libaio_cb_aio_dispatch(sigval sigval);
int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
static void libaio_cb_aio_dispatch(sigval sigval);

template <typename Executor1, typename CompletionHandler>
static auto create(const Executor1& ex1, CompletionHandler&& handler);
};
template <typename Executor1, typename CompletionHandler>
static auto create(const Executor1& ex1, CompletionHandler&& handler);
};

struct AsyncWriteRequest {
const DoutPrefixProvider* dpp;
std::string key;
void *data;
int fd;
struct aiocb *cb;
SSDDriver *priv_data;

AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {}
int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location);
static void libaio_write_cb(sigval sigval);

~AsyncWriteRequest() {
::close(fd);
cb->aio_buf = nullptr;
delete(cb);
}
};

void libaio_write_completion_cb(AsyncWriteRequest* c);

};

Expand Down

0 comments on commit 9511574

Please sign in to comment.