Skip to content

Commit

Permalink
Merge PR ceph#37746 into master
Browse files Browse the repository at this point in the history
* refs/pull/37746/head:
	client: add a dedicated thread for the Client tick
	client: try to renew caps and flush old cap releases

Reviewed-by: Patrick Donnelly <[email protected]>
  • Loading branch information
batrick committed Nov 15, 2020
2 parents 3541c8d + 0fcf893 commit 11b5847
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 39 deletions.
112 changes: 80 additions & 32 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ Client::~Client()
{
ceph_assert(ceph_mutex_is_not_locked(client_lock));

if (upkeeper.joinable())
upkeeper.join();

// It is necessary to hold client_lock, because any inode destruction
// may call into ObjectCacher, which asserts that it's lock (which is
// client_lock) is held.
Expand Down Expand Up @@ -572,6 +575,14 @@ void Client::shutdown()
// MDS commands, we may have sessions that need closing.
{
std::scoped_lock l{client_lock};

// To make sure the tick thread will be stoppped before
// destructing the Client, just in case like the _mount()
// failed but didn't not get a chance to stop the tick
// thread
tick_thread_stopped = true;
upkeep_cond.notify_one();

_close_sessions();
}
cct->_conf.remove_observer(this);
Expand Down Expand Up @@ -2177,6 +2188,18 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
break;

case CEPH_SESSION_RECALL_STATE:
/*
* Call the renew caps and flush cap releases just before
* triming the caps in case the tick() won't get a chance
* to run them, which could cause the client to be blocklisted
* and MDS daemons trying to recall the caps again and
* again.
*
* In most cases it will do nothing, and the new cap releases
* added by trim_caps() followed will be deferred flushing
* by tick().
*/
renew_and_flush_cap_releases();
trim_caps(session, m->get_max_caps());
break;

Expand Down Expand Up @@ -6079,9 +6102,7 @@ int Client::mount(const std::string &mount_root, const UserPerm& perms,
return r;
}

cl.unlock();
tick(); // start tick
cl.lock();
start_tick_thread(); // start tick thread

if (require_mds) {
while (1) {
Expand Down Expand Up @@ -6379,12 +6400,9 @@ void Client::_unmount(bool abort)
traceout.close();
}

{
std::scoped_lock l(timer_lock);
if (tick_event)
timer.cancel_event(tick_event);
tick_event = 0;
}
// stop the tick thread
tick_thread_stopped = true;
upkeep_cond.notify_one();

_close_sessions();

Expand Down Expand Up @@ -6420,33 +6438,32 @@ void Client::flush_cap_releases()
}
}

void Client::tick()
void Client::renew_and_flush_cap_releases()
{
ldout(cct, 20) << "tick" << dendl;
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

{
std::scoped_lock l(timer_lock);
tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
new LambdaContext([this](int) {
tick();
}));
}
if (!mount_aborted && mdsmap->get_epoch()) {
// renew caps?
utime_t el = ceph_clock_now() - last_cap_renew;
if (unlikely(el > mdsmap->get_session_timeout() / 3.0))
renew_caps();

if (cct->_conf->client_debug_inject_tick_delay > 0) {
sleep(cct->_conf->client_debug_inject_tick_delay);
ceph_assert(0 == cct->_conf.set_val("client_debug_inject_tick_delay", "0"));
cct->_conf.apply_changes(nullptr);
flush_cap_releases();
}
}

void Client::tick()
{
ldout(cct, 20) << "tick" << dendl;

utime_t now = ceph_clock_now();

std::scoped_lock cl(client_lock);
/*
* If the mount() is not finished
*/
if (is_mounting() && !mds_requests.empty()) {
MetaRequest *req = mds_requests.begin()->second;

if (req->op_stamp + cct->_conf->client_mount_timeout < now) {
req->abort(-ETIMEDOUT);
if (req->caller_cond) {
Expand All @@ -6460,14 +6477,7 @@ void Client::tick()
}
}

if (!mount_aborted && mdsmap->get_epoch()) {
// renew caps?
utime_t el = now - last_cap_renew;
if (el > mdsmap->get_session_timeout() / 3.0)
renew_caps();

flush_cap_releases();
}
renew_and_flush_cap_releases();

// delayed caps
xlist<Inode*>::iterator p = delayed_list.begin();
Expand Down Expand Up @@ -6498,6 +6508,44 @@ void Client::tick()
}
}

void Client::start_tick_thread()
{
upkeeper = std::thread([this]() {
using time = ceph::coarse_mono_time;
using sec = std::chrono::seconds;

auto last_tick = time::min();

std::unique_lock cl(client_lock);
while (!tick_thread_stopped) {
auto now = clock::now();
auto since = now - last_tick;

auto t_interval = clock::duration(cct->_conf.get_val<sec>("client_tick_interval"));
auto d_interval = clock::duration(cct->_conf.get_val<sec>("client_debug_inject_tick_delay"));

// Clear the debug inject tick delay
if (unlikely(d_interval.count() > 0)) {
ldout(cct, 20) << "clear debug inject tick delay: " << d_interval << dendl;
ceph_assert(0 == cct->_conf.set_val("client_debug_inject_tick_delay", "0"));
cct->_conf.apply_changes(nullptr);
}

auto interval = std::max(t_interval, d_interval);
if (likely(since >= interval)) {
tick();
last_tick = clock::now();
} else {
interval -= since;
}

ldout(cct, 20) << "upkeep thread waiting interval " << interval << dendl;
if (!tick_thread_stopped)
upkeep_cond.wait_for(cl, interval);
}
});
}

void Client::collect_and_send_metrics() {
ldout(cct, 20) << __func__ << dendl;

Expand Down
12 changes: 10 additions & 2 deletions src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <memory>
#include <set>
#include <string>
#include <thread>

using std::set;
using std::map;
Expand Down Expand Up @@ -243,6 +244,7 @@ class Client : public Dispatcher, public md_config_obs_t {
template <typename T> friend class RWRef;

using Dispatcher::cct;
using clock = ceph::coarse_mono_clock;

typedef int (*add_dirent_cb_t)(void *p, struct dirent *de, struct ceph_statx *stx, off_t off, Inode *in);

Expand Down Expand Up @@ -737,7 +739,9 @@ class Client : public Dispatcher, public md_config_obs_t {
void renew_caps();
void renew_caps(MetaSession *session);
void flush_cap_releases();
void renew_and_flush_cap_releases();
void tick();
void start_tick_thread();

void inc_dentry_nr() {
++dentry_nr;
Expand Down Expand Up @@ -767,11 +771,15 @@ class Client : public Dispatcher, public md_config_obs_t {

xlist<Inode*> &get_dirty_list() { return dirty_list; }

/* timer_lock for 'timer' and 'tick_event' */
/* timer_lock for 'timer' */
ceph::mutex timer_lock = ceph::make_mutex("Client::timer_lock");
Context *tick_event = nullptr;
SafeTimer timer;

/* tick thread */
std::thread upkeeper;
ceph::condition_variable upkeep_cond;
bool tick_thread_stopped = false;

std::unique_ptr<PerfCounters> logger;
std::unique_ptr<MDSMap> mdsmap;

Expand Down
2 changes: 0 additions & 2 deletions src/common/legacy_config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ OPTION(client_cache_size, OPT_INT)
OPTION(client_cache_mid, OPT_FLOAT)
OPTION(client_use_random_mds, OPT_BOOL)
OPTION(client_mount_timeout, OPT_DOUBLE)
OPTION(client_tick_interval, OPT_DOUBLE)
OPTION(client_trace, OPT_STR)
OPTION(client_readahead_min, OPT_LONGLONG) // readahead at _least_ this much.
OPTION(client_readahead_max_bytes, OPT_LONGLONG) // default unlimited
Expand All @@ -365,7 +364,6 @@ OPTION(client_oc_max_dirty_age, OPT_DOUBLE) // max age in cache before writ
OPTION(client_oc_max_objects, OPT_INT) // max objects in cache
OPTION(client_debug_getattr_caps, OPT_BOOL) // check if MDS reply contains wanted caps
OPTION(client_debug_force_sync_read, OPT_BOOL) // always read synchronously (go to osds)
OPTION(client_debug_inject_tick_delay, OPT_INT) // delay the client tick for a number of seconds
OPTION(client_max_inline_size, OPT_U64)
OPTION(client_inject_release_failure, OPT_BOOL) // synthetic client bug for testing
OPTION(client_inject_fixed_oldest_tid, OPT_BOOL) // synthetic client bug for testing
Expand Down
6 changes: 3 additions & 3 deletions src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8563,8 +8563,8 @@ std::vector<Option> get_mds_client_options() {
.set_default(300.0)
.set_description("timeout for mounting CephFS (seconds)"),

Option("client_tick_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
.set_default(1.0)
Option("client_tick_interval", Option::TYPE_SECS, Option::LEVEL_DEV)
.set_default(1)
.set_description("seconds between client upkeep ticks"),

Option("client_trace", Option::TYPE_STR, Option::LEVEL_DEV)
Expand Down Expand Up @@ -8653,7 +8653,7 @@ std::vector<Option> get_mds_client_options() {
.set_default(false)
.set_description(""),

Option("client_debug_inject_tick_delay", Option::TYPE_INT, Option::LEVEL_DEV)
Option("client_debug_inject_tick_delay", Option::TYPE_SECS, Option::LEVEL_DEV)
.set_default(0)
.set_description(""),

Expand Down

0 comments on commit 11b5847

Please sign in to comment.