Skip to content

Commit

Permalink
Merge pull request ceph#56486 from soumyakoduri/wip-skoduri-lc-stall
Browse files Browse the repository at this point in the history
rgw/lc: advance head if the current entry doesn't exist

Reviewed-by: Matt Benjamin <[email protected]>
  • Loading branch information
cbodley authored Apr 25, 2024
2 parents befd8dc + e2b3a30 commit 6ac316b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 56 deletions.
120 changes: 64 additions & 56 deletions src/rgw/rgw_lc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,55 @@ inline int RGWLC::advance_head(const std::string& lc_shard,
return ret;
} /* advance head */

inline int RGWLC::check_if_shard_done(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head, int worker_ix)
{
int ret{0};

if (head.get_marker().empty()) {
/* done with this shard */
ldpp_dout(this, 5) <<
"RGWLC::process() next_entry not found. cycle finished lc_shard="
<< lc_shard << " worker=" << worker_ix
<< dendl;
head.set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
ret = 1; // to mark that shard is done
}
return ret;
}

inline int RGWLC::update_head(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date, int worker_ix)
{
int ret{0};

ret = advance_head(lc_shard, head, entry, start_date);
if (ret != 0) {
ldpp_dout(this, 0) << "RGWLC::update_head() failed to advance head "
<< lc_shard
<< dendl;
goto exit;
}

ret = check_if_shard_done(lc_shard, head, worker_ix);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::update_head() failed to check if shard is done "
<< lc_shard
<< dendl;
}

exit:
return ret;
}

int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
{
Expand Down Expand Up @@ -2279,13 +2328,14 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
/* fetches the entry pointed to by head.bucket */
ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry);
if (ret == -ENOENT) {
ret = sal_lc->get_next_entry(lc_shard, head->get_marker(), &entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_next_entry(lc_shard, "
<< "head.marker, entry) returned error ret==" << ret
<< dendl;
goto exit;
}
/* skip to next entry */
std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry();
tmp_entry->set_bucket(head->get_marker());

if (update_head(lc_shard, *head.get(), *tmp_entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
Expand All @@ -2306,51 +2356,21 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
<< "RGWLC::process(): ACTIVE entry: " << entry
<< " index: " << index << " worker ix: " << worker->ix << dendl;
/* skip to next entry */
if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
goto exit;
}
/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
goto exit;
if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
} else {
if ((entry->get_status() == lc_complete) &&
already_run_today(cct, entry->get_start_time())) {
/* skip to next entry */
if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
goto exit;
}
ldpp_dout(this, 5) << "RGWLC::process() worker ix: " << worker->ix
<< " SKIP processing for already-processed bucket " << entry->get_bucket()
<< dendl;
/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
goto exit;
}
/* skip to next entry */
if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
}
Expand Down Expand Up @@ -2432,19 +2452,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
}

/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
if (check_if_shard_done(lc_shard, *head.get(), worker->ix) != 0 ) {
goto exit;
}
} while(1 && !once && !going_down());
Expand Down
7 changes: 7 additions & 0 deletions src/rgw/rgw_lc.h
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ class RGWLC : public DoutPrefixProvider {
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date);
int check_if_shard_done(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
int worker_ix);
int update_head(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date, int worker_ix);
int process(int index, int max_lock_secs, LCWorker* worker, bool once);
int process_bucket(int index, int max_lock_secs, LCWorker* worker,
const std::string& bucket_entry_marker, bool once);
Expand Down

0 comments on commit 6ac316b

Please sign in to comment.