Skip to content

Commit

Permalink
rgw/d4n: modifications to the filter driver:
Browse files Browse the repository at this point in the history
1. replaced put_async() with put() in handle_data()
2. moved calls to update() from iterate() to flush()

Signed-off-by: Pritha Srivastava <[email protected]>
  • Loading branch information
pritha-srivastava committed Apr 2, 2024
1 parent 5df4a66 commit a360990
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
62 changes: 40 additions & 22 deletions src/rgw/driver/d4n/rgw_sal_d4n.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,20 +460,20 @@ void D4NFilterObject::D4NFilterReadOp::cancel() {
aio->drain();
}

int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
auto c = aio->wait();
while (!c.empty()) {
int r = flush(dpp, std::move(c));
int r = flush(dpp, std::move(c), y);
if (r < 0) {
cancel();
return r;
}
c = aio->wait();
}
return flush(dpp, std::move(c));
return flush(dpp, std::move(c), y);
}

int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) {
int r = rgw::check_for_errors(results);

if (r < 0) {
Expand All @@ -494,11 +494,29 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;

bl_list.push_back(bl);
offset += bl.length();
int r = client_cb->handle_data(bl, 0, bl.length());
if (r < 0) {
return r;
}
auto it = blocks_info.find(offset);
if (it != blocks_info.end()) {
std::string version = source->get_object_version();
std::string prefix = source->get_prefix();
if (version.empty()) {
version = source->get_instance();
}
std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
uint64_t ofs = ofs_len_pair.first;
uint64_t len = ofs_len_pair.second;
std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
blocks_info.erase(it);
} else {
ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
}

offset += bl.length();
completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
}

Expand All @@ -523,7 +541,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int

this->client_cb = cb;
this->cb->set_client_cb(cb, dpp, &y);
this->cb->set_prefix(prefix);
source->set_prefix(prefix);

/* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
Expand All @@ -547,6 +565,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int

this->offset = ofs;

if (version.empty()) {
version = source->get_instance();
}

do {
uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
if (start_part_num == (num_parts - 1)) {
Expand All @@ -566,9 +588,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int

ceph::bufferlist bl;
std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
if (version.empty()) {
version = source->get_instance();
}

ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;

Expand All @@ -579,14 +598,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);

source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));

ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;

auto r = flush(dpp, std::move(completed));
auto r = flush(dpp, std::move(completed), y);

if (r < 0) {
drain(dpp);
drain(dpp, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}
Expand All @@ -600,22 +618,21 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
// Read From Cache
auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);

source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));

ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;

auto r = flush(dpp, std::move(completed));
auto r = flush(dpp, std::move(completed), y);

if (r < 0) {
drain(dpp);
drain(dpp, y);
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
return r;
}

} else {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;

auto r = drain(dpp);
auto r = drain(dpp, y);

if (r < 0) {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
Expand All @@ -628,7 +645,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int

if (start_part_num == (num_parts - 1)) {
ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
return drain(dpp);
return drain(dpp, y);
} else {
adjusted_start_ofs += obj_max_req_size;
}
Expand Down Expand Up @@ -701,6 +718,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
Attrs attrs; // empty attrs for cache sets
std::string version = source->get_object_version();
std::string prefix = source->get_prefix();
if (version.empty()) {
version = source->get_instance();
}
Expand All @@ -715,7 +733,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
//Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);

Expand Down Expand Up @@ -755,7 +773,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);

Expand Down Expand Up @@ -802,7 +820,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
block.version = version;
auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
if (ret == 0) {
ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
if (ret == 0) {
filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);

Expand Down
11 changes: 7 additions & 4 deletions src/rgw/driver/d4n/rgw_sal_d4n.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ class D4NFilterObject : public FilterObject {
private:
D4NFilterDriver* driver;
std::string version;
std::string prefix;

public:
struct D4NFilterReadOp : FilterReadOp {
public:
class D4NFilterGetCB: public RGWGetDataCB {
private:
D4NFilterDriver* filter;
std::string prefix;
D4NFilterObject* source;
RGWGetDataCB* client_cb;
uint64_t ofs = 0, len = 0;
Expand All @@ -130,7 +130,6 @@ class D4NFilterObject : public FilterObject {
this->y = y;
}
void set_ofs(uint64_t ofs) { this->ofs = ofs; }
void set_prefix(const std::string& prefix) { this->prefix = prefix; }
int flush_last_part();
void bypass_cache_write() { this->write_to_cache = false; }
};
Expand All @@ -154,10 +153,11 @@ class D4NFilterObject : public FilterObject {
std::unique_ptr<rgw::Aio> aio;
uint64_t offset = 0; // next offset to write to client
rgw::AioResultList completed; // completed read results, sorted by offset
std::unordered_map<uint64_t, std::pair<uint64_t,uint64_t>> blocks_info;

int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results);
int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y);
void cancel();
int drain(const DoutPrefixProvider* dpp);
int drain(const DoutPrefixProvider* dpp, optional_yield y);
};

struct D4NFilterDeleteOp : FilterDeleteOp {
Expand Down Expand Up @@ -209,6 +209,9 @@ class D4NFilterObject : public FilterObject {

void set_object_version(const std::string& version) { this->version = version; }
const std::string get_object_version() { return this->version; }

void set_prefix(const std::string& prefix) { this->prefix = prefix; }
const std::string get_prefix() { return this->prefix; }
};

class D4NFilterWriter : public FilterWriter {
Expand Down

0 comments on commit a360990

Please sign in to comment.