Skip to content

Commit

Permalink
rgw/CloudTransition: Verify if the object is already tiered
Browse files Browse the repository at this point in the history
Add class to fetch headers from remote endpoint and verify if the object
is already tiered.

This commit also includes a few other fixes:

* Erase data in the head of cloud transitioned object
* 'placement rm' command should erase tier_config details
* Add a new option to the object manifest to denote whether the
  object was tiered as a multipart upload

Signed-off-by: Soumya Koduri <[email protected]>
  • Loading branch information
soumyakoduri committed Nov 18, 2021
1 parent 6333c0e commit 557b519
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5109,6 +5109,11 @@ int main(int argc, const char **argv)
if (iter != zonegroup.placement_targets.end()) {
RGWZoneGroupPlacementTarget& info = zonegroup.placement_targets[placement_id];
info.storage_classes.erase(*opt_storage_class);

auto ptiter = info.tier_targets.find(*opt_storage_class);
if (ptiter != info.tier_targets.end()) {
info.tier_targets.erase(ptiter);
}
}
}
} else if (opt_cmd == OPT::ZONEGROUP_PLACEMENT_DEFAULT) {
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_json_enc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ void RGWObjTier::dump(Formatter *f) const
{
f->dump_string("name", name);
f->dump_object("tier_placement", tier_placement);
f->dump_bool("is_multipart_upload", is_multipart_upload);
}

void rgw_bucket_placement::dump(Formatter *f) const
Expand Down
34 changes: 31 additions & 3 deletions src/rgw/rgw_lc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1291,36 +1291,48 @@ class LCOpAction_Transition : public LCOpAction {
RGWRados::Object op_target(tier_ctx.store->getRados(),
tier_ctx.bucket_info,
tier_ctx.rctx, tier_ctx.obj);
real_time read_mtime;

RGWRados::Object::Read read_op(&op_target);

read_op.params.attrs = &attrs;
read_op.params.lastmod = &read_mtime;

int r = read_op.prepare(null_yield);
if (r < 0) {
return r;
}

if (read_mtime != tier_ctx.o.meta.mtime) {
/* raced */
return -ECANCELED;
}

tier_ctx.rctx.set_atomic(tier_ctx.obj);

RGWRados::Object::Write obj_op(&op_target);
RGWObjState *s = tier_ctx.rctx.get_state(tier_ctx.obj);

obj_op.meta.modify_tail = true;
obj_op.meta.flags = PUT_OBJ_CREATE;
obj_op.meta.category = RGWObjCategory::CloudTiered;
obj_op.meta.delete_at = real_time();
obj_op.meta.data = NULL;
bufferlist blo;
blo.append("");
obj_op.meta.data = &blo;
obj_op.meta.if_match = NULL;
obj_op.meta.if_nomatch = NULL;
obj_op.meta.user_data = NULL;
obj_op.meta.zones_trace = NULL;
obj_op.meta.delete_at = real_time();

RGWObjManifest *pmanifest;

pmanifest = &(*s->manifest);
RGWObjTier tier_config;
tier_config.name = oc.tier.storage_class;
tier_config.tier_placement = oc.tier;
tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;

pmanifest->set_tier_type("cloud");
pmanifest->set_tier_config(tier_config);
Expand All @@ -1336,7 +1348,9 @@ class LCOpAction_Transition : public LCOpAction {
/* should the obj_size also be set to '0' or is it needed
* to keep track of original size before transition.
* But unless obj_size is set to '0', obj_iters cannot
* be reset I guess
* be reset I guess. For regular transitioned objects
* obj_size remains the same even when object is moved to other
* storage class. So maybe better to keep it the same way.
*/
//pmanifest->set_obj_size(0);

Expand All @@ -1347,6 +1361,8 @@ class LCOpAction_Transition : public LCOpAction {
bl.append(oc.tier.storage_class);
attrs[RGW_ATTR_STORAGE_CLASS] = bl;

attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_TAIL_TAG);

obj_op.write_meta(tier_ctx.o.meta.size, 0, attrs, null_yield);
if (r < 0) {
Expand Down Expand Up @@ -1391,7 +1407,19 @@ class LCOpAction_Transition : public LCOpAction {
tier_ctx.multipart_sync_threshold = oc.tier.multipart_sync_threshold;
tier_ctx.storage_class = oc.tier.storage_class;

ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
bool al_tiered = false;
ret = crs.run(new RGWLCCloudCheckCR(tier_ctx, &al_tiered));

if (ret < 0) {
ldpp_dout(oc.dpp, 0) << "XXXXXXXXXXXXXX failed in RGWCloudCheckCR() ret=" << ret << dendl;
}

if (!al_tiered) {
ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered false" << dendl;
ret = crs.run(new RGWLCCloudTierCR(tier_ctx));
} else {
ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX lc.cc is_already_tiered true" << dendl;
}
http_manager.stop();

if (ret < 0) {
Expand Down
267 changes: 267 additions & 0 deletions src/rgw/rgw_lc_tier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,207 @@ static void init_headers(map<string, bufferlist>& attrs,
}
}

/**
 * Fill in a rgw_rest_obj from response headers and decoded attributes.
 *
 * Every entry of @p headers is copied into info->attrs, except for
 * "RGWX_OBJECT_SIZE", whose value is parsed into info->content_len.
 * If @p attrs contains RGW_ATTR_ACL it is decoded into info->acls; a
 * missing ACL attribute is only logged as a warning.
 *
 * @param cct      context used for logging and handed to info->acls
 * @param attrs    attribute map; only RGW_ATTR_ACL is looked up here
 * @param headers  header name -> value map
 * @param info     object to populate (dereferenced unconditionally)
 * @return 0 on success, -EIO if the ACL attribute fails to decode
 */
static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
{
  // Iterate by const reference: copying each pair<string, string> is wasted work.
  for (const auto& header : headers) {
    const string& val = header.second;
    if (header.first == "RGWX_OBJECT_SIZE") {
      // atoi() returns an int, so object sizes of 2 GiB and above would
      // overflow; parse the decimal value as an unsigned 64-bit quantity.
      // NOTE(review): assumes content_len is a 64-bit field - confirm.
      info->content_len = strtoull(val.c_str(), nullptr, 10);
    } else {
      info->attrs[header.first] = val;
    }
  }

  info->acls.set_ctx(cct);
  auto aiter = attrs.find(RGW_ATTR_ACL);
  if (aiter == attrs.end()) {
    ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
    return 0;
  }

  bufferlist& bl = aiter->second;
  auto bliter = bl.cbegin();
  try {
    info->acls.decode(bliter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
    return -EIO;
  }

  return 0;
}

class RGWLCStreamGetCRF : public RGWStreamReadHTTPResourceCRF
{
RGWRESTConn::get_obj_params req_params;

CephContext *cct;
RGWHTTPManager *http_manager;
rgw_lc_obj_properties obj_properties;
std::shared_ptr<RGWRESTConn> conn;
rgw::sal::RGWObject* dest_obj;
string etag;
RGWRESTStreamRWRequest *in_req;
map<string, string> headers;

public:
RGWLCStreamGetCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWHTTPManager *_http_manager,
const rgw_lc_obj_properties& _obj_properties,
std::shared_ptr<RGWRESTConn> _conn,
rgw::sal::RGWObject* _dest_obj) :
RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _http_manager, _dest_obj->get_key()),
cct(_cct), http_manager(_http_manager), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
}



int init() override {
/* init input connection */

req_params.get_op = false; /* Need only headers */
// req_params.skip_decrypt = false;
req_params.prepend_metadata = true;
req_params.rgwx_stat = true;
req_params.sync_manifest = true;
req_params.skip_decrypt = true;

// req_params.unmod_ptr = &src_properties.mtime;
// req_params.etag = src_properties.etag;
// req_params.mod_zone_id = src_properties.zone_short_id;
// req_params.mod_pg_ver = src_properties.pg_ver;

// if (range.is_set) {
// req_params.range_is_set = true;
// req_params.range_start = range.ofs;
// req_params.range_end = range.ofs + range.size - 1;
// }

int ret = conn->get_obj(dest_obj, req_params, false /* send */, &in_req);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
return ret;
}

set_req(in_req);

return RGWStreamReadHTTPResourceCRF::init();
}

int init2() {
/* init input connection */

req_params.get_op = false; /* Need only headers */
// req_params.skip_decrypt = false;
req_params.prepend_metadata = true;
req_params.rgwx_stat = true;
req_params.sync_manifest = true;
req_params.skip_decrypt = true;

// req_params.unmod_ptr = &src_properties.mtime;
// req_params.etag = src_properties.etag;
// req_params.mod_zone_id = src_properties.zone_short_id;
// req_params.mod_pg_ver = src_properties.pg_ver;

// if (range.is_set) {
// req_params.range_is_set = true;
// req_params.range_start = range.ofs;
// req_params.range_end = range.ofs + range.size - 1;
// }

string etag;
real_time set_mtime;

int ret = conn->get_obj(dest_obj, req_params, true /* send */, &in_req);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
return ret;
}

ret = conn->complete_request(in_req, nullptr, nullptr,
nullptr, nullptr, &headers);
if (ret < 0 && ret != -ENOENT) {
ldout(cct, 0) << "ERROR: " << __func__ << "(): XXXXXXXXXXXX conn->complete_request() returned ret=" << ret << dendl;
return ret;
}
// set_req(in_req);

// return RGWStreamReadHTTPResourceCRF::init();
return 0;
}

int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
map<string, bufferlist> src_attrs;

ldout(cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;

if (extra_data.length() > 0) {
JSONParser jp;
if (!jp.parse(extra_data.c_str(), extra_data.length())) {
ldout(cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
return -EIO;
}

JSONDecoder::decode_json("attrs", src_attrs, &jp);
}
return do_decode_rest_obj(cct, src_attrs, headers, &rest_obj);
}

void handle_headers(const map<string, string>& _headers) {
headers = _headers;
}

int is_already_tiered() {
char buf[32];
/*rgw_rest_obj rest_obj;
rest_obj.init(dest_obj->get_key());


if (do_decode_rest_obj(cct, attrs, headers, &rest_obj)) {
ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
return set_cr_error(-EINVAL);
}

for (auto header : headers) {
const string& val = header.second;
if (header.first == "RGWX_OBJECT_SIZE") {
info->content_len = atoi(val.c_str());
} else {
info->attrs[header.first] = val;
}
}*/

map<string, string> attrs = headers;
// req->get_out_headers(&attrs);
// get_attrs(&attrs);

for (auto a : attrs) {
ldout(cct, 0) << "XXXXXXXXXXXXXX GetCrf attr[" << a.first << "] = " << a.second <<dendl;
}
utime_t ut(obj_properties.mtime);
snprintf(buf, sizeof(buf), "%lld.%09lld",
(long long)ut.sec(),
(long long)ut.nsec());

string s = attrs["X_AMZ_META_RGWX_SOURCE_MTIME"];

if (s.empty())
s = attrs["x_amz_meta_rgwx_source_mtime"];

ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered attrs[X_AMZ_META_RGWX_SOURCE_MTIME] = " << s <<dendl;
ldout(cct, 0) << "XXXXXXXXXXXXXX is_already_tiered mtime buf = " << buf <<dendl;
if (!s.empty() && !strcmp(s.c_str(), buf)){
return 1;
}
return 0;
}

bool need_extra_data() override {
return true;
}
};

class RGWLCStreamReadCRF : public RGWStreamReadCRF
{
CephContext *cct;
Expand Down Expand Up @@ -966,6 +1167,71 @@ class RGWLCStreamObjToCloudMultipartCR : public RGWCoroutine {
}
};

int RGWLCCloudCheckCR::operate() {
/* Check if object has already been transitioned */
rgw_lc_obj_properties obj_properties(tier_ctx.o.meta.mtime,
tier_ctx.o.meta.etag,
tier_ctx.o.versioned_epoch,
tier_ctx.acl_mappings,
tier_ctx.target_storage_class);

rgw_bucket target_bucket;
string target_obj_name;

target_bucket.name = tier_ctx.target_bucket_name;
target_obj_name = tier_ctx.obj.key.name; // cross check with aws module

std::shared_ptr<rgw::sal::RGWRadosBucket> dest_bucket;
dest_bucket.reset(new rgw::sal::RGWRadosBucket(tier_ctx.store, target_bucket));

std::shared_ptr<rgw::sal::RGWRadosObject> dest_obj;
dest_obj.reset(new rgw::sal::RGWRadosObject(tier_ctx.store, rgw_obj_key(target_obj_name), (rgw::sal::RGWRadosBucket *)(dest_bucket.get())));


// std::shared_ptr<RGWStreamReadHTTPResourceCRF> get_crf;
std::shared_ptr<RGWLCStreamGetCRF> get_crf;
get_crf.reset(new RGWLCStreamGetCRF((CephContext *)(tier_ctx.cct), get_env(), this,
(RGWHTTPManager*)(tier_ctx.http_manager),
obj_properties, tier_ctx.conn, static_cast<rgw::sal::RGWObject *>(dest_obj.get())));
int ret;

// yield {
ret = get_crf->init2();
// }
if (ret < 0) {
ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->init failed, ret = " << ret << dendl;
return set_cr_error(ret);
}
//reenter(this) {
bl.clear();
/* do {
// yield {
ret = get_crf->get_headers(&need_retry);
if (ret < 0) {
ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX get_crf->read failed, ret = " << ret << dendl;
return set_cr_error(ret);
// }
}
if (retcode < 0) {
ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl;
return set_cr_error(ret);
}
} while (need_retry); */

if ((static_cast<RGWLCStreamGetCRF *>(get_crf.get()))->is_already_tiered()) {
*already_tiered = true;
ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered true" << dendl;
return set_cr_done();
}

ldout(tier_ctx.cct, 0) << "XXXXXXXXXXXXXX is_already_tiered false..going with out_crf writing" << dendl;

return set_cr_done();
// } //reenter

return 0;
}

map <pair<string, string>, utime_t> target_buckets;

int RGWLCCloudTierCR::operate() {
Expand Down Expand Up @@ -1035,6 +1301,7 @@ int RGWLCCloudTierCR::operate() {
if (size < multipart_sync_threshold) {
call (new RGWLCStreamObjToCloudPlainCR(tier_ctx));
} else {
tier_ctx.is_multipart_upload = true;
call(new RGWLCStreamObjToCloudMultipartCR(tier_ctx));

}
Expand Down
Loading

0 comments on commit 557b519

Please sign in to comment.