Skip to content

Commit

Permalink
cls/rgw: adding an entry to reshard queue has O_CREAT option
Browse files Browse the repository at this point in the history
Adds the ability to prevent overwriting a reshard queue (log) entry
for a given bucket with a newer entry. This adds a flag to the op, so
it will either CREATE or make no changes. If an entry already exists
when this flag is set, -EEXIST will be returned.

This is a preparatory step to adding shard reduction to dynamic
resharding.

Signed-off-by: J. Eric Ivancich <[email protected]>
  • Loading branch information
ivancich committed May 30, 2024
1 parent 64e49fa commit 9884e64
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 12 deletions.
24 changes: 20 additions & 4 deletions src/cls/rgw/cls_rgw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4422,15 +4422,31 @@ static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist
return -EINVAL;
}


string key;
std::string key;
op.entry.get_key(&key);

int ret;
bufferlist bl;

if (op.create_only) {
ret = cls_cxx_map_get_val(hctx, key, &bl);
if (ret == 0) {
// entry already exists; make no changes
return -EEXIST;
} else if (ret != -ENOENT) {
CLS_ERR("error accessing reshard queue for %s with key %s",
op.entry.bucket_name.c_str(), key.c_str());
return ret;
}

// we got a -ENOENT and can just fall through...
}

encode(op.entry, bl);
int ret = cls_cxx_map_set_val(hctx, key, &bl);
ret = cls_cxx_map_set_val(hctx, key, &bl);
if (ret < 0) {
CLS_ERR("error adding reshard job for bucket %s with key %s",op.entry.bucket_name.c_str(), key.c_str());
CLS_ERR("error adding reshard job for bucket %s with key %s",
op.entry.bucket_name.c_str(), key.c_str());
return ret;
}

Expand Down
5 changes: 4 additions & 1 deletion src/cls/rgw/cls_rgw_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1083,11 +1083,14 @@ void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op,
op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in);
}

void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
void cls_rgw_reshard_add(librados::ObjectWriteOperation& op,
const cls_rgw_reshard_entry& entry,
const bool create_only)
{
bufferlist in;
cls_rgw_reshard_add_op call;
call.entry = entry;
call.create_only = create_only;
encode(call, in);
op.exec(RGW_CLASS, RGW_RESHARD_ADD, in);
}
Expand Down
4 changes: 3 additions & 1 deletion src/cls/rgw/cls_rgw_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,9 @@ int cls_rgw_lc_list(librados::IoCtx& io_ctx, const std::string& oid,
void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op, const std::string& part_key, const RGWUploadPartInfo& info);

/* resharding */
void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
void cls_rgw_reshard_add(librados::ObjectWriteOperation& op,
const cls_rgw_reshard_entry& entry,
const bool create_only);
void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
// these overloads which call io_ctx.operate() should not be called in the rgw.
// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
Expand Down
14 changes: 11 additions & 3 deletions src/cls/rgw/cls_rgw_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -1480,19 +1480,27 @@ struct cls_rgw_mp_upload_part_info_update_op {
WRITE_CLASS_ENCODER(cls_rgw_mp_upload_part_info_update_op)

struct cls_rgw_reshard_add_op {
cls_rgw_reshard_entry entry;
cls_rgw_reshard_entry entry;

// true -> will not overwrite existing entry
bool create_only {false};

cls_rgw_reshard_add_op() {}

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
ENCODE_START(2, 1, bl);
encode(entry, bl);
encode(create_only, bl);
ENCODE_FINISH(bl);
}

void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
DECODE_START(2, bl);
decode(entry, bl);
create_only = false;
if (struct_v >= 2) {
decode(create_only, bl);
}
DECODE_FINISH(bl);
}
static void generate_test_instances(std::list<cls_rgw_reshard_add_op*>& o);
Expand Down
20 changes: 17 additions & 3 deletions src/rgw/driver/rados/rgw_reshard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1041,11 +1041,25 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);

librados::ObjectWriteOperation op;
cls_rgw_reshard_add(op, entry);

// if we're reducing, we don't want to overwrite an existing entry
// in order to not interfere with the reshard reduction wait period
const bool create_only = entry.new_num_shards < entry.old_num_shards;

cls_rgw_reshard_add(op, entry, create_only);

int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
if (create_only && ret == -EEXIST) {
ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
entry.bucket_name <<
", because it's a reshard reduction and an entry for that bucket already exists" <<
dendl;
// this is not an error so just fall through
} else if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
entry.bucket_name << dendl;
return ret;
}
return 0;
Expand Down

0 comments on commit 9884e64

Please sign in to comment.