Skip to content

Commit

Permalink
Merge commit 'origin/filestore' into unstable
Browse files Browse the repository at this point in the history
Conflicts:

	src/os/FileStore.cc
	src/os/FileStore.h
  • Loading branch information
liewegas committed Feb 17, 2010
2 parents ef27fd6 + d6d3d2a commit 0e33171
Show file tree
Hide file tree
Showing 25 changed files with 1,586 additions and 585 deletions.
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ noinst_HEADERS = \
common/Semaphore.h\
common/Spinlock.h\
common/Thread.h\
common/Throttle.h\
common/Timer.h\
common/tls.h\
common/WorkQueue.h\
Expand Down
7 changes: 7 additions & 0 deletions src/TODO
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ v0.20
- rebuild mds hierarchy
- kclient: retry alloc on ENOMEM when reading from connection?

filestore
- throttling
- flush objects onto primary during recovery
- audit queue_transaction calls for dependencies
- convert apply_transaction calls in handle_map to queue?
- need an osdmap cache layer?

bugs
- dbench 1, restart mds (may take a few times), dbench will error out.
Expand Down Expand Up @@ -193,6 +199,7 @@ filestore performance notes
- acks: never.
- throttle: ??
- rmw: rmw must block on prior fs writes.
* JourningObjectStore interface needs work?

- separate reads/writes into separate op queues?
-
Expand Down
51 changes: 45 additions & 6 deletions src/common/Finisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
#include "config.h"
#include "Finisher.h"

#include "common/debug.h"
#define DOUT_SUBSYS finisher
#undef dout_prefix
#define dout_prefix *_dout << dbeginl << std::hex << pthread_self() << std::dec << " finisher(" << this << ") "

void Finisher::start()
{
finisher_thread.create();
Expand All @@ -16,29 +21,63 @@ void Finisher::stop()
finisher_thread.join();
}

void Finisher::wait_for_empty()
{
finisher_lock.Lock();
while (!finisher_queue.empty() || finisher_running) {
dout(10) << "wait_for_empty waiting" << dendl;
finisher_empty_cond.Wait(finisher_lock);
}
dout(10) << "wait_for_empty empty" << dendl;
finisher_lock.Unlock();
}

void *Finisher::finisher_thread_entry()
{
finisher_lock.Lock();
//dout_generic(10) << "finisher_thread start" << dendl;
dout(10) << "finisher_thread start" << dendl;

while (!finisher_stop) {
while (!finisher_queue.empty()) {
vector<Context*> ls;
list<pair<Context*,int> > ls_rval;
ls.swap(finisher_queue);

ls_rval.swap(finisher_queue_rval);
finisher_running = true;
finisher_lock.Unlock();
dout(10) << "finisher_thread doing " << ls << dendl;

finish_contexts(ls, 0);
for (vector<Context*>::iterator p = ls.begin();
p != ls.end();
p++) {
if (*p) {
(*p)->finish(0);
delete *p;
} else {
assert(!ls_rval.empty());
Context *c = ls_rval.front().first;
c->finish(ls_rval.front().second);
delete c;
ls_rval.pop_front();
}
}
dout(10) << "finisher_thread done with " << ls << dendl;
ls.clear();

finisher_lock.Lock();
finisher_running = false;
}
if (finisher_stop) break;
dout(10) << "finisher_thread empty" << dendl;
finisher_empty_cond.Signal();
if (finisher_stop)
break;

//dout_generic(30) << "finisher_thread sleeping" << dendl;
dout(10) << "finisher_thread sleeping" << dendl;
finisher_cond.Wait(finisher_lock);
}
finisher_empty_cond.Signal();

//dout_generic(10) << "finisher_thread start" << dendl;
dout(10) << "finisher_thread stop" << dendl;
finisher_lock.Unlock();
return 0;
}
Expand Down
18 changes: 13 additions & 5 deletions src/common/Finisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

class Finisher {
Mutex finisher_lock;
Cond finisher_cond;
bool finisher_stop;
Cond finisher_cond, finisher_empty_cond;
bool finisher_stop, finisher_running;
vector<Context*> finisher_queue;
list<pair<Context*,int> > finisher_queue_rval;

void *finisher_thread_entry();

Expand All @@ -34,9 +35,13 @@ class Finisher {
} finisher_thread;

public:
void queue(Context *c) {
void queue(Context *c, int r = 0) {
finisher_lock.Lock();
finisher_queue.push_back(c);
if (r) {
finisher_queue_rval.push_back(pair<Context*, int>(c, r));
finisher_queue.push_back(NULL);
} else
finisher_queue.push_back(c);
finisher_cond.Signal();
finisher_lock.Unlock();
}
Expand All @@ -58,7 +63,10 @@ class Finisher {
void start();
void stop();

Finisher() : finisher_lock("Finisher::finisher_lock"), finisher_stop(false), finisher_thread(this) {}
void wait_for_empty();

Finisher() : finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false), finisher_thread(this) {}
};

#endif
68 changes: 68 additions & 0 deletions src/common/Throttle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#ifndef _CEPH_THROTTLE_H
#define _CEPH_THROTTLE_H

#include "Mutex.h"
#include "Cond.h"

class Throttle {
__u64 count, want, max;
Mutex lock;
Cond cond;

public:
Throttle(__u64 m = 0) : count(0), max(m),
lock("Throttle::lock") {}

private:
void _reset_max(__u64 m) {
if (m) {
if (m < max)
cond.SignalAll();
max = m;
}
}
bool _wait(__u64 c) {
bool waited = false;
while (max && count + c > max) {
waited = true;
cond.Wait(lock);
}
return waited;
}

public:
__u64 get_current() {
Mutex::Locker l(lock);
return count;
}

bool wait(__u64 m = 0) {
Mutex::Locker l(lock);
_reset_max(m);
return _wait(0);
}

__u64 take(__u64 c = 1) {
Mutex::Locker l(lock);
count += c;
return count;
}

bool get(__u64 c = 1, __u64 m = 0) {
Mutex::Locker l(lock);
_reset_max(m);
bool waited = _wait(c);
count += c;
return waited;
}

__u64 put(__u64 c = 1) {
Mutex::Locker l(lock);
cond.SignalAll();
count -= c;
return count;
}
};


#endif
5 changes: 3 additions & 2 deletions src/common/WorkQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void ThreadPool::worker()
_lock.Unlock();
wq->_void_process(item);
_lock.Lock();
wq->_void_process_finish(item);
dout(15) << "worker wq " << wq->name << " done processing " << item << dendl;
processing--;
if (_pause || _draining)
Expand Down Expand Up @@ -120,12 +121,12 @@ void ThreadPool::unpause()
_lock.Unlock();
}

void ThreadPool::drain()
void ThreadPool::drain(_WorkQueue *wq)
{
dout(10) << "drain" << dendl;
_lock.Lock();
_draining = true;
while (processing)
while (processing || (wq != NULL && !wq->_empty()))
_wait_cond.Wait(_lock);
_draining = false;
_lock.Unlock();
Expand Down
15 changes: 13 additions & 2 deletions src/common/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class ThreadPool {
_WorkQueue(string n) : name(n) {}
virtual ~_WorkQueue() {}
virtual void _clear() = 0;
virtual bool _empty() = 0;
virtual void *_void_dequeue() = 0;
virtual void _void_process(void *) = 0;
virtual void _void_process_finish(void *) = 0;
};

public:
Expand All @@ -44,14 +46,17 @@ class ThreadPool {
virtual void _dequeue(T *) = 0;
virtual T *_dequeue() = 0;
virtual void _process(T *) = 0;
virtual void _clear() = 0;
virtual void _process_finish(T *) {}

void *_void_dequeue() {
return (void *)_dequeue();
}
void _void_process(void *p) {
_process((T *)p);
}
void _void_process_finish(void *p) {
_process_finish((T *)p);
}

public:
WorkQueue(string n, ThreadPool *p) : _WorkQueue(n), pool(p) {
Expand Down Expand Up @@ -88,6 +93,9 @@ class ThreadPool {
void _kick() {
pool->_kick();
}
void drain() {
pool->drain(this);
}

};

Expand Down Expand Up @@ -163,13 +171,16 @@ class ThreadPool {
void unlock() {
_lock.Unlock();
}
void wait(Cond &c) {
c.Wait(_lock);
}

void start();
void stop(bool clear_after=true);
void pause();
void pause_new();
void unpause();
void drain();
void drain(_WorkQueue *wq = 0);
};


Expand Down
4 changes: 2 additions & 2 deletions src/common/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ inline ostream& operator<<(ostream& out, _bad_endl_use_dendl_t) {


// generic macros
#define generic_dout(x) do { if ((x) <= g_conf.debug) { *_dout << dbeginl
#define generic_derr(x) do { if ((x) <= g_conf.debug) { *_derr << dbeginl
#define generic_dout(x) do { if ((x) <= g_conf.debug) { *_dout << dbeginl << std::hex << pthread_self() << std::dec << " "
#define generic_derr(x) do { if ((x) <= g_conf.debug) { *_derr << dbeginl << std::hex << pthread_self() << std::dec << " "

#define pdout(x,p) do { if ((x) <= (p)) { *_dout << dbeginl

Expand Down
13 changes: 11 additions & 2 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ static struct config_option config_optionsp[] = {
OPTION(debug_paxos, 0, OPT_INT, 0),
OPTION(debug_tp, 0, OPT_INT, 0),
OPTION(debug_auth, 0, OPT_INT, 1),
OPTION(debug_finisher, 0, OPT_INT, 1),
OPTION(keyring, 'k', OPT_STR, "~/.ceph/keyring.bin, /etc/ceph/keyring.bin, .ceph_keyring"),
OPTION(supported_auth, 0, OPT_STR, "none"),
OPTION(clock_lock, 0, OPT_BOOL, false),
Expand Down Expand Up @@ -519,15 +520,21 @@ static struct config_option config_optionsp[] = {
OPTION(osd_auto_weight, 0, OPT_BOOL, false),
OPTION(osd_class_timeout, 0, OPT_FLOAT, 10.0),
OPTION(filestore, 0, OPT_BOOL, false),
OPTION(filestore_max_sync_interval, 0, OPT_DOUBLE, .2), // seconds
OPTION(filestore_min_sync_interval, 0, OPT_DOUBLE, .001), // seconds
OPTION(filestore_max_sync_interval, 0, OPT_DOUBLE, 5), // seconds
OPTION(filestore_min_sync_interval, 0, OPT_DOUBLE, .01), // seconds
OPTION(filestore_fake_attrs, 0, OPT_BOOL, false),
OPTION(filestore_fake_collections, 0, OPT_BOOL, false),
OPTION(filestore_dev, 0, OPT_STR, 0),
OPTION(filestore_btrfs_trans, 0, OPT_BOOL, true),
OPTION(filestore_btrfs_snap, 0, OPT_BOOL, true),
OPTION(filestore_flusher, 0, OPT_BOOL, true),
OPTION(filestore_flusher_max_fds, 0, OPT_INT, 512),
OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
OPTION(filestore_journal_parallel, 0, OPT_BOOL, true),
OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false),
OPTION(filestore_queue_max_ops, 0, OPT_INT, 500),
OPTION(filestore_queue_max_bytes, 0, OPT_INT, 100 << 20),
OPTION(filestore_op_threads, 0, OPT_INT, 2),
OPTION(ebofs, 0, OPT_BOOL, false),
OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
OPTION(ebofs_verify, 0, OPT_BOOL, false),
Expand All @@ -543,6 +550,8 @@ static struct config_option config_optionsp[] = {
OPTION(journal_block_align, 0, OPT_BOOL, true),
OPTION(journal_max_write_bytes, 0, OPT_INT, 0),
OPTION(journal_max_write_entries, 0, OPT_INT, 100),
OPTION(journal_queue_max_ops, 0, OPT_INT, 500),
OPTION(journal_queue_max_bytes, 0, OPT_INT, 100 << 20),
OPTION(bdev_lock, 0, OPT_BOOL, true),
OPTION(bdev_iothreads, 0, OPT_INT, 1), // number of ios to queue with kernel
OPTION(bdev_idle_kick_after_ms, 0, OPT_INT, 100), // ms
Expand Down
9 changes: 9 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ struct md_config_t {
int debug_paxos;
int debug_tp;
int debug_auth;
int debug_finisher;

// clock
bool clock_lock;
Expand Down Expand Up @@ -336,9 +337,15 @@ struct md_config_t {
bool filestore_fake_collections;
const char *filestore_dev;
bool filestore_btrfs_trans;
bool filestore_btrfs_snap;
bool filestore_flusher;
int filestore_flusher_max_fds;
bool filestore_sync_flush;
bool filestore_journal_parallel;
bool filestore_journal_writeahead;
int filestore_queue_max_ops;
int filestore_queue_max_bytes;
int filestore_op_threads;

// ebofs
bool ebofs;
Expand All @@ -358,6 +365,8 @@ struct md_config_t {
bool journal_block_align;
int journal_max_write_bytes;
int journal_max_write_entries;
int journal_queue_max_ops;
int journal_queue_max_bytes;

// block device
bool bdev_lock;
Expand Down
Loading

0 comments on commit 0e33171

Please sign in to comment.