Skip to content

Commit

Permalink
Merge pull request ceph#9292 from marcan/pyrbd-aio
Browse files Browse the repository at this point in the history
pybind: AIO bindings for RBD

Reviewed-by: Jason Dillaman <[email protected]>
  • Loading branch information
Jason Dillaman authored Jun 16, 2016
2 parents 996b447 + 07ff002 commit 799633c
Show file tree
Hide file tree
Showing 2 changed files with 361 additions and 3 deletions.
304 changes: 301 additions & 3 deletions src/pybind/rbd/rbd.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ method.
# Copyright 2011 Josh Durgin
# Copyright 2015 Hector Martin <[email protected]>

import cython
import sys

from cpython cimport PyObject, ref, exc
from libc cimport errno
from libc.stdint cimport *
Expand All @@ -38,8 +41,6 @@ cdef extern from "Python.h":
cdef extern from "time.h":
ctypedef long int time_t

ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr)

cdef extern from "rbd/librbd.h" nogil:
enum:
_RBD_FEATURE_LAYERING "RBD_FEATURE_LAYERING"
Expand Down Expand Up @@ -71,6 +72,7 @@ cdef extern from "rbd/librbd.h" nogil:
ctypedef void* rados_ioctx_t
ctypedef void* rbd_image_t
ctypedef void* rbd_image_options_t
ctypedef void *rbd_completion_t

ctypedef struct rbd_image_info_t:
uint64_t size
Expand Down Expand Up @@ -123,6 +125,9 @@ cdef extern from "rbd/librbd.h" nogil:
time_t last_update
bint up

ctypedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg)
ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr)

void rbd_version(int *major, int *minor, int *extra)

void rbd_image_options_create(rbd_image_options_t* opts)
Expand Down Expand Up @@ -258,6 +263,21 @@ cdef extern from "rbd/librbd.h" nogil:
rbd_mirror_image_status_t *mirror_image_status,
size_t status_size)

int rbd_aio_write2(rbd_image_t image, uint64_t off, size_t len,
const char *buf, rbd_completion_t c, int op_flags)
int rbd_aio_read2(rbd_image_t image, uint64_t off, size_t len,
char *buf, rbd_completion_t c, int op_flags)
int rbd_aio_discard(rbd_image_t image, uint64_t off, uint64_t len,
rbd_completion_t c)

int rbd_aio_create_completion(void *cb_arg, rbd_callback_t complete_cb,
rbd_completion_t *c)
int rbd_aio_is_complete(rbd_completion_t c)
int rbd_aio_wait_for_complete(rbd_completion_t c)
ssize_t rbd_aio_get_return_value(rbd_completion_t c)
void rbd_aio_release(rbd_completion_t c)
int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)

RBD_FEATURE_LAYERING = _RBD_FEATURE_LAYERING
RBD_FEATURE_STRIPINGV2 = _RBD_FEATURE_STRIPINGV2
RBD_FEATURE_EXCLUSIVE_LOCK = _RBD_FEATURE_EXCLUSIVE_LOCK
Expand Down Expand Up @@ -397,7 +417,7 @@ cdef make_ex(ret, msg):
cdef rados_ioctx_t convert_ioctx(rados.Ioctx ioctx) except? NULL:
return <rados_ioctx_t>ioctx.io

cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr):
cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr) nogil:
return 0

def cstr(val, name, encoding="utf-8", opt=False):
Expand Down Expand Up @@ -444,6 +464,113 @@ cdef void* realloc_chk(void* ptr, size_t size) except NULL:
raise MemoryError("realloc failed")
return ret

cdef class Completion

cdef void __aio_complete_cb(rbd_completion_t completion, void *args) with gil:
"""
Callback to oncomplete() for asynchronous operations
"""
cdef Completion cb = <Completion>args
cb._complete()


cdef class Completion(object):
"""completion object"""

cdef:
object image
object oncomplete
rbd_completion_t rbd_comp
PyObject* buf
bint persisted
object exc_info

def __cinit__(self, image, object oncomplete):
self.oncomplete = oncomplete
self.image = image
self.persisted = False

def is_complete(self):
"""
Has an asynchronous operation completed?
This does not imply that the callback has finished.
:returns: True if the operation is completed
"""
with nogil:
ret = rbd_aio_is_complete(self.rbd_comp)
return ret == 1

def wait_for_complete_and_cb(self):
"""
Wait for an asynchronous operation to complete
This method waits for the callback to execute, if one was provided.
It will also re-raise any exceptions raised by the callback. You
should call this to "reap" asynchronous completions and ensure that
any exceptions in the callbacks are handled, as an exception internal
to this module may have occurred.
"""
with nogil:
rbd_aio_wait_for_complete(self.rbd_comp)

if self.exc_info:
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]

def get_return_value(self):
"""
Get the return value of an asychronous operation
The return value is set when the operation is complete.
:returns: int - return value of the operation
"""
with nogil:
ret = rbd_aio_get_return_value(self.rbd_comp)
return ret

def __dealloc__(self):
"""
Release a completion
This is automatically called when the completion object is freed.
"""
ref.Py_XDECREF(self.buf)
self.buf = NULL
if self.rbd_comp != NULL:
with nogil:
rbd_aio_release(self.rbd_comp)
self.rbd_comp = NULL

cdef void _complete(self):
try:
self.__unpersist()
if self.oncomplete:
self.oncomplete(self)
# In the event that something raises an exception during the next 2
# lines of code, we will not be able to catch it, and this may result
# in the app not noticing a failed callback. However, this should only
# happen in extreme circumstances (OOM, etc.). KeyboardInterrupt
# should not be a problem because the callback thread from librbd
# ought to have SIGINT blocked.
except:
self.exc_info = sys.exc_info()

cdef __persist(self):
if self.oncomplete is not None and not self.persisted:
# Increment our own reference count to make sure the completion
# is not freed until the callback is called. The completion is
# allowed to be freed if there is no callback.
ref.Py_INCREF(self)
self.persisted = True

cdef __unpersist(self):
if self.persisted:
ref.Py_DECREF(self)
self.persisted = False


class RBD(object):
"""
This class wraps librbd CRUD functions.
Expand Down Expand Up @@ -1052,6 +1179,31 @@ cdef class Image(object):
self.close()
return False

def __get_completion(self, oncomplete):
"""
Constructs a completion to use with asynchronous operations
:param oncomplete: callback for the completion
:raises: :class:`Error`
:returns: completion object
"""

completion_obj = Completion(self, oncomplete)

cdef:
rbd_completion_t completion
PyObject* p_completion_obj= <PyObject*>completion_obj

with nogil:
ret = rbd_aio_create_completion(p_completion_obj, __aio_complete_cb,
&completion)
if ret < 0:
raise make_ex(ret, "error getting a completion")

completion_obj.rbd_comp = completion
return completion_obj

def close(self):
"""
Release the resources used by this image object.
Expand Down Expand Up @@ -1929,6 +2081,152 @@ written." % (self.name, ret, length))
free(c_status.description)
return status

def aio_read(self, offset, length, oncomplete, fadvise_flags=0):
"""
Asynchronously read data from the image
Raises :class:`InvalidArgument` if part of the range specified is
outside the image.
oncomplete will be called with the returned read value as
well as the completion:
oncomplete(completion, data_read)
:param offset: the offset to start reading at
:type offset: int
:param length: how many bytes to read
:type length: int
:param oncomplete: what to do when the read is complete
:type oncomplete: completion
:param fadvise_flags: fadvise flags for this read
:type fadvise_flags: int
:returns: str - the data read
:raises: :class:`InvalidArgument`, :class:`IOError`
"""

cdef:
char *ret_buf
uint64_t _offset = offset
size_t _length = length
int _fadvise_flags = fadvise_flags
Completion completion

def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

completion = self.__get_completion(oncomplete_)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
try:
completion.__persist()
with nogil:
ret = rbd_aio_read2(self.image, _offset, _length, ret_buf,
completion.rbd_comp, _fadvise_flags)
if ret < 0:
raise make_ex(ret, 'error reading %s %ld~%ld' %
(self.name, offset, length))
except:
completion.__unpersist()
raise

return completion

def aio_write(self, data, offset, oncomplete, fadvise_flags=0):
"""
Asynchronously write data to the image
Raises :class:`InvalidArgument` if part of the write would fall outside
the image.
oncomplete will be called with the returned read value as
well as the completion:
oncomplete(completion, data_read)
:param offset: the offset to start reading at
:type offset: int
:param length: how many bytes to read
:type length: int
:param oncomplete: what to do when the read is complete
:type oncomplete: completion
:param fadvise_flags: fadvise flags for this read
:type fadvise_flags: int
:returns: str - the data read
:raises: :class:`InvalidArgument`, :class:`IOError`
"""

cdef:
uint64_t _offset = offset
char *_data = data
size_t _length = len(data)
int _fadvise_flags = fadvise_flags
Completion completion

completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_write2(self.image, _offset, _length, _data,
completion.rbd_comp, _fadvise_flags)
if ret < 0:
raise make_ex(ret, 'error writing %s %ld~%ld' %
(self.name, offset, _length))
except:
completion.__unpersist()
raise

return completion

def aio_discard(self, offset, length, oncomplete):
"""
Asynchronously trim the range from the image. It will be logically
filled with zeroes.
"""

cdef:
uint64_t _offset = offset
size_t _length = length
Completion completion

completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_discard(self.image, _offset, _length,
completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error discarding %s %ld~%ld' %
(self.name, offset, _length))
except:
completion.__unpersist()
raise

return completion

def aio_flush(self, oncomplete):
"""
Asyncronously wait until all writes are fully flushed if caching is
enabled.
"""

cdef Completion completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_flush(self.image, completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error flushing')
except:
completion.__unpersist()
raise

return completion

cdef class SnapIterator(object):
"""
Iterator over snapshot info for an image.
Expand Down
Loading

0 comments on commit 799633c

Please sign in to comment.