Skip to content

Commit

Permalink
rbd-replay: Hash completions across multiple mutexes to reduce conten…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: Adam Crume <[email protected]>
  • Loading branch information
adamcrume authored and liewegas committed Aug 21, 2014
1 parent ffc9d05 commit 25d3d42
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 23 deletions.
40 changes: 24 additions & 16 deletions src/rbd_replay/Replayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,17 @@ void Worker::set_action_complete(action_id_t id) {
}


Replayer::Replayer() {
Replayer::Replayer(int num_action_trackers)
: m_num_action_trackers(num_action_trackers),
m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
}

Replayer::~Replayer() {
delete[] m_action_trackers;
}

Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
return m_action_trackers[id % m_num_action_trackers];
}

void Replayer::run(const std::string replay_file) {
Expand Down Expand Up @@ -210,33 +220,35 @@ void Replayer::erase_image(imagectx_id_t imagectx_id) {
void Replayer::set_action_complete(action_id_t id) {
dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
boost::system_time now(boost::get_system_time());
boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
assert(m_actions_complete.count(id) == 0);
m_actions_complete[id] = now;
m_actions_complete_condition.notify_all();
action_tracker_d &tracker = tracker_for(id);
boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);
assert(tracker.actions.count(id) == 0);
tracker.actions[id] = now;
tracker.condition.notify_all();
}

bool Replayer::is_action_complete(action_id_t id) {
boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
return _is_action_complete(id);
action_tracker_d &tracker = tracker_for(id);
boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
return tracker.actions.count(id) > 0;
}

void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
BOOST_FOREACH(const dependency_d &dep, deps) {
dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
action_tracker_d &tracker = tracker_for(dep.id);
boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
bool first_time = true;
while (!_is_action_complete(dep.id)) {
//m_actions_complete_condition.wait(lock);
while (tracker.actions.count(dep.id) == 0) {
if (!first_time) {
dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
}
m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
first_time = false;
}
boost::system_time action_completed_time(m_actions_complete[dep.id]);
boost::system_time action_completed_time(tracker.actions[dep.id]);
lock.unlock();
boost::system_time end_time(boost::get_system_time());
long long micros = (end_time - start_time).total_microseconds();
Expand All @@ -263,10 +275,6 @@ void Replayer::clear_images() {
m_images.clear();
}

bool Replayer::_is_action_complete(action_id_t id) {
return m_actions_complete.count(id) > 0;
}

void Replayer::set_latency_multiplier(float f) {
m_latency_multiplier = f;
}
22 changes: 16 additions & 6 deletions src/rbd_replay/Replayer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class Worker : public ActionCtx {

class Replayer {
public:
Replayer();
Replayer(int num_action_trackers);

~Replayer();

void run(const std::string replay_file);

Expand All @@ -95,9 +97,16 @@ class Replayer {
void set_latency_multiplier(float f);

private:
struct action_tracker_d {
// Maps an action ID to the time the action completed
std::map<action_id_t, boost::system_time> actions;
boost::shared_mutex mutex;
boost::condition condition;
};

void clear_images();

bool _is_action_complete(action_id_t id);
action_tracker_d &tracker_for(action_id_t id);

// Disallow assignment and copying
Replayer(const Replayer& rhs);
Expand All @@ -110,10 +119,11 @@ class Replayer {
std::map<imagectx_id_t, librbd::Image*> m_images;
boost::shared_mutex m_images_mutex;

// Maps an action ID to the time the action completed
std::map<action_id_t, boost::system_time> m_actions_complete;
boost::shared_mutex m_actions_complete_mutex;
boost::condition m_actions_complete_condition;
// Actions are hashed across the trackers by ID.
// Number of trackers should probably be larger than the number of cores and prime.
// Should definitely be odd.
const int m_num_action_trackers;
action_tracker_d* m_action_trackers;
};

}
Expand Down
4 changes: 3 additions & 1 deletion src/rbd_replay/rbd-replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

#include <vector>
#include <boost/thread.hpp>
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include "Replayer.hpp"
Expand Down Expand Up @@ -81,7 +82,8 @@ int main(int argc, const char **argv) {
return 1;
}

Replayer replayer;
unsigned int nthreads = boost::thread::hardware_concurrency();
Replayer replayer(2 * nthreads + 1);
replayer.set_latency_multiplier(latency_multiplier);
replayer.run(replay_file);
}

0 comments on commit 25d3d42

Please sign in to comment.