Skip to content

Commit

Permalink
Merge pull request ceph#30853 from ivancich/wip-tune-sharded-bucket-listing
Browse files Browse the repository at this point in the history

rgw: reduce per-shard entry count during ordered bucket listing
  • Loading branch information
ivancich authored Nov 10, 2019
2 parents b30c452 + 729c0ba commit 7715cd4
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 40 deletions.
5 changes: 5 additions & 0 deletions PendingReleaseNotes
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,8 @@

Note that this only applies to configuration options in the
monitor's database--config file parsing is not affected.

* RGW: bucket listing performance on sharded bucket indexes has been
notably improved by heuristically -- and significantly, in many
cases -- reducing the number of entries requested from each bucket
index shard.
147 changes: 107 additions & 40 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1778,8 +1778,12 @@ int RGWRados::Bucket::List::list_objects_ordered(int64_t max_p,
}
}

constexpr int allowed_read_attempts = 2;
string skip_after_delim;
while (truncated && count <= max) {
for (int attempt = 0; attempt < allowed_read_attempts; ++attempt) {
// this loop is generally expected only to have a single
// iteration; see bottom of loop for early exit

ent_map_t ent_map;
ent_map.reserve(read_ahead);
int r = store->cls_bucket_list_ordered(target->get_bucket_info(),
Expand Down Expand Up @@ -1877,7 +1881,7 @@ int RGWRados::Bucket::List::list_objects_ordered(int64_t max_p,

result->emplace_back(std::move(entry));
count++;
}
} // eiter for loop

if (!params.delim.empty()) {
int marker_delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
Expand All @@ -1896,7 +1900,18 @@ int RGWRados::Bucket::List::list_objects_ordered(int64_t max_p,
}
}
}
}

// if we finished listing, or if we're returning at least half the
// requested entries, that's enough; S3 and swift protocols allow
// returning fewer than max entries
if (!truncated || count >= max / 2) {
break;
}

ldout(cct, 1) << "RGWRados::Bucket::List::" << __func__ <<
" INFO ordered bucket listing requires read #" << (2 + attempt) <<
dendl;
} // read attempt loop

done:
if (is_truncated)
Expand Down Expand Up @@ -7941,6 +7956,36 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_
}


/**
 * Heuristically determine how many entries to request from each bucket
 * index shard so that an ordered listing of num_entries total entries
 * is very likely to be satisfiable in a single round of cls calls.
 *
 * @param num_entries total number of entries the caller wants returned
 * @param num_shards  number of bucket index shards being queried
 * @return number of entries to request from each shard (never less
 *         than a small floor, since per-call overhead dominates for
 *         tiny reads)
 */
uint32_t RGWRados::calc_ordered_bucket_list_per_shard(uint32_t num_entries,
						      uint32_t num_shards)
{
  // We want to minimize the chances that when num_shards >>
  // num_entries that we return much fewer than num_entries to the
  // client. Given all the overhead of making a cls call to the osd,
  // returning a few entries is not much more work than returning one
  // entry. This minimum might be better tuned based on future
  // experiments where num_shards >> num_entries. (Note: ">>" should
  // be interpreted as "much greater than".)
  constexpr uint32_t min_read = 8;

  // defensive guard: the formula below divides by num_shards and
  // takes its log; with no shards there is nothing meaningful to
  // compute, so just ask for everything
  if (num_shards == 0) {
    return std::max(min_read, num_entries);
  }

  // The following is based on _"Balls into Bins" -- A Simple and
  // Tight Analysis_ by Raab and Steger. We add 1 as a way to handle
  // cases when num_shards >> num_entries (it almost serves as a
  // ceiling calculation). We also assume alpha is 1.0 and extract it
  // from the calculation. Future work could involve memoizing some of
  // the transcendental functions to minimize repeatedly re-calling
  // them with the same parameters, which we expect to be the case the
  // majority of the time.
  //
  // NB: 2.0 (not 2) so the product is computed in double and cannot
  // wrap uint32_t when num_entries is very large; the integer
  // division num_entries / num_shards is intentional (floor), as the
  // "+ 1" above compensates.
  uint32_t calc_read =
    1 +
    static_cast<uint32_t>((num_entries / num_shards) +
			  std::sqrt(2.0 * num_entries *
				    std::log(num_shards) / num_shards));

  return std::max(min_read, calc_read);
}


int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
int shard_id,
const rgw_obj_index_key& start,
Expand All @@ -7962,39 +8007,46 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
// value - list result for the corresponding oid (shard), it is filled by
// the AIO callback
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, &index_pool, &oids, nullptr);
if (r < 0)
int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id,
&index_pool, &oids, nullptr);
if (r < 0) {
return r;
}

auto& ioctx = index_pool.ioctx();
const uint32_t shard_count = oids.size();
const uint32_t num_entries_per_shard =
calc_ordered_bucket_list_per_shard(num_entries, shard_count);

ldout(cct, 10) << __func__ << " request from each of " << shard_count <<
" shard(s) for " << num_entries_per_shard << " entries to get " <<
num_entries << " total entries" << dendl;

auto& ioctx = index_pool.ioctx();
map<int, struct rgw_cls_list_ret> list_results;
cls_rgw_obj_key start_key(start.name, start.instance);
r = CLSRGWIssueBucketList(ioctx, start_key, prefix, num_entries,
r = CLSRGWIssueBucketList(ioctx, start_key, prefix, num_entries_per_shard,
list_versions, oids, list_results,
cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
if (r < 0) {
return r;
}

// Create a list of iterators that are used to iterate each shard
// create a list of iterators that are used to iterate each shard
vector<RGWRados::ent_map_t::iterator> vcurrents;
vector<RGWRados::ent_map_t::iterator> vends;
vector<string> vnames;
vcurrents.reserve(list_results.size());
vends.reserve(list_results.size());
vnames.reserve(list_results.size());
map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
*is_truncated = false;
for (; iter != list_results.end(); ++iter) {
vcurrents.push_back(iter->second.dir.m.begin());
vends.push_back(iter->second.dir.m.end());
vnames.push_back(oids[iter->first]);
*is_truncated = (*is_truncated || iter->second.is_truncated);
for (auto& iter : list_results) {
vcurrents.push_back(iter.second.dir.m.begin());
vends.push_back(iter.second.dir.m.end());
vnames.push_back(oids[iter.first]);
}

// Create a map to track the next candidate entry from each shard, if the entry
// from a specified shard is selected/erased, the next entry from that shard will
// be inserted for next round selection
// create a map to track the next candidate entry from each shard,
// if the entry from a specified shard is selected/erased, the next
// entry from that shard will be inserted for next round selection
map<string, size_t> candidates;
for (size_t i = 0; i < vcurrents.size(); ++i) {
if (vcurrents[i] != vends[i]) {
Expand All @@ -8006,7 +8058,7 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
uint32_t count = 0;
while (count < num_entries && !candidates.empty()) {
r = 0;
// Select the next one
// select the next one
int pos = candidates.begin()->second;
const string& name = vcurrents[pos]->first;
struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
Expand All @@ -8016,17 +8068,18 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
if ((!dirent.exists && !dirent.is_delete_marker()) ||
!dirent.pending_map.empty() ||
force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
/* there are uncommitted ops. We need to check the current
* state, and if the tags are old we need to do clean-up as
* well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(ioctx);
r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
updates[vnames[pos]], y);
if (r < 0 && r != -ENOENT) {
return r;
return r;
}
} else {
r = 0;
r = 0;
}
if (r >= 0) {
ldout(cct, 10) << "RGWRados::cls_bucket_list_ordered: got " <<
Expand All @@ -8035,36 +8088,50 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
++count;
}

// Refresh the candidates map
// refresh the candidates map
candidates.erase(candidates.begin());
++vcurrents[pos];
if (vcurrents[pos] != vends[pos]) {
if (++vcurrents[pos] != vends[pos]) { // note: pre-increment
candidates[vcurrents[pos]->first] = pos;
} else if (list_results[pos].is_truncated) {
// once we exhaust one shard that is truncated, we need to stop,
// as we cannot be certain that one of the next entries needs to
// come from that shard; S3 and swift protocols allow returning
// fewer than what was requested
break;
}
}
} // while we haven't provided requested # of result entries

// Suggest updates if there is any
map<string, bufferlist>::iterator miter = updates.begin();
for (; miter != updates.end(); ++miter) {
if (miter->second.length()) {
// suggest updates if there is any
for (auto& miter : updates) {
if (miter.second.length()) {
ObjectWriteOperation o;
cls_rgw_suggest_changes(o, miter->second);
cls_rgw_suggest_changes(o, miter.second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
ioctx.aio_operate(miter->first, c, &o);
AioCompletion *c =
librados::Rados::aio_create_completion(nullptr, nullptr);
ioctx.aio_operate(miter.first, c, &o);
c->release();
}
}
} // updates loop

// Check if all the returned entries are consumed or not
*is_truncated = false;
// check if all the returned entries are consumed or not
for (size_t i = 0; i < vcurrents.size(); ++i) {
if (vcurrents[i] != vends[i]) {
if (vcurrents[i] != vends[i] || list_results[i].is_truncated) {
*is_truncated = true;
break;
}
}
if (!m.empty())

if (*is_truncated && count < num_entries) {
ldout(cct, 10) << "RGWRados::" << __func__ <<
": INFO requested " << num_entries << " entries but returning " <<
count << ", which is truncated" << dendl;
}

if (!m.empty()) {
*last_entry = m.rbegin()->first;
}

return 0;
}
Expand Down
6 changes: 6 additions & 0 deletions src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,12 @@ class RGWRados
bool *is_truncated, RGWAccessListFilter *filter);

uint64_t next_bucket_id();

/**
* This is broken out to facilitate unit testing.
*/
static uint32_t calc_ordered_bucket_list_per_shard(uint32_t num_entries,
uint32_t num_shards);
};

#endif

0 comments on commit 7715cd4

Please sign in to comment.