Skip to content

Commit

Permalink
Merge branch 'wip-rbd-layering'
Browse files Browse the repository at this point in the history
Conflicts:
	src/librbd/internal.cc
  • Loading branch information
Dan Mick committed Aug 18, 2012
2 parents ae57db0 + f965358 commit ecd279f
Show file tree
Hide file tree
Showing 17 changed files with 831 additions and 192 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ noinst_HEADERS = \
librbd/ImageCtx.h\
librbd/internal.h\
librbd/LibrbdWriteback.h\
librbd/parent_types.h\
librbd/SnapInfo.h\
librbd/WatchCtx.h\
logrotate.conf\
Expand Down
224 changes: 224 additions & 0 deletions src/cls_rbd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ cls_method_handle_t h_set_parent;
cls_method_handle_t h_get_protection_status;
cls_method_handle_t h_set_protection_status;
cls_method_handle_t h_remove_parent;
cls_method_handle_t h_add_child;
cls_method_handle_t h_remove_child;
cls_method_handle_t h_get_children;
cls_method_handle_t h_get_snapcontext;
cls_method_handle_t h_get_object_prefix;
cls_method_handle_t h_get_snapshot_name;
Expand Down Expand Up @@ -990,6 +993,216 @@ int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return 0;
}

/**
* methods for dealing with rbd_children object
*/

static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
string *image_id, snapid_t *snap_id)
{
try {
::decode(*pool_id, it);
::decode(*image_id, it);
::decode(*snap_id, it);
} catch (const buffer::error &err) {
CLS_ERR("error decoding parent spec");
return -EINVAL;
}
return 0;
}

static int decode_parent(bufferlist *in, uint64_t *pool_id,
string *image_id, snapid_t *snap_id)
{
bufferlist::iterator it = in->begin();
return decode_parent_common(it, pool_id, image_id, snap_id);
}

static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
string *image_id, snapid_t *snap_id,
string *c_image_id)
{
bufferlist::iterator it = in->begin();
int r = decode_parent_common(it, pool_id, image_id, snap_id);
if (r < 0)
return r;
try {
::decode(*c_image_id, it);
} catch (const buffer::error &err) {
CLS_ERR("error decoding child image id");
return -EINVAL;
}
return 0;
}

static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
{
bufferlist key_bl;
::encode(pool_id, key_bl);
::encode(image_id, key_bl);
::encode(snap_id, key_bl);
return string(key_bl.c_str(), key_bl.length());
}

/**
* add child to rbd_children directory object
*
* rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
* [c_image_id, [c_image_id ... ]]
*
* Input:
* @param p_pool_id parent pool id
* @param p_image_id parent image oid
* @param p_snap_id parent snapshot id
* @param c_image_id new child image oid to add
*
* @returns 0 on success, negative error on failure
*/

int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;

uint64_t p_pool_id;
snapid_t p_snap_id;
string p_image_id, c_image_id;
// Use set for ease of erase() for remove_child()
std::set<string> children;

r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
&c_image_id);
if (r < 0)
return r;

CLS_LOG(20, "add_child %s to (%d, %s, %d)", c_image_id.c_str(),
p_pool_id, p_image_id.c_str(), p_snap_id);

string key = parent_key(p_pool_id, p_image_id, p_snap_id);

// get current child list for parent, if any
r = read_key(hctx, key, &children);
if ((r < 0) && (r != -ENOENT)) {
CLS_LOG(20, "add_child: omap read failed: %d", r);
return r;
}

if (children.find(c_image_id) != children.end()) {
CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
return -EEXIST;
}
// add new child
children.insert(c_image_id);

// write back
bufferlist childbl;
::encode(children, childbl);
r = cls_cxx_map_set_val(hctx, key, &childbl);
if (r < 0)
CLS_LOG(20, "add_child: omap write failed: %d", r);
return r;
}

/**
* remove child from rbd_children directory object
*
* Input:
* @param p_pool_id parent pool id
* @param p_image_id parent image oid
* @param p_snap_id parent snapshot id
* @param c_image_id new child image oid to add
*
* @returns 0 on success, negative error on failure
*/

int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;

uint64_t p_pool_id;
snapid_t p_snap_id;
string p_image_id, c_image_id;
std::set<string> children;

r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
&c_image_id);
if (r < 0)
return r;

CLS_LOG(20, "remove_child %s from (%d, %s, %d)", c_image_id.c_str(),
p_pool_id, p_image_id.c_str(), p_snap_id);

string key = parent_key(p_pool_id, p_image_id, p_snap_id);

// get current child list for parent. Unlike add_child(), an empty list
// is an error (how can we remove something that doesn't exist?)
r = read_key(hctx, key, &children);
if (r < 0) {
CLS_LOG(20, "remove_child: read omap failed: %d", r);
return r;
}

if (children.find(c_image_id) == children.end()) {
CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
return -ENOENT;
}
// find and remove child
children.erase(c_image_id);

// now empty? remove key altogether
if (children.empty()) {
r = cls_cxx_map_remove_key(hctx, key);
if (r < 0)
CLS_LOG(20, "remove_child: remove key failed: %d", r);
} else {
// write back shortened children list
bufferlist childbl;
::encode(children, childbl);
r = cls_cxx_map_set_val(hctx, key, &childbl);
if (r < 0)
CLS_LOG(20, "remove_child: write omap failed: %d ", r);
}
return r;
}

/**
* Input:
* @param p_pool_id parent pool id
* @param p_image_id parent image oid
* @param p_snap_id parent snapshot id
* @param c_image_id new child image oid to add
*
* Output:
* @param children set<string> of children
*
* @returns 0 on success, negative error on failure
*/
int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
int r;
uint64_t p_pool_id;
snapid_t p_snap_id;
string p_image_id;
std::set<string> children;

r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
if (r < 0)
return r;

CLS_LOG(20, "get_children of (%d, %s, %d)",
p_pool_id, p_image_id.c_str(), p_snap_id);

string key = parent_key(p_pool_id, p_image_id, p_snap_id);

r = read_key(hctx, key, &children);
if (r < 0) {
if (r != -ENOENT)
CLS_LOG(20, "get_children: read omap failed: %d", r);
return r;
}
::encode(children, *out);
return 0;
}


/**
* Get the information needed to create a rados snap context for doing
Expand Down Expand Up @@ -1995,6 +2208,17 @@ void __cls_init()
CLS_METHOD_RD | CLS_METHOD_PUBLIC,
get_protection_status, &h_get_protection_status);

/* methods for the rbd_children object */
cls_register_cxx_method(h_class, "add_child",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
add_child, &h_add_child);
cls_register_cxx_method(h_class, "remove_child",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
remove_child, &h_remove_child);
cls_register_cxx_method(h_class, "get_children",
CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
get_children, &h_get_children);

/* methods for the rbd_id.$image_name objects */
cls_register_cxx_method(h_class, "get_id",
CLS_METHOD_RD | CLS_METHOD_PUBLIC,
Expand Down
2 changes: 2 additions & 0 deletions src/include/rbd/librbd.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ int rbd_snap_rollback_with_progress(rbd_image_t image, const char *snapname,
*
* @param snap_name which snapshot to protect
* @returns 0 on success, negative error code on failure
* @returns -EBUSY if snap is already protected
*/
int rbd_snap_protect(rbd_image_t image, const char *snap_name);
/**
* Allow a snaphshot to be deleted.
*
* @param snap_name which snapshot to unprotect
* @returns 0 on success, negative error code on failure
* @returns -EINVAL if snap is not protected
*/
int rbd_snap_unprotect(rbd_image_t image, const char *snap_name);
/**
Expand Down
16 changes: 16 additions & 0 deletions src/include/rbd_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@
#define RBD_DIRECTORY "rbd_directory"
#define RBD_INFO "rbd_info"

/*
* rbd_children object in each pool contains omap entries
* that map parent (poolid, imageid, snapid) to a list of children
* (imageids; snapids aren't required because we get all the snapshot
* info from a read of the child's header object anyway).
*
* The clone operation writes a new item to this child list, and rm or
* flatten removes an item, and may remove the whole entry if no children
* exist after the rm/flatten.
*
* When attempting to remove a parent, all pools are searched for
* rbd_children objects with entries referring to that parent; if any
* exist (and those children exist), the parent removal is prevented.
*/
#define RBD_CHILDREN "rbd_children"

#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */

#define RBD_MAX_OBJ_NAME_SIZE 96
Expand Down
29 changes: 21 additions & 8 deletions src/librbd/ImageCtx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "common/errno.h"
#include "common/perf_counters.h"

#include "librbd/cls_rbd.h"
#include "librbd/internal.h"
#include "librbd/WatchCtx.h"

Expand Down Expand Up @@ -212,6 +211,20 @@ namespace librbd {
return -ENOENT;
}

int ImageCtx::get_parent_spec(snapid_t in_snap_id, parent_spec *out_pspec)
{
assert(snap_lock.is_locked());
map<string, SnapInfo>::iterator it;

for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) {
if (it->second.id == in_snap_id) {
*out_pspec = it->second.parent.spec;
return 0;
}
}
return -ENOENT;
}

int ImageCtx::is_snap_protected(string in_snap_name, bool *is_protected) const
{
assert(snap_lock.is_locked());
Expand All @@ -237,7 +250,7 @@ namespace librbd {

void ImageCtx::add_snap(string in_snap_name, snap_t id, uint64_t in_size,
uint64_t features,
cls_client::parent_info parent,
parent_info parent,
uint8_t protection_status)
{
assert(snap_lock.is_locked());
Expand Down Expand Up @@ -287,7 +300,7 @@ namespace librbd {
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.pool_id;
return parent_md.spec.pool_id;
}
string in_snap_name;
int r = get_snap_name(in_snap_id, &in_snap_name);
Expand All @@ -296,15 +309,15 @@ namespace librbd {
map<string, SnapInfo>::const_iterator p = snaps_by_name.find(in_snap_name);
if (p == snaps_by_name.end())
return -1;
return p->second.parent.pool_id;
return p->second.parent.spec.pool_id;
}

string ImageCtx::get_parent_image_id(snap_t in_snap_id) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.image_id;
return parent_md.spec.image_id;
}
string in_snap_name;
int r = get_snap_name(in_snap_id, &in_snap_name);
Expand All @@ -313,15 +326,15 @@ namespace librbd {
map<string, SnapInfo>::const_iterator p = snaps_by_name.find(in_snap_name);
if (p == snaps_by_name.end())
return "";
return p->second.parent.image_id;
return p->second.parent.spec.image_id;
}

uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.snap_id;
return parent_md.spec.snap_id;
}
string in_snap_name;
int r = get_snap_name(in_snap_id, &in_snap_name);
Expand All @@ -330,7 +343,7 @@ namespace librbd {
map<string, SnapInfo>::const_iterator p = snaps_by_name.find(in_snap_name);
if (p == snaps_by_name.end())
return CEPH_NOSNAP;
return p->second.parent.snap_id;
return p->second.parent.spec.snap_id;
}

int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const
Expand Down
Loading

0 comments on commit ecd279f

Please sign in to comment.