Skip to content

Commit

Permalink
Merge PR ceph#31489 into master
Browse files Browse the repository at this point in the history
* refs/pull/31489/head:
	pybind/rados: add WriteOp::writesame() and test WriteOp::writesame()
	pybind/rados: add Ioctx::aio_writesame() and test Ioctx::aio_writesame()
	pybind/rados: add Ioctx::writesame() and test Ioctx::writesame()

Reviewed-by: Deepika Upadhyay <[email protected]>
  • Loading branch information
liewegas committed Dec 9, 2019
2 parents 946a8f7 + 65cbfc8 commit ae60fd9
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
1 change: 1 addition & 0 deletions .githubmap
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,4 @@ dsavineau Dimitri Savineau <[email protected]>
jschmid1 Joshua Schmid <[email protected]>
bk201 Kiefer Chang <[email protected]>
alimaredia Ali Maredia <[email protected]>
ideepika Deepika Upadhyay <[email protected]>
98 changes: 98 additions & 0 deletions src/pybind/rados/rados.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ cdef extern from "rados/librados.h" nogil:
int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime)
int rados_write(rados_ioctx_t io, const char *oid, const char *buf, size_t len, uint64_t off)
int rados_write_full(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
int rados_writesame(rados_ioctx_t io, const char *oid, const char *buf, size_t data_len, size_t write_len, uint64_t off)
int rados_append(rados_ioctx_t io, const char *oid, const char *buf, size_t len)
int rados_read(rados_ioctx_t io, const char *oid, char *buf, size_t len, uint64_t off)
int rados_remove(rados_ioctx_t io, const char *oid)
Expand Down Expand Up @@ -275,6 +276,7 @@ cdef extern from "rados/librados.h" nogil:
int rados_aio_write(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len, uint64_t off)
int rados_aio_append(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
int rados_aio_write_full(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * buf, size_t len)
int rados_aio_writesame(rados_ioctx_t io, const char *oid, rados_completion_t completion, const char *buf, size_t data_len, size_t write_len, uint64_t off)
int rados_aio_remove(rados_ioctx_t io, const char * oid, rados_completion_t completion)
int rados_aio_read(rados_ioctx_t io, const char * oid, rados_completion_t completion, char * buf, size_t len, uint64_t off)
int rados_aio_flush(rados_ioctx_t io)
Expand Down Expand Up @@ -305,6 +307,7 @@ cdef extern from "rados/librados.h" nogil:
void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)
void rados_write_op_exec(rados_write_op_t write_op, const char *cls, const char *method, const char *in_buf, size_t in_len, int *prval)
void rados_write_op_writesame(rados_write_op_t write_op, const char *buffer, size_t data_len, size_t write_len, uint64_t offset)
void rados_read_op_omap_get_vals2(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
void rados_read_op_omap_get_keys2(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, unsigned char *pmore, int * prval)
void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
Expand Down Expand Up @@ -2183,6 +2186,25 @@ cdef class WriteOp(object):
with nogil:
rados_write_op_exec(self.write_op, _cls, _method, _data, _data_len, NULL)

@requires(('to_write', bytes), ('write_len', int), ('offset', int))
def writesame(self, to_write, write_len, offset=0):
"""
Write the same buffer multiple times
:param to_write: data to write
:type to_write: bytes
:param write_len: total number of bytes to write
:type len: int
:param offset: byte offset in the object to begin writing at
:type offset: int
"""
cdef:
char *_to_write = to_write
size_t _data_len = len(to_write)
size_t _write_len = write_len
uint64_t _offset = offset
with nogil:
rados_write_op_writesame(self.write_op, _to_write, _data_len, _write_len, _offset)

class WriteOpCtx(WriteOp, OpCtx):
"""write operation context manager"""

Expand Down Expand Up @@ -2429,6 +2451,49 @@ cdef class Ioctx(object):
raise make_ex(ret, "error writing object %s" % object_name)
return completion

@requires(('object_name', str_type), ('to_write', bytes), ('write_len', int),
('offset', int), ('oncomplete', opt(Callable)))
def aio_writesame(self, object_name, to_write, write_len, offset=0,
oncomplete=None):
"""
Asynchronously write the same buffer multiple times
:param object_name: name of the object
:type object_name: str
:param to_write: data to write
:type to_write: bytes
:param write_len: total number of bytes to write
:type write_len: int
:param offset: byte offset in the object to begin writing at
:type offset: int
:param oncomplete: what to do when the writesame is safe and
complete in memory on all replicas
:type oncomplete: completion
:raises: :class:`Error`
:returns: completion object
"""

object_name = cstr(object_name, 'object_name')

cdef:
Completion completion
char* _object_name = object_name
char* _to_write = to_write
size_t _data_len = len(to_write)
size_t _write_len = write_len
uint64_t _offset = offset

completion = self.__get_completion(oncomplete, None)
self.__track_completion(completion)
with nogil:
ret = rados_aio_writesame(self.io, _object_name, completion.rados_comp,
_to_write, _data_len, _write_len, _offset)

if ret < 0:
completion._cleanup()
raise make_ex(ret, "error writing object %s" % object_name)
return completion

@requires(('object_name', str_type), ('to_append', bytes), ('oncomplete', opt(Callable)),
('onsafe', opt(Callable)))
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
Expand Down Expand Up @@ -2816,6 +2881,39 @@ returned %d, but should return zero on success." % (self.name, ret))
raise LogicError("Ioctx.write_full(%s): rados_write_full \
returned %d, but should return zero on success." % (self.name, ret))

@requires(('key', str_type), ('data', bytes), ('write_len', int), ('offset', int))
def writesame(self, key, data, write_len, offset=0):
"""
Write the same buffer multiple times
:param key: name of the object
:type key: str
:param data: data to write
:type data: bytes
:param write_len: total number of bytes to write
:type write_len: int
:param offset: byte offset in the object to begin writing at
:type offset: int
:raises: :class:`TypeError`
:raises: :class:`LogicError`
"""
self.require_ioctx_open()

key = cstr(key, 'key')
cdef:
char *_key = key
char *_data = data
size_t _data_len = len(data)
size_t _write_len = write_len
uint64_t _offset = offset

with nogil:
ret = rados_writesame(self.io, _key, _data, _data_len, _write_len, _offset)
if ret < 0:
raise make_ex(ret, "Ioctx.writesame(%s): failed to write %s"
% (self.name, key))
assert(ret == 0)

@requires(('key', str_type), ('data', bytes))
def append(self, key, data):
"""
Expand Down
27 changes: 27 additions & 0 deletions src/test/pybind/test_rados.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ def test_write_full(self):
self.ioctx.write_full('abc', b'd')
eq(self.ioctx.read('abc'), b'd')

def test_writesame(self):
self.ioctx.writesame('ob', b'rzx', 9)
eq(self.ioctx.read('ob'), b'rzxrzxrzx')

def test_append(self):
self.ioctx.write('abc', b'a')
self.ioctx.append('abc', b'b')
Expand Down Expand Up @@ -534,6 +538,12 @@ def test_execute_op(self):
self.ioctx.operate_write_op(write_op, "object")
eq(self.ioctx.read('object'), b"Hello, ebs!")

def test_writesame_op(self):
with WriteOpCtx() as write_op:
write_op.writesame(b'rzx', 9)
self.ioctx.operate_write_op(write_op, 'abc')
eq(self.ioctx.read('abc'), b'rzxrzxrzx')

def test_get_omap_vals_by_keys(self):
keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
Expand Down Expand Up @@ -695,6 +705,23 @@ def cb(blah):
eq(contents, b"bar")
[i.remove() for i in self.ioctx.list_objects()]

def test_aio_writesame(self):
lock = threading.Condition()
count = [0]
def cb(blah):
with lock:
count[0] += 1
lock.notify()
return 0
comp = self.ioctx.aio_writesame("abc", b"rzx", 9, 0, cb)
comp.wait_for_complete()
with lock:
while count[0] < 1:
lock.wait()
eq(comp.get_return_value(), 0)
eq(self.ioctx.read("abc"), b"rzxrzxrzx")
[i.remove() for i in self.ioctx.list_objects()]

def test_aio_stat(self):
lock = threading.Condition()
count = [0]
Expand Down

0 comments on commit ae60fd9

Please sign in to comment.