Skip to content

Commit

Permalink
client: implement RDCACHE reference tracking
Browse files Browse the repository at this point in the history
make the code be able to track Fc caps used by aysnc buffer reads

Signed-off-by: Yan, Zheng <[email protected]>
  • Loading branch information
Yan, Zheng committed Apr 4, 2014
1 parent 18201ef commit 701c22a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 38 deletions.
78 changes: 44 additions & 34 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2264,41 +2264,54 @@ void Client::get_cap_ref(Inode *in, int cap)
ldout(cct, 5) << "get_cap_ref got first FILE_BUFFER ref on " << *in << dendl;
in->get();
}
if ((cap & CEPH_CAP_FILE_CACHE) &&
in->cap_refs[CEPH_CAP_FILE_CACHE] == 0) {
ldout(cct, 5) << "get_cap_ref got first FILE_CACHE ref on " << *in << dendl;
in->get();
}
in->get_cap_ref(cap);
}

void Client::put_cap_ref(Inode *in, int cap)
{
bool last = in->put_cap_ref(cap);
int last = in->put_cap_ref(cap);
if (last) {
int put_nref = 0;
int drop = last & ~in->caps_issued();
if (in->snapid == CEPH_NOSNAP) {
if ((cap & CEPH_CAP_FILE_WR) &&
if ((last & CEPH_CAP_FILE_WR) &&
!in->cap_snaps.empty() &&
in->cap_snaps.rbegin()->second->writing) {
ldout(cct, 10) << "put_cap_ref finishing pending cap_snap on " << *in << dendl;
in->cap_snaps.rbegin()->second->writing = 0;
finish_cap_snap(in, in->cap_snaps.rbegin()->second, in->caps_used());
finish_cap_snap(in, in->cap_snaps.rbegin()->second, get_caps_used(in));
signal_cond_list(in->waitfor_caps); // wake up blocked sync writers
}
if (cap & CEPH_CAP_FILE_BUFFER) {
if (last & CEPH_CAP_FILE_BUFFER) {
for (map<snapid_t,CapSnap*>::iterator p = in->cap_snaps.begin();
p != in->cap_snaps.end();
++p)
p->second->dirty_data = 0;
check_caps(in, false);
signal_cond_list(in->waitfor_commit);
ldout(cct, 5) << "put_cap_ref dropped last FILE_BUFFER ref on " << *in << dendl;
put_inode(in);
++put_nref;
}
}
if (cap & CEPH_CAP_FILE_CACHE) {
check_caps(in, false);
if (last & CEPH_CAP_FILE_CACHE) {
ldout(cct, 5) << "put_cap_ref dropped last FILE_CACHE ref on " << *in << dendl;
++put_nref;
}
if (drop) {
if (drop & CEPH_CAP_FILE_CACHE)
_invalidate_inode_cache(in, false);
else
check_caps(in, false);
}
if (put_nref)
put_inode(in, put_nref);
}
}


int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
{
while (1) {
Expand Down Expand Up @@ -2338,6 +2351,14 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
}
}

int Client::get_caps_used(Inode *in)
{
unsigned used = in->caps_used();
if (!(used & CEPH_CAP_FILE_CACHE) &&
!objectcacher->set_is_empty(&in->oset))
used |= CEPH_CAP_FILE_CACHE;
return used;
}

void Client::cap_delay_requeue(Inode *in)
{
Expand Down Expand Up @@ -2434,7 +2455,7 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
void Client::check_caps(Inode *in, bool is_delayed)
{
unsigned wanted = in->caps_wanted();
unsigned used = in->caps_used();
unsigned used = get_caps_used(in);
unsigned cap_used;

int retain = wanted | CEPH_CAP_PIN;
Expand Down Expand Up @@ -2548,7 +2569,7 @@ struct C_SnapFlush : public Context {

void Client::queue_cap_snap(Inode *in, snapid_t seq)
{
int used = in->caps_used();
int used = get_caps_used(in);
int dirty = in->caps_dirty();
ldout(cct, 10) << "queue_cap_snap " << *in << " seq " << seq << " used " << ccap_string(used) << dendl;

Expand Down Expand Up @@ -2741,9 +2762,8 @@ void Client::_async_invalidate(Inode *in, int64_t off, int64_t len, bool keep_ca
ino_invalidate_cb(ino_invalidate_cb_handle, in->vino(), off, len);

client_lock.Lock();
if (!keep_caps) {
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
}
if (!keep_caps)
check_caps(in, false);
put_inode(in);
client_lock.Unlock();
ldout(cct, 10) << "_async_invalidate " << off << "~" << len << (keep_caps ? " keep_caps" : "") << " done" << dendl;
Expand All @@ -2755,8 +2775,7 @@ void Client::_schedule_invalidate_callback(Inode *in, int64_t off, int64_t len,
// we queue the invalidate, which calls the callback and decrements the ref
async_ino_invalidator.queue(new C_Client_CacheInvalidate(this, in, off, len, keep_caps));
else if (!keep_caps)
// if not set, we just decrement the cap ref here
in->put_cap_ref(CEPH_CAP_FILE_CACHE);
check_caps(in, false);
}

void Client::_invalidate_inode_cache(Inode *in, bool keep_caps)
Expand Down Expand Up @@ -2787,7 +2806,7 @@ void Client::_invalidate_inode_cache(Inode *in, int64_t off, int64_t len, bool k
void Client::_release(Inode *in)
{
ldout(cct, 20) << "_release " << *in << dendl;
if (in->cap_refs[CEPH_CAP_FILE_CACHE]) {
if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0) {
_invalidate_inode_cache(in, false);
}
}
Expand Down Expand Up @@ -2860,12 +2879,7 @@ void Client::_flushed(Inode *in)
{
ldout(cct, 10) << "_flushed " << *in << dendl;

// release clean pages too, if we dont hold RDCACHE reference
if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0) {
_invalidate_inode_cache(in, true);
}

put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
put_cap_ref(in, CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER);
}


Expand Down Expand Up @@ -3047,7 +3061,7 @@ void Client::trim_caps(MetaSession *s, int max)
int mine = cap->issued | cap->implemented;
int oissued = in->auth_cap ? in->auth_cap->issued : 0;
// disposable non-auth cap
if (!(in->caps_used() & ~oissued & mine)) {
if (!(get_caps_used(in) & ~oissued & mine)) {
ldout(cct, 20) << " removing unused, unneeded non-auth cap on " << *in << dendl;
remove_cap(cap, true);
trimmed++;
Expand Down Expand Up @@ -3138,7 +3152,7 @@ void Client::flush_caps(Inode *in, MetaSession *session)
int wanted = in->caps_wanted();
int retain = wanted | CEPH_CAP_PIN;

send_cap(in, session, cap, in->caps_used(), wanted, retain, in->flushing_caps);
send_cap(in, session, cap, get_caps_used(in), wanted, retain, in->flushing_caps);
}

void Client::wait_sync_caps(uint64_t want)
Expand Down Expand Up @@ -3700,7 +3714,7 @@ void Client::_invalidate_inode_parents(Inode *in)
void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m)
{
int mds = session->mds_num;
int used = in->caps_used();
int used = get_caps_used(in);
int wanted = in->caps_wanted();

const int old_caps = cap->issued;
Expand Down Expand Up @@ -3764,12 +3778,12 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
cap->issued = new_caps;
cap->implemented |= new_caps;

if ((~cap->issued & old_caps) & CEPH_CAP_FILE_CACHE)
_release(in);

if (((used & ~new_caps) & CEPH_CAP_FILE_BUFFER) &&
!_flush(in)) {
// waitin' for flush
} else if ((old_caps & ~new_caps) & CEPH_CAP_FILE_CACHE) {
_release(in);
} else {
cap->wanted = 0; // don't let check_caps skip sending a response to MDS
check = true;
Expand All @@ -3796,7 +3810,7 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
}

if (check)
check_caps(in, true);
check_caps(in, false);

// wake up waiters
if (new_caps)
Expand Down Expand Up @@ -6145,10 +6159,6 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
readahead = false;
}

// we will populate the cache here
if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0)
in->get_cap_ref(CEPH_CAP_FILE_CACHE);

ldout(cct, 10) << "readahead=" << readahead << " nr_consec=" << f->nr_consec_read
<< " max_byes=" << conf->client_readahead_max_bytes
<< " max_periods=" << conf->client_readahead_max_periods << dendl;
Expand Down Expand Up @@ -6451,7 +6461,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) {
// do buffered write
if (!in->oset.dirty_or_tx)
get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
get_cap_ref(in, CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER);

get_cap_ref(in, CEPH_CAP_FILE_BUFFER);

Expand Down
1 change: 1 addition & 0 deletions src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ class Client : public Dispatcher {
void kick_flushing_caps(MetaSession *session);
void kick_maxsize_requests(MetaSession *session);
int get_caps(Inode *in, int need, int want, int *have, loff_t endoff);
int get_caps_used(Inode *in);

void maybe_update_snaprealm(SnapRealm *realm, snapid_t snap_created, snapid_t snap_highwater,
vector<snapid_t>& snaps);
Expand Down
6 changes: 3 additions & 3 deletions src/client/Inode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ void Inode::get_cap_ref(int cap)
}
}

bool Inode::put_cap_ref(int cap)
int Inode::put_cap_ref(int cap)
{
// if cap is always a single bit (which it seems to be)
// all this logic is equivalent to:
// if (--cap_refs[c]) return false; else return true;
bool last = false;
int last = 0;
int n = 0;
while (cap) {
if (cap & 1) {
Expand All @@ -126,7 +126,7 @@ bool Inode::put_cap_ref(int cap)
assert(cap_refs[c] > 0);
}
if (--cap_refs[c] == 0)
last = true;
last |= c;
//cout << "inode " << *this << " put " << cap_string(c) << " " << (cap_refs[c]+1) << " -> " << cap_refs[c] << std::endl;
}
cap >>= 1;
Expand Down
2 changes: 1 addition & 1 deletion src/client/Inode.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class Inode {
bool put_open_ref(int mode);

void get_cap_ref(int cap);
bool put_cap_ref(int cap);
int put_cap_ref(int cap);
bool is_any_caps();
bool cap_is_valid(Cap* cap);
int caps_issued(int *implemented = 0);
Expand Down
12 changes: 12 additions & 0 deletions src/osdc/ObjectCacher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,18 @@ void ObjectCacher::flusher_entry()

// -------------------------------------------------

bool ObjectCacher::set_is_empty(ObjectSet *oset)
{
assert(lock.is_locked());
if (oset->objects.empty())
return true;

for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
if (!(*p)->is_empty())
return false;

return true;
}

bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
Expand Down
1 change: 1 addition & 0 deletions src/osdc/ObjectCacher.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ class ObjectCacher {
bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);

public:
bool set_is_empty(ObjectSet *oset);
bool set_is_cached(ObjectSet *oset);
bool set_is_dirty_or_committing(ObjectSet *oset);

Expand Down

0 comments on commit 701c22a

Please sign in to comment.