Skip to content

Commit

Permalink
librados: add writesame API
Browse files Browse the repository at this point in the history
The writesame operation allows callers to write the same data buffer
multiple times to a given object.

Signed-off-by: David Disseldorp <[email protected]>
  • Loading branch information
ddiss committed Apr 25, 2016
1 parent e334ba9 commit 161d67c
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 2 deletions.
59 changes: 57 additions & 2 deletions src/include/rados/librados.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ struct rados_cluster_stat_t {
* rados_write_op_assert_version()
* - Creating objects: rados_write_op_create()
* - IO on objects: rados_write_op_append(), rados_write_op_write(), rados_write_op_zero
* rados_write_op_write_full(), rados_write_op_remove, rados_write_op_truncate(),
* rados_write_op_zero()
* rados_write_op_write_full(), rados_write_op_writesame(), rados_write_op_remove,
* rados_write_op_truncate(), rados_write_op_zero()
* - Hints: rados_write_op_set_alloc_hint()
* - Performing the operation: rados_write_op_operate(), rados_aio_write_op_operate()
*/
Expand Down Expand Up @@ -1333,6 +1333,24 @@ CEPH_RADOS_API int rados_write(rados_ioctx_t io, const char *oid,
CEPH_RADOS_API int rados_write_full(rados_ioctx_t io, const char *oid,
const char *buf, size_t len);

/**
 * Write the same *data_len* bytes from *buf* multiple times into the
 * *oid* object. *write_len* bytes are written in total, which must be
 * a multiple of *data_len*. The value of *write_len* and *data_len*
 * must be <= UINT_MAX/2.
 *
 * @param io the io context in which the write will occur
 * @param oid name of the object
 * @param buf data to write
 * @param data_len length of the data, in bytes; must not be 0
 * @param write_len the total number of bytes to write
 * @param off byte offset in the object to begin writing at
 * @returns 0 on success, negative error code on failure
 * @returns -E2BIG if *data_len* or *write_len* is greater than UINT_MAX/2
 * @returns -EINVAL if *data_len* is 0 or *write_len* is not a multiple
 *          of *data_len*
 */
CEPH_RADOS_API int rados_writesame(rados_ioctx_t io, const char *oid,
const char *buf, size_t data_len,
size_t write_len, uint64_t off);

/**
* Efficiently copy a portion of one object to another
*
Expand Down Expand Up @@ -1889,6 +1907,29 @@ CEPH_RADOS_API int rados_aio_write_full(rados_ioctx_t io, const char *oid,
rados_completion_t completion,
const char *buf, size_t len);

/**
 * Asynchronously write the same buffer multiple times
 *
 * Queues the writesame and returns.
 *
 * *write_len* must be a multiple of *data_len*, and both values must be
 * <= UINT_MAX/2.
 *
 * The return value of the completion will be 0 on success, negative
 * error code on failure.
 *
 * @param io the io context in which the write will occur
 * @param oid name of the object
 * @param completion what to do when the writesame is safe and complete
 * @param buf data to write
 * @param data_len length of the data, in bytes; must not be 0
 * @param write_len the total number of bytes to write
 * @param off byte offset in the object to begin writing at
 * @returns 0 on success, -EROFS if the io context specifies a snap_seq
 * other than LIBRADOS_SNAP_HEAD
 * @returns -E2BIG if *data_len* or *write_len* is greater than UINT_MAX/2
 * @returns -EINVAL if *data_len* is 0 or *write_len* is not a multiple
 *          of *data_len*
 */
CEPH_RADOS_API int rados_aio_writesame(rados_ioctx_t io, const char *oid,
rados_completion_t completion,
const char *buf, size_t data_len,
size_t write_len, uint64_t off);

/**
* Asynchronously remove an object
*
Expand Down Expand Up @@ -2510,6 +2551,20 @@ CEPH_RADOS_API void rados_write_op_write_full(rados_write_op_t write_op,
const char *buffer,
size_t len);

/**
 * Write the same buffer multiple times
 *
 * The action is only added to *write_op* here; this call itself does not
 * validate *data_len* or *write_len* and reports no error.
 *
 * @param write_op operation to add this action to
 * @param buffer bytes to write
 * @param data_len length of buffer
 * @param write_len total number of bytes to write, as a multiple of *data_len*
 * @param offset offset to write to
 */
CEPH_RADOS_API void rados_write_op_writesame(rados_write_op_t write_op,
const char *buffer,
size_t data_len,
size_t write_len,
uint64_t offset);

/**
* Append to end of object.
* @param write_op operation to add this action to
Expand Down
6 changes: 6 additions & 0 deletions src/include/rados/librados.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ namespace librados

void write(uint64_t off, const bufferlist& bl);
void write_full(const bufferlist& bl);
void writesame(uint64_t off, uint64_t write_len,
const bufferlist& bl);
void append(const bufferlist& bl);
void remove();
void truncate(uint64_t off);
Expand Down Expand Up @@ -683,6 +685,8 @@ namespace librados
* NOTE: this call steals the contents of @param bl.
*/
int write_full(const std::string& oid, bufferlist& bl);
int writesame(const std::string& oid, bufferlist& bl,
size_t write_len, uint64_t off);
int clone_range(const std::string& dst_oid, uint64_t dst_off,
const std::string& src_oid, uint64_t src_off,
size_t len);
Expand Down Expand Up @@ -908,6 +912,8 @@ namespace librados
int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl,
size_t len);
int aio_write_full(const std::string& oid, AioCompletion *c, const bufferlist& bl);
int aio_writesame(const std::string& oid, AioCompletion *c, const bufferlist& bl,
size_t write_len, uint64_t off);

/**
* Asynchronously remove an object
Expand Down
47 changes: 47 additions & 0 deletions src/librados/IoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,21 @@ int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
return operate(oid, &op, NULL);
}

int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
size_t write_len, uint64_t off)
{
if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
return -E2BIG;
if ((bl.length() == 0) || (write_len % bl.length()))
return -EINVAL;
::ObjectOperation op;
prepare_assert_ops(&op);
bufferlist mybl;
mybl.substr_of(bl, 0, bl.length());
op.writesame(off, write_len, mybl);
return operate(oid, &op, NULL);
}

int librados::IoCtxImpl::clone_range(const object_t& dst_oid,
uint64_t dst_offset,
const object_t& src_oid,
Expand Down Expand Up @@ -934,6 +949,38 @@ int librados::IoCtxImpl::aio_write_full(const object_t &oid,
return 0;
}

int librados::IoCtxImpl::aio_writesame(const object_t &oid,
AioCompletionImpl *c,
const bufferlist& bl,
size_t write_len,
uint64_t off)
{
auto ut = ceph::real_clock::now(client->cct);

if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
return -E2BIG;
if ((bl.length() == 0) || (write_len % bl.length()))
return -EINVAL;
/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
return -EROFS;

Context *onack = new C_aio_Ack(c);
Context *onsafe = new C_aio_Safe(c);

c->io = this;
queue_aio_write(c);

Objecter::Op *o = objecter->prepare_writesame_op(
oid, oloc,
write_len, off,
snapc, bl, ut, 0,
onack, onsafe, &c->objver);
objecter->op_submit(o, &c->tid);

return 0;
}

int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c)
{
auto ut = ceph::real_clock::now(client->cct);
Expand Down
4 changes: 4 additions & 0 deletions src/librados/IoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ struct librados::IoCtxImpl {
int write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
int append(const object_t& oid, bufferlist& bl, size_t len);
int write_full(const object_t& oid, bufferlist& bl);
int writesame(const object_t& oid, bufferlist& bl,
size_t write_len, uint64_t offset);
int clone_range(const object_t& dst_oid, uint64_t dst_offset,
const object_t& src_oid, uint64_t src_offset, uint64_t len);
int read(const object_t& oid, bufferlist& bl, size_t len, uint64_t off);
Expand Down Expand Up @@ -201,6 +203,8 @@ struct librados::IoCtxImpl {
const bufferlist& bl, size_t len);
int aio_write_full(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl);
int aio_writesame(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t write_len, uint64_t off);
int aio_remove(const object_t &oid, AioCompletionImpl *c);
int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
const char *method, bufferlist& inbl, bufferlist *outbl);
Expand Down
70 changes: 70 additions & 0 deletions src/librados/librados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ void librados::ObjectWriteOperation::write_full(const bufferlist& bl)
o->write_full(c);
}

void librados::ObjectWriteOperation::writesame(uint64_t off, uint64_t write_len,
const bufferlist& bl)
{
::ObjectOperation *o = &impl->o;
bufferlist c = bl;
o->writesame(off, write_len, c);
}

void librados::ObjectWriteOperation::append(const bufferlist& bl)
{
::ObjectOperation *o = &impl->o;
Expand Down Expand Up @@ -1184,6 +1192,13 @@ int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl)
return io_ctx_impl->write_full(obj, bl);
}

int librados::IoCtx::writesame(const std::string& oid, bufferlist& bl,
size_t write_len, uint64_t off)
{
object_t obj(oid);
return io_ctx_impl->writesame(obj, bl, write_len, off);
}

int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off,
const std::string& src_oid, uint64_t src_off,
size_t len)
Expand Down Expand Up @@ -1796,6 +1811,14 @@ int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioComplet
return io_ctx_impl->aio_write_full(obj, c->pc, bl);
}

int librados::IoCtx::aio_writesame(const std::string& oid, librados::AioCompletion *c,
const bufferlist& bl, size_t write_len,
uint64_t off)
{
return io_ctx_impl->aio_writesame(oid, c->pc, bl, write_len, off);
}


int librados::IoCtx::aio_remove(const std::string& oid, librados::AioCompletion *c)
{
return io_ctx_impl->aio_remove(oid, c->pc);
Expand Down Expand Up @@ -3279,6 +3302,23 @@ extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf
return retval;
}

/**
 * C API entry point for writesame.
 *
 * Appends data_len bytes from buf to a local bufferlist, forwards to
 * IoCtxImpl::writesame(), and returns its result. The call is bracketed
 * by the enter/exit tracepoints.
 */
extern "C" int rados_writesame(rados_ioctx_t io,
                               const char *o,
                               const char *buf,
                               size_t data_len,
                               size_t write_len,
                               uint64_t off)
{
  tracepoint(librados, rados_writesame_enter, io, o, buf, data_len, write_len, off);

  librados::IoCtxImpl *impl = static_cast<librados::IoCtxImpl *>(io);
  object_t target(o);

  bufferlist pattern;
  pattern.append(buf, data_len);

  int rc = impl->writesame(target, pattern, write_len, off);

  tracepoint(librados, rados_writesame_exit, rc);
  return rc;
}

extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off,
const char *src, uint64_t src_off, size_t len)
{
Expand Down Expand Up @@ -4338,6 +4378,23 @@ extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o,
return retval;
}

/**
 * C API entry point for asynchronous writesame.
 *
 * Appends data_len bytes from buf to a local bufferlist and forwards to
 * IoCtxImpl::aio_writesame(), bracketing the call with the enter/exit
 * tracepoints.
 *
 * @param io         io context handle (an IoCtxImpl pointer)
 * @param o          name of the object
 * @param completion completion handle (an AioCompletionImpl pointer)
 * @param buf        pattern to repeat
 * @param data_len   length of buf, in bytes
 * @param write_len  total number of bytes to write
 * @param off        byte offset in the object at which writing begins
 * @returns the result of IoCtxImpl::aio_writesame()
 */
extern "C" int rados_aio_writesame(rados_ioctx_t io, const char *o,
                                   rados_completion_t completion,
                                   const char *buf, size_t data_len,
                                   size_t write_len, uint64_t off)
{
  tracepoint(librados, rados_aio_writesame_enter, io, o, completion, buf,
             data_len, write_len, off);
  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
  object_t oid(o);
  bufferlist bl;
  bl.append(buf, data_len);
  // Pass the object_t built above. The raw name `o` would leave `oid`
  // unused and build a second temporary object_t at the call site.
  int retval = ctx->aio_writesame(oid, (librados::AioCompletionImpl*)completion,
                                  bl, write_len, off);
  tracepoint(librados, rados_aio_writesame_exit, retval);
  return retval;
}

extern "C" int rados_aio_remove(rados_ioctx_t io, const char *o,
rados_completion_t completion)
{
Expand Down Expand Up @@ -4876,6 +4933,19 @@ extern "C" void rados_write_op_write_full(rados_write_op_t write_op,
tracepoint(librados, rados_write_op_write_full_exit);
}

/**
 * C API: add a writesame action to a write operation.
 *
 * Appends data_len bytes from buffer to a local bufferlist and adds a
 * writesame of write_len bytes at offset to the ::ObjectOperation behind
 * write_op. No length validation is done here.
 */
extern "C" void rados_write_op_writesame(rados_write_op_t write_op,
                                         const char *buffer,
                                         size_t data_len,
                                         size_t write_len,
                                         uint64_t offset)
{
  tracepoint(librados, rados_write_op_writesame_enter, write_op, buffer, data_len, write_len, offset);

  bufferlist pattern;
  pattern.append(buffer, data_len);

  ::ObjectOperation *target_op = static_cast< ::ObjectOperation *>(write_op);
  target_op->writesame(offset, write_len, pattern);

  tracepoint(librados, rados_write_op_writesame_exit);
}

extern "C" void rados_write_op_append(rados_write_op_t write_op,
const char *buffer,
size_t len)
Expand Down
49 changes: 49 additions & 0 deletions src/osdc/Objecter.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ struct ObjectOperation {
osd_op.op.extent.length = len;
osd_op.indata.claim_append(bl);
}
void add_writesame(int op, uint64_t off, uint64_t write_len,
bufferlist& bl) {
OSDOp& osd_op = add_op(op);
osd_op.op.writesame.offset = off;
osd_op.op.writesame.length = write_len;
osd_op.op.writesame.data_length = bl.length();
osd_op.indata.claim_append(bl);
}
void add_clone_range(int op, uint64_t off, uint64_t len,
const object_t& srcoid, uint64_t srcoff,
snapid_t srcsnapid) {
Expand Down Expand Up @@ -370,6 +378,9 @@ struct ObjectOperation {
void write_full(bufferlist& bl) {
add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
}
// Queue a CEPH_OSD_OP_WRITESAME that repeats bl from off for write_len bytes.
void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
  const int opcode = CEPH_OSD_OP_WRITESAME;
  add_writesame(opcode, off, write_len, bl);
}
void append(bufferlist& bl) {
add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
}
Expand Down Expand Up @@ -2598,6 +2609,44 @@ class Objecter : public md_config_obs_t, public Dispatcher {
op_submit(o, &tid);
return tid;
}
// Build, without submitting, an Op that carries a single
// CEPH_OSD_OP_WRITESAME. It is placed at the index returned by init_ops(),
// which is also given extra_ops. The Op is allocated with new and returned
// to the caller with mtime and snapc set.
Op *prepare_writesame_op(
  const object_t& oid, const object_locator_t& oloc,
  uint64_t write_len, uint64_t off,
  const SnapContext& snapc, const bufferlist &bl,
  ceph::real_time mtime, int flags, Context *onack,
  Context *oncommit, version_t *objver = NULL,
  ObjectOperation *extra_ops = NULL, int op_flags = 0) {

  vector<OSDOp> ops;
  const int slot = init_ops(ops, 1, extra_ops);

  // Fill in the writesame op in the slot reserved by init_ops().
  OSDOp& ws_op = ops[slot];
  ws_op.op.op = CEPH_OSD_OP_WRITESAME;
  ws_op.op.flags = op_flags;
  ws_op.op.writesame.offset = off;
  ws_op.op.writesame.length = write_len;
  ws_op.op.writesame.data_length = bl.length();
  ws_op.indata = bl;

  // CEPH_OSD_FLAG_WRITE is always OR-ed into the caller's and global flags.
  Op *request = new Op(oid, oloc, ops,
                       flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE,
                       onack, oncommit, objver);
  request->mtime = mtime;
  request->snapc = snapc;
  return request;
}
// Build a writesame Op with prepare_writesame_op(), submit it, and return
// the transaction id filled in by op_submit().
ceph_tid_t writesame(
  const object_t& oid, const object_locator_t& oloc,
  uint64_t write_len, uint64_t off,
  const SnapContext& snapc, const bufferlist &bl,
  ceph::real_time mtime, int flags, Context *onack,
  Context *oncommit, version_t *objver = NULL,
  ObjectOperation *extra_ops = NULL, int op_flags = 0) {

  Op *request = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
                                     mtime, flags, onack, oncommit, objver,
                                     extra_ops, op_flags);

  ceph_tid_t submitted_tid;
  op_submit(request, &submitted_tid);
  return submitted_tid;
}
ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, ceph::real_time mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq, Context *onack,
Expand Down
Loading

0 comments on commit 161d67c

Please sign in to comment.