Skip to content

Commit

Permalink
Merge pull request ceph#13111 from cbodley/wip-rgw-mdlog-trim
Browse files Browse the repository at this point in the history
rgw multisite: automated mdlog trimming

Reviewed-by: Yehuda Sadeh <[email protected]>
  • Loading branch information
yehudasa authored May 12, 2017
2 parents cf8b54a + ec0a655 commit 404cee7
Show file tree
Hide file tree
Showing 12 changed files with 1,077 additions and 82 deletions.
23 changes: 23 additions & 0 deletions src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ enum {
OPT_METADATA_SYNC_INIT,
OPT_METADATA_SYNC_RUN,
OPT_MDLOG_LIST,
OPT_MDLOG_AUTOTRIM,
OPT_MDLOG_TRIM,
OPT_MDLOG_FETCH,
OPT_MDLOG_STATUS,
Expand Down Expand Up @@ -819,6 +820,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
} else if (strcmp(prev_cmd, "mdlog") == 0) {
if (strcmp(cmd, "list") == 0)
return OPT_MDLOG_LIST;
if (strcmp(cmd, "autotrim") == 0)
return OPT_MDLOG_AUTOTRIM;
if (strcmp(cmd, "trim") == 0)
return OPT_MDLOG_TRIM;
if (strcmp(cmd, "fetch") == 0)
Expand Down Expand Up @@ -6190,6 +6193,26 @@ int main(int argc, const char **argv)
formatter->flush(cout);
}

if (opt_cmd == OPT_MDLOG_AUTOTRIM) {
// need a full history for purging old mdlog periods
store->meta_mgr->init_oldest_log_period();

RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
int ret = http.set_threaded();
if (ret < 0) {
cerr << "failed to initialize http client with " << cpp_strerror(ret) << std::endl;
return -ret;
}

auto num_shards = g_conf->rgw_md_log_max_shards;
ret = crs.run(create_admin_meta_log_trim_cr(store, &http, num_shards));
if (ret < 0) {
cerr << "automated mdlog trim failed with " << cpp_strerror(ret) << std::endl;
return -ret;
}
}

if (opt_cmd == OPT_MDLOG_TRIM) {
utime_t start_time, end_time;

Expand Down
12 changes: 9 additions & 3 deletions src/rgw/rgw_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ class RGWCache : public T
bufferlist& data,
RGWObjVersionTracker *objv_tracker,
real_time set_mtime) override;
int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive) override;
int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl, off_t ofs, bool exclusive,
RGWObjVersionTracker *objv_tracker = nullptr) override;

int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
Expand Down Expand Up @@ -422,7 +423,8 @@ int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time
}

template <class T>
int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive)
int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& data, off_t ofs, bool exclusive,
RGWObjVersionTracker *objv_tracker)
{
rgw_pool pool;
string oid;
Expand All @@ -436,7 +438,11 @@ int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& da
info.status = 0;
info.flags = CACHE_FLAG_DATA;
}
int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive);
if (objv_tracker) {
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
}
int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
if (cacheable) {
string name = normal_name(pool, oid);
if (ret >= 0) {
Expand Down
67 changes: 62 additions & 5 deletions src/rgw/rgw_cr_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ int RGWSimpleRadosReadAttrsCR::request_complete()

int RGWAsyncPutSystemObj::_send_request()
{
return store->put_system_obj_data(NULL, obj, bl, -1, exclusive);
return store->put_system_obj_data(NULL, obj, bl, -1, exclusive, objv_tracker);
}

RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
const rgw_raw_obj& _obj, bool _exclusive,
bufferlist& _bl) : RGWAsyncRadosRequest(caller, cn), store(_store),
obj(_obj), exclusive(_exclusive),
bl(_bl)
RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
bool _exclusive, bufferlist& _bl)
: RGWAsyncRadosRequest(caller, cn), store(_store), objv_tracker(_objv_tracker),
obj(_obj), exclusive(_exclusive), bl(_bl)
{
}

Expand Down Expand Up @@ -315,6 +315,40 @@ int RGWRadosRemoveOmapKeysCR::send_request() {
return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op);
}

// Construct a coroutine for removing the raw object `obj`.
// The coroutine is initialized with the store's context, keeps the store
// pointer and a copy of `obj`, and records "remove dest=<obj>" as its
// description.
RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj)
: RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
{
set_description() << "remove dest=" << obj;
}

int RGWRadosRemoveCR::send_request()
{
auto rados = store->get_rados_handle();
int r = rados->ioctx_create(obj.pool.name.c_str(), ioctx);
if (r < 0) {
lderr(cct) << "ERROR: failed to open pool (" << obj.pool.name << ") ret=" << r << dendl;
return r;
}
ioctx.locator_set_key(obj.loc);

set_status() << "send request";

librados::ObjectWriteOperation op;
op.remove();

cn = stack->create_completion_notifier();
return ioctx.aio_operate(obj.oid, cn->completion(), &op);
}

// Fetch the return value of the completion created in send_request(),
// record it in the coroutine status, and hand it back to the caller.
int RGWRadosRemoveCR::request_complete()
{
  const int result = cn->completion()->get_return_value();
  set_status() << "request complete; ret=" << result;
  return result;
}

RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const rgw_raw_obj& _obj,
const string& _lock_name,
Expand Down Expand Up @@ -722,6 +756,29 @@ int RGWRadosTimelogTrimCR::request_complete()
return r;
}


// Build a timelog trim request on `oid` that trims up to `to_marker`.
// The base class is given default-constructed real_time values and an empty
// string ahead of `to_marker` (presumably start/end time and from-marker --
// confirm against RGWRadosTimelogTrimCR's constructor).
// `last_trim_marker` is stored as a raw pointer and dereferenced in
// request_complete(), so it must be non-null and outlive this coroutine.
RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
const std::string& to_marker,
std::string *last_trim_marker)
: RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
std::string{}, to_marker),
cct(store->ctx()), last_trim_marker(last_trim_marker)
{
}

// Complete the underlying trim and, unless it failed, advance the caller's
// last_trim_marker to to_marker. -ENODATA from the base class is not treated
// as a failure. The marker is only ever moved forward, never backward.
// Returns 0 on success or the negative error from the base class.
int RGWSyncLogTrimCR::request_complete()
{
  const int ret = RGWRadosTimelogTrimCR::request_complete();
  const bool succeeded = (ret >= 0 || ret == -ENODATA);
  if (!succeeded) {
    return ret;
  }

  std::string& recorded = *last_trim_marker;
  if (recorded < to_marker) {
    recorded = to_marker;
  }
  return 0;
}


int RGWAsyncStatObj::_send_request()
{
rgw_raw_obj raw_obj;
Expand Down
51 changes: 39 additions & 12 deletions src/rgw/rgw_cr_rados.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef CEPH_RGW_CR_RADOS_H
#define CEPH_RGW_CR_RADOS_H

#include <boost/intrusive_ptr.hpp>
#include "include/assert.h"
#include "rgw_coroutine.h"
#include "rgw_rados.h"
#include "common/WorkQueue.h"
Expand Down Expand Up @@ -119,6 +121,7 @@ class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {

class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
RGWRados *store;
RGWObjVersionTracker *objv_tracker;
rgw_raw_obj obj;
bool exclusive;
bufferlist bl;
Expand All @@ -127,8 +130,8 @@ class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
int _send_request() override;
public:
RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
const rgw_raw_obj& _obj, bool _exclusive,
bufferlist& _bl);
RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
bool _exclusive, bufferlist& _bl);
};

class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
Expand Down Expand Up @@ -189,16 +192,18 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
T *result;
/// on ENOENT, call handle_data() with an empty object instead of failing
const bool empty_on_enoent;
RGWObjVersionTracker *objv_tracker;

RGWAsyncGetSystemObj *req{nullptr};

public:
RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const rgw_raw_obj& _obj,
T *_result, bool empty_on_enoent = true)
T *_result, bool empty_on_enoent = true,
RGWObjVersionTracker *objv_tracker = nullptr)
: RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
obj_ctx(store), obj(_obj), result(_result),
empty_on_enoent(empty_on_enoent) {}
empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
~RGWSimpleRadosReadCR() override {
request_cleanup();
}
Expand All @@ -222,7 +227,7 @@ template <class T>
int RGWSimpleRadosReadCR<T>::send_request()
{
req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
store, &obj_ctx, NULL,
store, &obj_ctx, objv_tracker,
obj,
&bl, 0, -1);
if (pattrs) {
Expand Down Expand Up @@ -305,17 +310,16 @@ class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
bufferlist bl;

rgw_raw_obj obj;
RGWObjVersionTracker *objv_tracker;

RGWAsyncPutSystemObj *req;
RGWAsyncPutSystemObj *req{nullptr};

public:
RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const rgw_raw_obj& _obj,
const T& _data) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados),
store(_store),
obj(_obj),
req(NULL) {
const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
: RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
store(_store), obj(_obj), objv_tracker(objv_tracker) {
::encode(_data, bl);
}

Expand All @@ -332,7 +336,7 @@ class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {

int send_request() override {
req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
store, obj, false, bl);
store, objv_tracker, obj, false, bl);
async_rados->queue(req);
return 0;
}
Expand Down Expand Up @@ -464,6 +468,19 @@ class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
}
};

class RGWRadosRemoveCR : public RGWSimpleCoroutine {
RGWRados *store;
librados::IoCtx ioctx;
const rgw_raw_obj obj;
boost::intrusive_ptr<RGWAioCompletionNotifier> cn;

public:
RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);

int send_request();
int request_complete();
};

class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
Expand Down Expand Up @@ -1093,6 +1110,16 @@ class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
int request_complete() override;
};

// Timelog trim coroutine that also records how far the log has been trimmed:
// on completion it updates the string pointed to by last_trim_marker.
class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
private:
  CephContext *cct;
  std::string *last_trim_marker;  // caller-owned; written in request_complete()

public:
  RGWSyncLogTrimCR(RGWRados *store,
                   const std::string& oid,
                   const std::string& to_marker,
                   std::string *last_trim_marker);

  int request_complete() override;
};

class RGWAsyncStatObj : public RGWAsyncRadosRequest {
RGWRados *store;
RGWBucketInfo bucket_info;
Expand Down
28 changes: 3 additions & 25 deletions src/rgw/rgw_data_sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2891,6 +2891,7 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
}


// TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

Expand Down Expand Up @@ -2936,28 +2937,7 @@ void take_min_markers(IterIn first, IterIn last, IterOut dest)
}
}

// Datalog trim coroutine that remembers the marker it trimmed to: once the
// underlying trim finishes without a real error, *last_trim_marker is set to
// to_marker.
class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
private:
  CephContext *cct;
  std::string *last_trim_marker;

public:
  LastTimelogTrimCR(RGWRados *store,
                    const std::string& oid,
                    const std::string& to_marker,
                    std::string *last_trim_marker)
    : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
                            std::string{}, to_marker),
      cct(store->ctx()),
      last_trim_marker(last_trim_marker)
  {
  }

  // Returns 0 on success (including -ENODATA from the base class), or the
  // negative error code from the base class.
  int request_complete() override {
    const int ret = RGWRadosTimelogTrimCR::request_complete();
    const bool succeeded = (ret >= 0 || ret == -ENODATA);
    if (!succeeded) {
      ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(ret) << dendl;
      return ret;
    }
    ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl;
    *last_trim_marker = to_marker;
    return 0;
  }
};
} // anonymous namespace

class DataLogTrimCR : public RGWCoroutine {
RGWRados *store;
Expand Down Expand Up @@ -3036,7 +3016,7 @@ int DataLogTrimCR::operate()
ldout(cct, 10) << "trimming log shard " << i
<< " at marker=" << stable
<< " last_trim=" << last_trim[i] << dendl;
using TrimCR = LastTimelogTrimCR;
using TrimCR = RGWSyncLogTrimCR;
spawn(new TrimCR(store, store->data_log->get_oid(i),
stable, &last_trim[i]),
true);
Expand Down Expand Up @@ -3100,8 +3080,6 @@ int DataLogTrimPollCR::operate()
return 0;
}

} // anonymous namespace

RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
RGWHTTPManager *http,
int num_shards, utime_t interval)
Expand Down
Loading

0 comments on commit 404cee7

Please sign in to comment.