Skip to content

Commit

Permalink
cls/2pc_queue: add async API to read operations
Browse files Browse the repository at this point in the history
Signed-off-by: Yuval Lifshitz <[email protected]>
  • Loading branch information
yuvalif committed May 17, 2020
1 parent acefe44 commit c296680
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 79 deletions.
121 changes: 89 additions & 32 deletions src/cls/2pc_queue/cls_2pc_queue_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,54 @@ void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name,
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in);
}

int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
cls_queue_get_capacity_ret op_ret;
auto iter = bl.cbegin();
try {
decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}

size = op_ret.queue_capacity;

return 0;
}

#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) {
bufferlist in, out;
const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
if (r < 0 ) {
return r;
}

cls_queue_get_capacity_ret op_ret;
auto iter = out.cbegin();
return cls_2pc_queue_get_capacity_result(out, size);
}
#endif

// optionally async method for getting capacity (bytes)
// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
}


int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
cls_2pc_queue_reserve_ret op_ret;
auto iter = bl.cbegin();
try {
decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}

size = op_ret.queue_capacity;
res_id = op_ret.id;

return 0;
}

int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::ObjectWriteOperation& op,
int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name,
uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
bufferlist in, out;
cls_2pc_queue_reserve_op reserve_op;
Expand All @@ -46,22 +73,25 @@ int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, librados::Obj

encode(reserve_op, in);
int rval;
ObjectWriteOperation op;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);

if (r < 0) {
return r;
}

return cls_2pc_queue_reserve_result(out, res_id);
}

cls_2pc_queue_reserve_ret op_ret;
auto iter = out.cbegin();
try {
decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}
res_id = op_ret.id;

return 0;
void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size,
uint32_t entries, bufferlist* obl, int* prval) {
bufferlist in;
cls_2pc_queue_reserve_op reserve_op;
reserve_op.size = res_size;
reserve_op.entries = entries;
encode(reserve_op, in);
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval);
}

void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
Expand All @@ -82,22 +112,10 @@ void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in);
}

int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
std::vector<cls_queue_entry>& entries,
int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
bool *truncated, std::string& next_marker) {
bufferlist in, out;
cls_queue_list_op op;
op.start_marker = marker;
op.max = max;
encode(op, in);

const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
if (r < 0) {
return r;
}

cls_queue_list_ret ret;
auto iter = out.cbegin();
auto iter = bl.cbegin();
try {
decode(ret, iter);
} catch (buffer::error& err) {
Expand All @@ -112,16 +130,37 @@ int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const st
return 0;
}

int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max,
std::vector<cls_queue_entry>& entries,
bool *truncated, std::string& next_marker) {
bufferlist in, out;
cls_queue_list_op op;
op.start_marker = marker;
op.max = max;
encode(op, in);

const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
if (r < 0) {
return r;
}
return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker);
}
#endif

void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) {
bufferlist in;
cls_queue_list_op list_op;
list_op.start_marker = marker;
list_op.max = max;
encode(list_op, in);

op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval);
}

int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) {
cls_2pc_queue_reservations_ret ret;
auto iter = out.cbegin();
auto iter = bl.cbegin();
try {
decode(ret, iter);
} catch (buffer::error& err) {
Expand All @@ -133,6 +172,24 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string&
return 0;
}

#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
bufferlist in, out;

const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
if (r < 0) {
return r;
}
return cls_2pc_queue_list_reservations_result(out, reservations);
}
#endif

void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;

op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
}

void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
bufferlist in;
cls_queue_remove_op rem_op;
Expand Down
47 changes: 40 additions & 7 deletions src/cls/2pc_queue/cls_2pc_queue_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,41 @@
// and more may be allocated as xattrs of the object (depending with the number of concurrent reservations)
void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size);

// return max capacity (bytes)
// these overloads which call io_ctx.operate() or io_ctx.exec() should not be called in the rgw.
// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()/exec()
#ifndef CLS_CLIENT_HIDE_IOCTX
// return capacity (bytes)
int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size);

// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// return a reservation id if reservations is possible, 0 otherwise
int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, librados::ObjectWriteOperation& op,
int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name,
uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id);

// incremental listing of all entries in the queue
int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);

// list all pending reservations in the queue
int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
#endif

// optionally async method for getting capacity (bytes)
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);

int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);

// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// notes:
// (1) make sure that librados::OPERATION_RETURNVEC is passed to the executing function
// (2) multiple operations cannot be executed in a batch (operations both read and write)
// after answer is received, call cls_2pc_queue_reserve_result() to parse the results
void cls_2pc_queue_reserve(librados::ObjectWriteOperation& op, uint64_t res_size,
uint32_t entries, bufferlist* obl, int* prval);

int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id);

// commit data using a reservation done beforehand
// res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once
// the size of bl_data_vec must be equal or smaller to the size reserved for the res_id
Expand All @@ -35,12 +62,18 @@ void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector<buffer
void cls_2pc_queue_abort(librados::ObjectWriteOperation& op,
cls_2pc_reservation::id_t res_id);

// incremental listing of all entries in the queue
int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
// optionally async incremental listing of all entries in the queue
// after answer is received, call cls_2pc_queue_list_entries_result() to parse the results
void cls_2pc_queue_list_entries(librados::ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval);

// list all pending reservations in the queue
int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
bool *truncated, std::string& next_marker);

// optionally async listing of all pending reservations in the queue
// after answer is received, call cls_2pc_queue_list_reservations_result() to parse the results
void cls_2pc_queue_list_reservations(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);

int cls_2pc_queue_list_reservations_result(const librados::bufferlist& bl, cls_2pc_reservations& reservations);

// remove all entries up to the given marker
void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
Expand Down
Loading

0 comments on commit c296680

Please sign in to comment.