Skip to content

Commit

Permalink
Merge pull request ceph#27579 from cbodley/wip-rgw-split-log-trim
Browse files Browse the repository at this point in the history
rgw: split mdlog/datalog trimming into separate files

Reviewed-by: Adam C. Emerson <[email protected]>
  • Loading branch information
cbodley authored Apr 24, 2019
2 parents 7b27e62 + 4184edf commit 7e424e7
Show file tree
Hide file tree
Showing 14 changed files with 1,006 additions and 920 deletions.
4 changes: 3 additions & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ set(librgw_common_srcs
rgw_sync_module_pubsub.cc
rgw_pubsub_push.cc
rgw_sync_module_pubsub_rest.cc
rgw_sync_log_trim.cc
rgw_sync_trace.cc
rgw_trim_bilog.cc
rgw_trim_datalog.cc
rgw_trim_mdlog.cc
rgw_period_history.cc
rgw_period_puller.cc
rgw_reshard.cc
Expand Down
4 changes: 3 additions & 1 deletion src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ extern "C" {
#include "rgw_usage.h"
#include "rgw_orphan.h"
#include "rgw_sync.h"
#include "rgw_sync_log_trim.h"
#include "rgw_trim_bilog.h"
#include "rgw_trim_datalog.h"
#include "rgw_trim_mdlog.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_realm_watcher.h"
Expand Down
206 changes: 0 additions & 206 deletions src/rgw/rgw_data_sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

Expand Down Expand Up @@ -3503,208 +3502,3 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const
return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
bucket_info.bucket, status));
}


// TODO: move into rgw_data_sync_trim.cc
// Redefine the logging prefix macro for the code that follows: it now
// streams the literal "data trim: " into *_dout. Presumably this is what
// the ldout() calls in the trim coroutines below pick up -- confirm against
// the dout macro definitions.
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

namespace {

/// Pick the marker that it is safe to trim up to for one shard.
///
/// While the shard is in the FullSync state this is its next_step_marker;
/// in any other state it is the shard's current marker. The result is a
/// reference into \a m, so it is only valid while \a m is alive.
const std::string& get_stable_marker(const rgw_data_sync_marker& m)
{
  if (m.state == m.FullSync) {
    return m.next_step_marker;
  }
  return m.marker;
}

/// Ordering used by take_min_markers(): one sync marker sorts before
/// another when its stable marker (see get_stable_marker()) compares lower.
bool operator<(const rgw_data_sync_marker& lhs,
               const rgw_data_sync_marker& rhs)
{
  const auto& lhs_stable = get_stable_marker(lhs);
  const auto& rhs_stable = get_stable_marker(rhs);
  return lhs_stable < rhs_stable;
}

/// Populate the output range starting at 'dest' with the minimum marker of
/// each shard across all of the peers in [first, last).
///
/// Each peer must expose a 'sync_markers' container whose elements have a
/// '.second' marker; markers are compared with operator<. The n-th entry of
/// every peer's 'sync_markers' is folded into the n-th output slot, so the
/// peers are expected to list their shards in the same order.
///
/// Only forward iteration over the peers is required (the range is walked
/// exactly once), so any container of peers may be used.
///
/// \note The output range is written without bounds checking: it must have
///       room for at least as many elements as the largest 'sync_markers'
///       container among the peers.
/// \note Markers are moved out of the peers, so the peers' 'sync_markers'
///       values are left in a moved-from state afterwards.
/// \note An empty peer range leaves the output untouched.
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  // the first peer seeds the output unconditionally; every later peer only
  // replaces a slot when its marker is strictly smaller
  bool seeding = true;
  for (auto peer = first; peer != last; ++peer) {
    auto slot = dest;
    for (auto& shard : peer->sync_markers) {
      if (seeding || shard.second < *slot) {
        *slot = std::move(shard.second);
      }
      ++slot;
    }
    seeding = false;
  }
}

} // anonymous namespace

/// Coroutine that runs a single data log trim pass.
///
/// operate() queries the data sync status of each entry in the zone's
/// data-notify-to map, computes the minimum stable marker per shard, and
/// spawns a trim of every local data log shard whose minimum has moved past
/// the corresponding entry of 'last_trim'.
class DataLogTrimCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const std::string& zone_id; //< my zone id
  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
  std::vector<std::string>& last_trim; //< last trimmed marker per shard
  int ret = 0;

 public:
  /// \param rados          store handle; also supplies the CephContext and
  ///                       the zone service
  /// \param http_manager   HTTP manager used for the peer status requests
  /// \param shard_count    number of data log shards
  /// \param trim_markers   last trimmed marker per shard; held by reference,
  ///                       so it must outlive this coroutine
  DataLogTrimCR(RGWRados *rados, RGWHTTPManager *http_manager,
                int shard_count, std::vector<std::string>& trim_markers)
    : RGWCoroutine(rados->ctx()),
      store(rados),
      http(http_manager),
      num_shards(shard_count),
      zone_id(rados->svc.zone->get_zone().id),
      // one status slot per peer in the data-notify-to map
      peer_status(rados->svc.zone->get_zone_data_notify_to_map().size()),
      min_shard_markers(shard_count),
      last_trim(trim_markers)
  {}

  int operate() override;
};

/// Coroutine body for one trim pass:
///  1. spawn a sync status request for every entry of the zone's
///     data-notify-to map,
///  2. collect the replies, giving up on the first non-zero result,
///  3. compute the minimum stable marker per shard and spawn a trim for each
///     shard whose stable marker is greater than its last_trim entry.
/// Returns via set_cr_error(ret) when a status request failed, otherwise via
/// set_cr_done().
int DataLogTrimCR::operate()
{
reenter(this) {
ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
set_status("fetching sync status");
yield {
// query data sync status from each sync peer
rgw_http_param_pair params[] = {
{ "type", "data" },
{ "status", nullptr },
{ "source-zone", zone_id.c_str() },
{ nullptr, nullptr }
};

// peer_status was sized from the same map in the constructor, so 'p'
// advances over one slot per peer
auto p = peer_status.begin();
for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
ldout(cct, 20) << "query sync status from " << c.first << dendl;
using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
// &*p hands the request a pointer to this peer's slot; presumably the
// decoded reply is stored there -- confirm in RGWReadRESTResourceCR
spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
false);
++p;
}
}

// must get a successful reply from all peers to consider trimming
ret = 0;
// stop collecting as soon as any child reports a non-zero result
while (ret == 0 && num_spawned() > 0) {
yield wait_for_child();
collect_next(&ret);
}
// NOTE(review): presumably waits for any children still outstanding after
// an early exit from the loop above -- confirm in RGWCoroutine
drain_all();

if (ret < 0) {
ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
return set_cr_error(ret);
}

ldout(cct, 10) << "trimming log shards" << dendl;
set_status("trimming log shards");
yield {
// determine the minimum marker for each shard
// (this moves the markers out of peer_status)
take_min_markers(peer_status.begin(), peer_status.end(),
min_shard_markers.begin());

for (int i = 0; i < num_shards; i++) {
const auto& m = min_shard_markers[i];
auto& stable = get_stable_marker(m);
// nothing new to trim unless the stable marker is past the last trim
if (stable <= last_trim[i]) {
continue;
}
ldout(cct, 10) << "trimming log shard " << i
<< " at marker=" << stable
<< " last_trim=" << last_trim[i] << dendl;
using TrimCR = RGWSyncLogTrimCR;
// &last_trim[i] lets the trim coroutine reach this shard's last_trim
// entry; presumably it records the new marker there on success --
// confirm in RGWSyncLogTrimCR
spawn(new TrimCR(store, store->data_log->get_oid(i),
stable, &last_trim[i]),
true);
}
}
return set_cr_done();
}
return 0;
}

/// Factory for a single-pass data log trim coroutine (DataLogTrimCR).
///
/// \param markers  last trimmed marker per shard; the coroutine keeps a
///                 reference to it, so it must outlive the coroutine
/// \return a newly allocated coroutine
RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards,
                                            std::vector<std::string>& markers)
{
  RGWCoroutine* trim_cr = new DataLogTrimCR(store, http, num_shards, markers);
  return trim_cr;
}

/// Coroutine that repeatedly trims the data log on a fixed interval.
///
/// Each round of operate() waits for 'interval', tries to take the
/// "data_trim" lock on the first data log shard object, and on success runs
/// a DataLogTrimCR pass. 'last_trim' is owned here so that trim progress is
/// remembered from one round to the next.
class DataLogTrimPollCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const utime_t interval; //< polling interval
  const std::string lock_oid; //< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; //< last trimmed marker per shard

 public:
  /// \param rados          store handle; also supplies the CephContext
  /// \param http_manager   HTTP manager handed on to each trim pass
  /// \param shard_count    number of data log shards
  /// \param poll_interval  time between trim attempts
  DataLogTrimPollCR(RGWRados *rados, RGWHTTPManager *http_manager,
                    int shard_count, utime_t poll_interval)
    : RGWCoroutine(rados->ctx()),
      store(rados),
      http(http_manager),
      num_shards(shard_count),
      interval(poll_interval),
      lock_oid(rados->data_log->get_oid(0)),
      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      last_trim(shard_count)
  {}

  int operate() override;
};

/// Coroutine body: loop forever, and on each round wait for 'interval', try
/// to take the "data_trim" lock on lock_oid, and -- if the lock was obtained
/// -- run one DataLogTrimCR pass. The loop has no exit condition of its own.
int DataLogTrimPollCR::operate()
{
reenter(this) {
for (;;) {
set_status("sleeping");
wait(interval);

// request a 'data_trim' lock that covers the entire wait interval to
// prevent other gateways from attempting to trim for the duration
set_status("acquiring trim lock");
// the lock duration passed here is interval.sec(), i.e. the polling
// interval in whole seconds
yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
"data_trim", lock_cookie,
interval.sec()));
// NOTE(review): retcode is presumably the result of the coroutine just
// call()ed -- confirm in RGWCoroutine
if (retcode < 0) {
// if the lock is already held, go back to sleep and try again later
ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
<< interval.sec() << "s" << dendl;
continue;
}

set_status("trimming");
// last_trim is a member, so markers recorded by one pass are seen by the
// next; the result of the trim pass itself is not inspected here
yield call(new DataLogTrimCR(store, http, num_shards, last_trim));

// note that the lock is not released. this is intentional, as it avoids
// duplicating this work in other gateways
}
}
return 0;
}

/// Factory for the polling data log trim coroutine (DataLogTrimPollCR).
///
/// \param interval  time between trim attempts
/// \return a newly allocated coroutine
RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                      RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  RGWCoroutine* poll_cr =
      new DataLogTrimPollCR(store, http, num_shards, interval);
  return poll_cr;
}
11 changes: 0 additions & 11 deletions src/rgw/rgw_data_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,15 +611,4 @@ class RGWArchiveSyncModule : public RGWDefaultSyncModule {
int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};

// DataLogTrimPollCR factory function: returns a newly allocated coroutine
// that, every 'interval', tries to take the data_trim lock and then runs a
// data log trim pass over 'num_shards' shards
extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
RGWHTTPManager *http,
int num_shards, utime_t interval);

// factory function for datalog trim via radosgw-admin: returns a newly
// allocated DataLogTrimCR that performs a single trim pass. 'markers' holds
// the last trimmed marker per shard and is kept by reference, so it must
// outlive the returned coroutine
RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
RGWHTTPManager *http,
int num_shards,
std::vector<std::string>& markers);

#endif
2 changes: 2 additions & 0 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ using namespace librados;
#include "rgw_sync.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_trace.h"
#include "rgw_trim_datalog.h"
#include "rgw_trim_mdlog.h"
#include "rgw_data_sync.h"
#include "rgw_realm_watcher.h"
#include "rgw_reshard.h"
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "rgw_meta_sync_status.h"
#include "rgw_period_puller.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"
#include "rgw_trim_bilog.h"
#include "rgw_service.h"

#include "services/svc_rados.h"
Expand Down
Loading

0 comments on commit 7e424e7

Please sign in to comment.