Skip to content

Commit

Permalink
librbd: throttle async progress callbacks
Browse files Browse the repository at this point in the history
Ensure that no more than one outstanding progress callback
is queued for notification.  This will allow remote progress
updates to be sent at a rate in which all watch/notify
clients can support.

Signed-off-by: Jason Dillaman <[email protected]>
  • Loading branch information
Jason Dillaman authored and jdurgin committed Jan 24, 2015
1 parent cd9d8eb commit 4ac3cd7
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 64 deletions.
91 changes: 29 additions & 62 deletions src/librbd/ImageWatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,58 +59,6 @@ enum {
NOTIFY_OP_SNAP_CREATE = 8
};

class RemoteProgressContext : public ProgressContext {
public:
RemoteProgressContext(ImageWatcher &image_watcher, Finisher &finisher,
const RemoteAsyncRequest &remote_async_request)
: m_image_watcher(image_watcher), m_finisher(finisher),
m_remote_async_request(remote_async_request)
{
}

virtual int update_progress(uint64_t offset, uint64_t total) {
// TODO: JD throttle notify updates(?)
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_progress,
&m_image_watcher, m_remote_async_request, offset, total));
m_finisher.queue(ctx);
return 0;
}

private:
ImageWatcher &m_image_watcher;
Finisher &m_finisher;
RemoteAsyncRequest m_remote_async_request;
};

class RemoteContext : public Context {
public:
RemoteContext(ImageWatcher &image_watcher, Finisher &finisher,
const RemoteAsyncRequest &remote_async_request,
RemoteProgressContext *prog_ctx)
: m_image_watcher(image_watcher), m_finisher(finisher),
m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
{
}

~RemoteContext() {
delete m_prog_ctx;
}

virtual void finish(int r) {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_complete,
&m_image_watcher, m_remote_async_request, r));
m_finisher.queue(ctx);
}

private:
ImageWatcher &m_image_watcher;
Finisher &m_finisher;
RemoteAsyncRequest m_remote_async_request;
RemoteProgressContext *m_prog_ctx;
};

ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
: m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
Expand Down Expand Up @@ -443,6 +391,9 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
ENCODE_FINISH(bl);

m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);

RWLock::WLocker l(m_async_request_lock);
m_async_progress.erase(request);
return 0;
}

Expand Down Expand Up @@ -757,6 +708,19 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id,
return r;
}

void ImageWatcher::schedule_update_progress(
const RemoteAsyncRequest &remote_async_request,
uint64_t offset, uint64_t total) {
RWLock::WLocker l(m_async_request_lock);
if (m_async_progress.count(remote_async_request) == 0) {
m_async_progress.insert(remote_async_request);
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_progress,
this, remote_async_request, offset, total));
m_finisher->queue(ctx);
}
}

void ImageWatcher::handle_header_update() {
ldout(m_image_ctx.cct, 1) << "image header updated" << dendl;

Expand Down Expand Up @@ -857,13 +821,11 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
RemoteAsyncRequest request;
::decode(request, iter);

RemoteProgressContext *prog_ctx =
new RemoteProgressContext(*this, *m_finisher, request);
RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
prog_ctx);
RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
request);
RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);

ldout(m_image_ctx.cct, 20) << "remote flatten request: " << request << dendl;

int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
if (r < 0) {
delete ctx;
Expand All @@ -884,14 +846,12 @@ void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
RemoteAsyncRequest request;
::decode(request, iter);

RemoteProgressContext *prog_ctx =
new RemoteProgressContext(*this, *m_finisher, request);
RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
prog_ctx);
RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
request);
RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);

ldout(m_image_ctx.cct, 20) << "remote resize request: " << request
<< " " << size << dendl;

int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
if (r < 0) {
delete ctx;
Expand Down Expand Up @@ -1076,4 +1036,11 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) {
image_watcher.handle_error(handle, err);
}

void ImageWatcher::RemoteContext::finish(int r) {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_complete,
&m_image_watcher, m_remote_async_request, r));
m_image_watcher.m_finisher->queue(ctx);
}

}
60 changes: 58 additions & 2 deletions src/librbd/ImageWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
#include <set>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -22,7 +24,6 @@ namespace librbd {

class AioCompletion;
class ImageCtx;
class ProgressContext;

struct RemoteAsyncRequest {
uint64_t gid;
Expand All @@ -32,12 +33,21 @@ namespace librbd {
RemoteAsyncRequest() : gid(), handle(), request_id() {}
RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
: gid(gid_), handle(handle_), request_id(request_id_) {}

inline bool operator<(const RemoteAsyncRequest &rhs) const {
if (gid != rhs.gid) {
return gid < rhs.gid;
} else if (handle != rhs.handle) {
return handle < rhs.handle;
} else {
return request_id < request_id;
}
}
};

class ImageWatcher {
public:


ImageWatcher(ImageCtx& image_ctx);
~ImageWatcher();

Expand Down Expand Up @@ -97,6 +107,48 @@ namespace librbd {
virtual void handle_error(uint64_t handle, int err);
};

class RemoteProgressContext : public ProgressContext {
public:
RemoteProgressContext(ImageWatcher &image_watcher,
const RemoteAsyncRequest &remote_async_request)
: m_image_watcher(image_watcher),
m_remote_async_request(remote_async_request)
{
}

virtual int update_progress(uint64_t offset, uint64_t total) {
m_image_watcher.schedule_update_progress(
m_remote_async_request, offset, total);
return 0;
}

private:
ImageWatcher &m_image_watcher;
RemoteAsyncRequest m_remote_async_request;
};

class RemoteContext : public Context {
public:
RemoteContext(ImageWatcher &image_watcher,
const RemoteAsyncRequest &remote_async_request,
RemoteProgressContext *prog_ctx)
: m_image_watcher(image_watcher),
m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
{
}

~RemoteContext() {
delete m_prog_ctx;
}

virtual void finish(int r);

private:
ImageWatcher &m_image_watcher;
RemoteAsyncRequest m_remote_async_request;
RemoteProgressContext *m_prog_ctx;
};

ImageCtx &m_image_ctx;

WatchCtx m_watch_ctx;
Expand All @@ -115,6 +167,7 @@ namespace librbd {
RWLock m_async_request_lock;
uint64_t m_async_request_id;
std::map<uint64_t, AsyncRequest> m_async_requests;
std::set<RemoteAsyncRequest> m_async_progress;

Mutex m_aio_request_lock;
Cond m_aio_request_cond;
Expand Down Expand Up @@ -152,6 +205,9 @@ namespace librbd {
ProgressContext& prog_ctx);
void notify_request_leadership();

void schedule_update_progress(const RemoteAsyncRequest &remote_async_request,
uint64_t offset, uint64_t total);

void handle_header_update();
void handle_acquired_lock();
void handle_released_lock();
Expand Down

0 comments on commit 4ac3cd7

Please sign in to comment.