Skip to content

Commit

Permalink
rgw: reshard guard for versioned operations
Browse files Browse the repository at this point in the history
Signed-off-by: Yehuda Sadeh <[email protected]>
  • Loading branch information
yehudasa committed Jun 5, 2017
1 parent 9677f7d commit e18e271
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 39 deletions.
131 changes: 93 additions & 38 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9522,7 +9522,7 @@ int RGWRados::stat_system_obj(RGWObjectCtx& obj_ctx,
}


int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard *pbs, std::function<int(BucketShard *)> call)
int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call)
{
RGWRados *store = target->get_store();
BucketShard *bs;
Expand Down Expand Up @@ -9691,7 +9691,7 @@ int RGWRados::Bucket::UpdateIndex::cancel()
RGWRados *store = target->get_store();
BucketShard *bs;

ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
});

Expand Down Expand Up @@ -10491,6 +10491,47 @@ int RGWRados::olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjStat
return ret;
}

int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
{
rgw_obj obj;
const rgw_obj *pobj = &obj_instance;
int r;

for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
r = bs->init(pobj->bucket, *pobj);
if (r < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
return r;
}
r = call(bs);
if (r != -ERR_BUSY_RESHARDING) {
break;
}
ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
RGWReshard reshard(this);
string new_bucket_id;
r = reshard.block_while_resharding(bs, &new_bucket_id);
if (r == -ERR_BUSY_RESHARDING) {
continue;
}
if (r < 0) {
return r;
}
ldout(cct, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
i = 0; /* resharding is finished, make sure we can retry */

obj = *pobj;
obj.bucket.update_bucket_id(new_bucket_id);
pobj = &obj;
}

if (r < 0) {
return r;
}

return 0;
}

int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
bool delete_marker,
const string& op_tag,
Expand All @@ -10504,13 +10545,6 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
return r;
}

BucketShard bs(this);
r = bs.init(obj_instance.bucket, obj_instance);
if (r < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
return r;
}

rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
Expand All @@ -10519,10 +10553,23 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
zones_trace.insert(get_zone().id);
}

BucketShard bs(this);

cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
unmod_since, high_precision_time,
get_zone().log_data, zones_trace);
r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
librados::ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
return cls_rgw_bucket_link_olh(bs->index_ctx, op,
bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
unmod_since, high_precision_time,
get_zone().log_data, zones_trace);
});
if (r < 0) {
ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
return r;
}

return 0;
}

void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op)
Expand All @@ -10540,23 +10587,24 @@ int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, con
return r;
}

BucketShard bs(this);
int ret = bs.init(obj_instance.bucket, obj_instance);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
}

rgw_zone_set zones_trace;
if (_zones_trace) {
zones_trace = *_zones_trace;
}
zones_trace.insert(get_zone().id);

BucketShard bs(this);

cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data, zones_trace);
if (ret < 0) {
return ret;
r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
librados::ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
olh_tag, olh_epoch, get_zone().log_data, zones_trace);
});
if (r < 0) {
ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
return r;
}

return 0;
Expand Down Expand Up @@ -10584,11 +10632,16 @@ int RGWRados::bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObj

cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());

ObjectReadOperation op;

ret = cls_rgw_get_olh_log(bs.index_ctx, bs.bucket_obj, op, key, ver_marker, olh_tag, log, is_truncated);
if (ret < 0)
ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
ObjectReadOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
key, ver_marker, olh_tag, log, is_truncated);
});
if (ret < 0) {
ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
return ret;
}

return 0;
}
Expand All @@ -10612,13 +10665,16 @@ int RGWRados::bucket_index_trim_olh_log(const RGWBucketInfo& bucket_info, RGWObj

cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());

ObjectWriteOperation op;

cls_rgw_trim_olh_log(op, key, ver, olh_tag);

ret = bs.index_ctx.operate(bs.bucket_obj, &op);
if (ret < 0)
ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
cls_rgw_trim_olh_log(op, key, ver, olh_tag);
return pbs->index_ctx.operate(pbs->bucket_obj, &op);
});
if (ret < 0) {
ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
return ret;
}

return 0;
}
Expand All @@ -10632,17 +10688,16 @@ int RGWRados::bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjSta
}

BucketShard bs(this);
int ret = bs.init(obj_instance.bucket, obj_instance);
if (ret < 0) {
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
}

string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());

cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());

ret = cls_rgw_clear_olh(bs.index_ctx, bs.bucket_obj, key, olh_tag);
int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
ObjectWriteOperation op;
cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
});
if (ret < 0) {
ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
return ret;
Expand Down
4 changes: 3 additions & 1 deletion src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -2882,7 +2882,7 @@ class RGWRados
bs_initialized = false;
}

int guard_reshard(BucketShard *pbs, std::function<int(BucketShard *)> call);
int guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call);
public:

UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj),
Expand Down Expand Up @@ -3247,6 +3247,8 @@ class RGWRados
int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);

int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);

void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
int olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
Expand Down

0 comments on commit e18e271

Please sign in to comment.