Skip to content

Commit

Permalink
librados: add cmpext API
Browse files Browse the repository at this point in the history
The compare-extent (cmpext) operation allows callers to compare existing
object contents with an arbitrary buffer. cmpext requests can be
compounded with read and write operations, allowing for atomic object
content updates. The operation returns 0 on success, a negative error code
on failure, and (-MAX_ERRNO - mismatch_off) on mismatch.

This commit is based on Mike Christie's initial C++ API, with the
addition of AIO support and a C API. Response marshalling was also
reworked, so that the miscompare offset is unmarshalled transparently to
the caller.

Signed-off-by: Zhengyong Wang <[email protected]>
Signed-off-by: David Disseldorp <[email protected]>
  • Loading branch information
wangzhengyong committed Apr 26, 2017
1 parent 351f78d commit 0ccebc5
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 4 deletions.
70 changes: 68 additions & 2 deletions src/include/rados/librados.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ struct rados_cluster_stat_t {
* - Creating objects: rados_write_op_create()
* - IO on objects: rados_write_op_append(), rados_write_op_write(), rados_write_op_zero
* rados_write_op_write_full(), rados_write_op_writesame(), rados_write_op_remove,
* rados_write_op_truncate(), rados_write_op_zero()
* rados_write_op_truncate(), rados_write_op_zero(), rados_write_op_cmpext()
* - Hints: rados_write_op_set_alloc_hint()
* - Performing the operation: rados_write_op_operate(), rados_aio_write_op_operate()
*/
Expand All @@ -336,7 +336,8 @@ typedef void *rados_write_op_t;
* rados_read_op_omap_cmp()
* - Object properties: rados_read_op_stat(), rados_read_op_assert_exists(),
* rados_read_op_assert_version()
* - IO on objects: rados_read_op_read(), rados_read_op_checksum()
* - IO on objects: rados_read_op_read(), rados_read_op_checksum(),
* rados_read_op_cmpext()
* - Custom operations: rados_read_op_exec(), rados_read_op_exec_user_buf()
* - Request properties: rados_read_op_set_flags()
* - Performing the operation: rados_read_op_operate(),
Expand Down Expand Up @@ -1519,6 +1520,21 @@ CEPH_RADOS_API int rados_remove(rados_ioctx_t io, const char *oid);
CEPH_RADOS_API int rados_trunc(rados_ioctx_t io, const char *oid,
uint64_t size);

/**
* Compare an on-disk object range with a buffer
*
* @param io the context in which to perform the comparison
* @param o name of the object
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @returns 0 on success, negative error code on failure,
* (-MAX_ERRNO - mismatch_off) on mismatch; in the mismatch case the
* offset can be recovered as mismatch_off = -(return value) - MAX_ERRNO
*/
CEPH_RADOS_API int rados_cmpext(rados_ioctx_t io, const char *o,
const char *cmp_buf, size_t cmp_len,
uint64_t off);

/**
* @name Xattrs
* Extended attributes are stored as extended attributes on the files
Expand Down Expand Up @@ -2110,6 +2126,24 @@ CEPH_RADOS_API int rados_aio_stat(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *psize, time_t *pmtime);

/**
* Asynchronously compare an on-disk object range with a buffer
*
* @param io the context in which to perform the comparison
* @param o the name of the object to compare with
* @param completion what to do when the comparison is complete
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @returns 0 on success, negative error code on failure,
* (-MAX_ERRNO - mismatch_off) on mismatch
*
* @note The call itself returns 0 once the request has been submitted, or
* -E2BIG when @p cmp_len exceeds UINT_MAX/2. The comparison outcome
* (including the mismatch encoding above) is presumably reported through
* @p completion rather than through this return value -- TODO confirm.
*/
CEPH_RADOS_API int rados_aio_cmpext(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cmp_buf,
size_t cmp_len,
uint64_t off);

/**
* Cancel async operation
*
Expand Down Expand Up @@ -2722,6 +2756,22 @@ CEPH_RADOS_API void rados_write_op_assert_exists(rados_write_op_t write_op);
*/
CEPH_RADOS_API void rados_write_op_assert_version(rados_write_op_t write_op, uint64_t ver);

/**
* Ensure that given object range (extent) satisfies comparison.
*
* @param write_op operation to add this action to
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @param prval returned result of comparison, 0 on success, negative error code
* on failure, (-MAX_ERRNO - mismatch_off) on mismatch; in the mismatch case
* the offset can be recovered as mismatch_off = -(*prval) - MAX_ERRNO
*/
CEPH_RADOS_API void rados_write_op_cmpext(rados_write_op_t write_op,
const char *cmp_buf,
size_t cmp_len,
uint64_t off,
int *prval);

/**
* Ensure that given xattr satisfies comparison.
* If the comparison is not satisfied, the return code of the
Expand Down Expand Up @@ -3023,6 +3073,22 @@ CEPH_RADOS_API void rados_read_op_assert_exists(rados_read_op_t read_op);
*/
CEPH_RADOS_API void rados_read_op_assert_version(rados_read_op_t read_op, uint64_t ver);

/**
* Ensure that given object range (extent) satisfies comparison.
*
* @param read_op operation to add this action to
* @param cmp_buf buffer containing bytes to be compared with object contents
* @param cmp_len length to compare and size of @p cmp_buf in bytes
* @param off object byte offset at which to start the comparison
* @param prval returned result of comparison, 0 on success, negative error code
* on failure, (-MAX_ERRNO - mismatch_off) on mismatch; in the mismatch case
* the offset can be recovered as mismatch_off = -(*prval) - MAX_ERRNO
*/
CEPH_RADOS_API void rados_read_op_cmpext(rados_read_op_t read_op,
const char *cmp_buf,
size_t cmp_len,
uint64_t off,
int *prval);

/**
* Ensure that an xattr satisfies a comparison
* If the comparison is not satisfied, the return code of the
Expand Down
16 changes: 16 additions & 0 deletions src/include/rados/librados.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ namespace librados
//flag mean ObjectOperationFlags
void set_op_flags2(int flags);

/**
* Add a compare-extent step to this operation: the object data starting at
* byte offset off is compared with the contents of cmp_bl.
* prval is forwarded to the underlying operation as the per-op result
* pointer; presumably it receives 0 on match, a negative error code on
* failure and (-MAX_ERRNO - mismatch_off) on mismatch, as documented for
* the C API -- TODO confirm.
*/
void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval);
void cmpxattr(const char *name, uint8_t op, const bufferlist& val);
void cmpxattr(const char *name, uint8_t op, uint64_t v);
void exec(const char *cls, const char *method, bufferlist& inbl);
Expand Down Expand Up @@ -755,6 +756,7 @@ namespace librados
int remove(const std::string& oid, int flags);
int trunc(const std::string& oid, uint64_t size);
int mapext(const std::string& o, uint64_t off, size_t len, std::map<uint64_t,uint64_t>& m);
/**
* Compare the data of object o, starting at byte offset off, with cmp_bl.
* Returns -E2BIG when cmp_bl is longer than UINT_MAX/2 bytes; otherwise the
* result of the compare operation (presumably 0 on match and
* (-MAX_ERRNO - mismatch_off) on mismatch, as for rados_cmpext() -- confirm).
*/
int cmpext(const std::string& o, uint64_t off, bufferlist& cmp_bl);
int sparse_read(const std::string& o, std::map<uint64_t,uint64_t>& m, bufferlist& bl, size_t len, uint64_t off);
int getxattr(const std::string& oid, const char *name, bufferlist& bl);
int getxattrs(const std::string& oid, std::map<std::string, bufferlist>& attrset);
Expand Down Expand Up @@ -991,6 +993,20 @@ namespace librados
int aio_sparse_read(const std::string& oid, AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid);
/**
* Asynchronously compare an on-disk object range with a buffer
*
* @param oid the name of the object to compare with
* @param c what to do when the comparison is complete
* @param off object byte offset at which to start the comparison
* @param cmp_bl buffer containing bytes to be compared with object contents
* @returns 0 on success, negative error code on failure,
* (-MAX_ERRNO - mismatch_off) on mismatch
*
* @note The call itself returns 0 once the request has been submitted, or
* -E2BIG when cmp_bl is longer than UINT_MAX/2 bytes. The comparison
* outcome (including the mismatch encoding above) is presumably reported
* through the completion c rather than this return value -- TODO confirm.
*/
int aio_cmpext(const std::string& oid,
librados::AioCompletion *c,
uint64_t off,
bufferlist& cmp_bl);
int aio_write(const std::string& oid, AioCompletion *c, const bufferlist& bl,
size_t len, uint64_t off);
int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl,
Expand Down
60 changes: 60 additions & 0 deletions src/librados/IoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,54 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
return 0;
}

int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
AioCompletionImpl *c,
uint64_t off,
bufferlist& cmp_bl)
{
if (cmp_bl.length() > UINT_MAX/2)
return -E2BIG;

Context *onack = new C_aio_Complete(c);

c->is_read = true;
c->io = this;

Objecter::Op *o = objecter->prepare_cmpext_op(
oid, oloc, off, cmp_bl, snap_seq, 0,
onack, &c->objver);
objecter->op_submit(o, &c->tid);

return 0;
}

/*
 * use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API
 *
 * Submits an asynchronous compare-extent request for a raw caller buffer.
 *
 * @param oid     object to compare against
 * @param c       completion that is notified once the op finishes
 * @param cmp_buf bytes to compare with the object contents
 * @param cmp_len number of bytes in cmp_buf to compare
 * @param off     object byte offset at which the comparison starts
 * @return -E2BIG (nothing is submitted) when cmp_len exceeds UINT_MAX/2,
 *         otherwise 0 after the op was passed to objecter->op_submit()
 */
int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
                                    AioCompletionImpl *c,
                                    const char *cmp_buf,
                                    size_t cmp_len,
                                    uint64_t off)
{
  if (cmp_len > UINT_MAX/2)
    return -E2BIG;

  // C_ObjectOperation owns the op vector (m_ops) and wraps the regular
  // aio completion context.
  Context *nested = new C_aio_Complete(c);
  C_ObjectOperation *onack = new C_ObjectOperation(nested);

  c->is_read = true;
  c->io = this;

  // cmp_buf/cmp_len go straight into the raw-buffer cmpext() overload, so no
  // intermediate bufferlist copy of the caller's data is built here.
  onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);

  Objecter::Op *o = objecter->prepare_read_op(
    oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
  objecter->op_submit(o, &c->tid);
  return 0;
}

int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len,
uint64_t off)
Expand Down Expand Up @@ -1376,6 +1424,18 @@ int librados::IoCtxImpl::read(const object_t& oid,
return bl.length();
}

int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
bufferlist& cmp_bl)
{
if (cmp_bl.length() > UINT_MAX/2)
return -E2BIG;

::ObjectOperation op;
prepare_assert_ops(&op);
op.cmpext(off, cmp_bl, NULL);
return operate_read(oid, &op, NULL);
}

int librados::IoCtxImpl::mapext(const object_t& oid,
uint64_t off, size_t len,
std::map<uint64_t,uint64_t>& m)
Expand Down
5 changes: 5 additions & 0 deletions src/librados/IoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ struct librados::IoCtxImpl {
int stat(const object_t& oid, uint64_t *psize, time_t *pmtime);
int stat2(const object_t& oid, uint64_t *psize, struct timespec *pts);
int trunc(const object_t& oid, uint64_t size);
// Compare object data starting at off with cmp_bl; yields -E2BIG when
// cmp_bl.length() > UINT_MAX/2.
int cmpext(const object_t& oid, uint64_t off, bufferlist& cmp_bl);

int tmap_update(const object_t& oid, bufferlist& cmdbl);
int tmap_put(const object_t& oid, bufferlist& bl);
Expand Down Expand Up @@ -191,6 +192,10 @@ struct librados::IoCtxImpl {
int aio_sparse_read(const object_t oid, AioCompletionImpl *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid);
// bufferlist flavour, called by the C++ API (librados::IoCtx::aio_cmpext)
int aio_cmpext(const object_t& oid, AioCompletionImpl *c, uint64_t off,
bufferlist& cmp_bl);
// raw-buffer flavour, called by the C API (rados_aio_cmpext)
int aio_cmpext(const object_t& oid, AioCompletionImpl *c,
const char *cmp_buf, size_t cmp_len, uint64_t off);
int aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len, uint64_t off);
int aio_append(const object_t &oid, AioCompletionImpl *c,
Expand Down
79 changes: 78 additions & 1 deletion src/librados/librados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ void librados::ObjectOperation::set_op_flags2(int flags)
::set_op_flags(o, flags);
}

void librados::ObjectOperation::cmpext(uint64_t off,
bufferlist &cmp_bl,
int *prval)
{
::ObjectOperation *o = &impl->o;
o->cmpext(off, cmp_bl, prval);
}

void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, const bufferlist& v)
{
::ObjectOperation *o = &impl->o;
Expand Down Expand Up @@ -1222,6 +1230,12 @@ int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len,
return io_ctx_impl->mapext(obj, off, len, m);
}

int librados::IoCtx::cmpext(const std::string& oid, uint64_t off, bufferlist& cmp_bl)
{
object_t obj(oid);
return io_ctx_impl->cmpext(obj, off, cmp_bl);
}

int librados::IoCtx::sparse_read(const std::string& oid, std::map<uint64_t,uint64_t>& m,
bufferlist& bl, size_t len, uint64_t off)
{
Expand Down Expand Up @@ -1829,6 +1843,14 @@ int librados::IoCtx::aio_exec(const std::string& oid,
return io_ctx_impl->aio_exec(obj, c->pc, cls, method, inbl, outbl);
}

int librados::IoCtx::aio_cmpext(const std::string& oid,
librados::AioCompletion *c,
uint64_t off,
bufferlist& cmp_bl)
{
return io_ctx_impl->aio_cmpext(oid, c->pc, off, cmp_bl);
}

int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off)
Expand Down Expand Up @@ -3920,6 +3942,23 @@ extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, tim
return retval;
}

/**
 * C API entry point: compare an object range with a caller-supplied buffer.
 *
 * The raw buffer is appended to a bufferlist and handed to
 * IoCtxImpl::cmpext(); that call's result is traced and returned.
 *
 * NOTE(review): the UINT_MAX/2 size limit is only enforced inside
 * IoCtxImpl::cmpext(), i.e. after cmp_buf has already been appended to the
 * bufferlist, whereas the raw-buffer aio variant checks cmp_len up front.
 * Consider validating cmp_len here before the append as well.
 */
extern "C" int rados_cmpext(rados_ioctx_t io, const char *o,
                            const char *cmp_buf, size_t cmp_len, uint64_t off)
{
  tracepoint(librados, rados_cmpext_enter, io, o, cmp_buf, cmp_len, off);

  librados::IoCtxImpl *ioctx = (librados::IoCtxImpl *)io;
  object_t target(o);

  // hand the caller's bytes to the bufferlist-based implementation
  bufferlist to_compare;
  to_compare.append(cmp_buf, cmp_len);

  int retval = ioctx->cmpext(target, off, to_compare);
  tracepoint(librados, rados_cmpext_exit, retval);
  return retval;
}

extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name,
char *buf, size_t len)
{
Expand Down Expand Up @@ -4723,7 +4762,7 @@ extern "C" int rados_aio_rmxattr(rados_ioctx_t io, const char *o,
return retval;
}

extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
rados_completion_t completion,
uint64_t *psize, time_t *pmtime)
{
Expand All @@ -4736,6 +4775,20 @@ extern "C" int rados_aio_stat(rados_ioctx_t io, const char *o,
return retval;
}

/**
 * C API entry point: asynchronously compare an object range with a
 * caller-supplied buffer.
 *
 * Delegates to the raw-buffer overload of IoCtxImpl::aio_cmpext() and
 * returns (and traces) that call's result.
 */
extern "C" int rados_aio_cmpext(rados_ioctx_t io, const char *o,
                                rados_completion_t completion, const char *cmp_buf,
                                size_t cmp_len, uint64_t off)
{
  tracepoint(librados, rados_aio_cmpext_enter, io, o, completion, cmp_buf,
             cmp_len, off);

  librados::IoCtxImpl *ioctx = (librados::IoCtxImpl *)io;
  object_t target(o);
  librados::AioCompletionImpl *comp = (librados::AioCompletionImpl*)completion;

  int r = ioctx->aio_cmpext(target, comp, cmp_buf, cmp_len, off);
  tracepoint(librados, rados_aio_cmpext_exit, r);
  return r;
}

extern "C" int rados_aio_cancel(rados_ioctx_t io, rados_completion_t completion)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
Expand Down Expand Up @@ -5196,6 +5249,18 @@ extern "C" void rados_write_op_assert_exists(rados_write_op_t write_op)
tracepoint(librados, rados_write_op_assert_exists_exit);
}

/**
 * C API entry point: add a compare-extent step to a write operation.
 *
 * The opaque handle is treated as an ::ObjectOperation and the raw buffer,
 * length, offset and result pointer are forwarded to its cmpext().
 */
extern "C" void rados_write_op_cmpext(rados_write_op_t write_op,
                                      const char *cmp_buf,
                                      size_t cmp_len,
                                      uint64_t off,
                                      int *prval)
{
  tracepoint(librados, rados_write_op_cmpext_enter, write_op, cmp_buf,
             cmp_len, off, prval);

  ::ObjectOperation *op = (::ObjectOperation *)write_op;
  op->cmpext(off, cmp_len, cmp_buf, prval);

  tracepoint(librados, rados_write_op_cmpext_exit);
}

extern "C" void rados_write_op_cmpxattr(rados_write_op_t write_op,
const char *name,
uint8_t comparison_operator,
Expand Down Expand Up @@ -5515,6 +5580,18 @@ extern "C" void rados_read_op_assert_exists(rados_read_op_t read_op)
tracepoint(librados, rados_read_op_assert_exists_exit);
}

/**
 * C API entry point: add a compare-extent step to a read operation.
 *
 * The opaque handle is treated as an ::ObjectOperation and the raw buffer,
 * length, offset and result pointer are forwarded to its cmpext().
 */
extern "C" void rados_read_op_cmpext(rados_read_op_t read_op,
                                     const char *cmp_buf,
                                     size_t cmp_len,
                                     uint64_t off,
                                     int *prval)
{
  tracepoint(librados, rados_read_op_cmpext_enter, read_op, cmp_buf,
             cmp_len, off, prval);

  ::ObjectOperation *op = (::ObjectOperation *)read_op;
  op->cmpext(off, cmp_len, cmp_buf, prval);

  tracepoint(librados, rados_read_op_cmpext_exit);
}

extern "C" void rados_read_op_cmpxattr(rados_read_op_t read_op,
const char *name,
uint8_t comparison_operator,
Expand Down
Loading

0 comments on commit 0ccebc5

Please sign in to comment.