Skip to content

Commit

Permalink
Merge pull request ceph#58409 from baergj/upstream-fix-async-discard-on-start
Browse files Browse the repository at this point in the history

blk/KernelDevice: React to bdev_enable_discard changes in handle_conf_change(); Fix several issues with stopping discard threads
  • Loading branch information
ljflores authored Jul 29, 2024
2 parents d09655f + 617c936 commit 0ab5214
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 77 deletions.
119 changes: 44 additions & 75 deletions src/blk/kernel/KernelDevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ int KernelDevice::open(const string& p)
{
path = p;
int r = 0, i = 0;
uint64_t num_discard_threads = 0;
dout(1) << __func__ << " path " << path << dendl;

struct stat statbuf;
Expand Down Expand Up @@ -286,10 +285,7 @@ int KernelDevice::open(const string& p)
goto out_fail;
}

num_discard_threads = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
if (support_discard && cct->_conf->bdev_enable_discard && num_discard_threads > 0) {
_discard_start();
}
_discard_update_threads();

// round size down to an even block
size &= ~(block_size - 1);
Expand Down Expand Up @@ -536,47 +532,49 @@ void KernelDevice::_aio_stop()
}
}

// NOTE(review): this listing comes from a rendered diff with no +/- markers.
// Two signatures appear back to back and statements of _discard_start() and
// _discard_update_threads() are interleaved inside one brace pair, so the
// text below is not a single compilable function -- confirm against the raw
// diff before reusing it.
//
// _discard_update_threads(), as far as the oldcount/newcount lines show:
// brings the number of entries in discard_threads in line with the
// bdev_async_discard_threads option. The target is forced to 0 when
// bdev_enable_discard is false, support_discard is false, or discard_stop is
// set. Takes no arguments, returns nothing, and works under discard_lock.
void KernelDevice::_discard_start()
void KernelDevice::_discard_update_threads()
{
uint64_t num = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
dout(10) << __func__ << " starting " << num << " threads" << dendl;

// Held until the end of the function; both the spawn and the stop paths
// below modify discard_threads.
std::unique_lock l(discard_lock);

target_discard_threads = num;
discard_threads.reserve(num);
for(uint64_t i = 0; i < num; i++)
{
// All threads created with the same name
discard_threads.emplace_back(new DiscardThread(this, i));
discard_threads.back()->create("bstore_discard");
}
// Number of threads currently tracked vs. the configured target.
uint64_t oldcount = discard_threads.size();
uint64_t newcount = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
// Any one of these conditions means no discard thread should remain tracked.
if (!cct->_conf.get_val<bool>("bdev_enable_discard") || !support_discard || discard_stop) {
newcount = 0;
}

// Increase? Spawn now, it's quick
if (newcount > oldcount) {
dout(10) << __func__ << " starting " << (newcount - oldcount) << " additional discard threads" << dendl;
discard_threads.reserve(newcount);
// Only the missing tail [oldcount, newcount) is created; existing entries
// are left alone.
for(uint64_t i = oldcount; i < newcount; i++)
{
// All threads created with the same name
discard_threads.emplace_back(new DiscardThread(this, i));
discard_threads.back()->create("bstore_discard");
}
// Decrease? Signal threads after telling them to stop
} else if (newcount < oldcount) {
dout(10) << __func__ << " stopping " << (oldcount - newcount) << " existing discard threads" << dendl;

// Signal the last threads to quit, and stop tracking them
// The counter runs from oldcount down to newcount + 1 and indexes with
// i-1, so the unsigned index never wraps even when newcount is 0.
for(uint64_t i = oldcount; i > newcount; i--)
{
discard_threads[i-1]->stop = true;
discard_threads[i-1]->detach();
}
// Drop the trailing entries that were just flagged and detached.
discard_threads.resize(newcount);

// NOTE(review): `num` is declared only by the _discard_start() lines above;
// this log statement presumably belongs to the removed function -- confirm.
dout(10) << __func__ << " started " << num << " threads" << dendl;
// Presumably wakes threads blocked in discard_cond.wait() so they can see
// their stop flag -- confirm against _discard_thread().
discard_cond.notify_all();
}
}

// Stops the discard threads of this device. Takes no arguments and returns
// nothing.
//
// NOTE(review): this listing comes from a rendered diff with no +/- markers.
// It appears to contain both an older body (lock, flag every thread, join,
// clear) and a newer one (set discard_stop, _discard_update_threads(),
// discard_drain()) back to back -- confirm against the raw diff which lines
// are current.
void KernelDevice::_discard_stop()
{
dout(10) << __func__ << dendl;

// Signal threads to stop, then wait for them to join
{
std::unique_lock l(discard_lock);
// NOTE(review): as written this waits for as long as discard_threads is
// empty, i.e. it blocks when there is nothing to stop. Possibly one of the
// "issues with stopping discard threads" named in the commit title --
// confirm.
while (discard_threads.empty()) {
discard_cond.wait(l);
}

for(auto &t : discard_threads) {
t->stop = true;
}

discard_cond.notify_all();
}

// Threads are shared pointers and are cleaned up for us
// Runs after the scoped lock above has been released.
for(auto &t : discard_threads)
t->join();
discard_threads.clear();
// With discard_stop set, _discard_update_threads() computes a target of 0,
// so every tracked thread is flagged to stop, detached and dropped from
// discard_threads.
discard_stop = true;
_discard_update_threads();
// discard_drain() is not visible in this listing; presumably it waits for
// already-queued discards to finish -- confirm.
discard_drain();

dout(10) << __func__ << " stopped" << dendl;
}
Expand Down Expand Up @@ -761,6 +759,13 @@ void KernelDevice::_discard_thread(uint64_t tid)
discard_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
// If there are non-stopped discard threads and we have been requested
// to stop, do so now. Otherwise, we need to proceed because
// discard_queued is non-empty and at least one thread is needed to
// drain it.
if (thr->stop && !discard_threads.empty())
break;

// Limit local processing to MAX_LOCAL_DISCARD items.
// This will allow threads to work in parallel
// instead of a single thread taking over the whole discard_queued.
Expand Down Expand Up @@ -1513,6 +1518,7 @@ const char** KernelDevice::get_tracked_conf_keys() const
{
// Names of the config options returned to the caller; both names are the
// ones handle_conf_change() tests for in its `changed` set.
// The array is a function-local static, so the returned pointer stays valid
// after this function returns.
static const char* KEYS[] = {
"bdev_async_discard_threads",
"bdev_enable_discard",
// Terminating entry; presumably consumers walk the array until they reach
// it -- confirm against the config observer interface.
NULL
};
return KEYS;
}
Expand All @@ -1521,44 +1527,7 @@ const char** KernelDevice::get_tracked_conf_keys() const
// Reacts to changed config options.
//
// conf    - config proxy handed in by the caller; none of the visible lines
//           read it (the option value is fetched through cct->_conf instead).
// changed - set of option names; only membership is tested.
//
// NOTE(review): this listing comes from a rendered diff with no +/- markers.
// It appears to contain an older body that resized discard_threads inline,
// followed by a newer body that only delegates to _discard_update_threads().
// The braces do not balance as shown -- confirm against the raw diff.
void KernelDevice::handle_conf_change(const ConfigProxy& conf,
const std::set <std::string> &changed)
{
if (changed.count("bdev_async_discard_threads")) {
std::unique_lock l(discard_lock);

// The previous target is taken from the cached member rather than from
// discard_threads.size().
uint64_t oldval = target_discard_threads;
uint64_t newval = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");

target_discard_threads = newval;

// Increase? Spawn now, it's quick
if (newval > oldval) {
dout(10) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
discard_threads.reserve(target_discard_threads);
for(uint64_t i = oldval; i < newval; i++)
{
// All threads created with the same name
discard_threads.emplace_back(new DiscardThread(this, i));
discard_threads.back()->create("bstore_discard");
}
// NOTE(review): this branch also runs when newval == oldval, not only on a
// decrease.
} else {
// Decrease? Signal threads after telling them to stop
dout(10) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;

// Decreasing to zero is exactly the same as disabling async discard.
// Signal all threads to stop
if(newval == 0) {
// NOTE(review): `l` still holds discard_lock here, and the listed
// _discard_stop() body contains lines that acquire discard_lock themselves.
// This looks like it could block on its own lock -- confirm.
_discard_stop();
} else {
// Signal the last threads to quit, and stop tracking them
// `i >= newval` on an unsigned index only terminates because newval > 0 on
// this path; the newval == 0 case was routed to _discard_stop() above.
for(uint64_t i = oldval - 1; i >= newval; i--)
{
// Also detach the thread so we no longer need to join
discard_threads[i]->stop = true;
discard_threads[i]->detach();
discard_threads.erase(discard_threads.begin() + i);
}
}

discard_cond.notify_all();
}
// A change to either option triggers a full re-evaluation;
// _discard_update_threads() re-reads both options itself.
if (changed.count("bdev_async_discard_threads") || changed.count("bdev_enable_discard")) {
_discard_update_threads();
}
}
4 changes: 2 additions & 2 deletions src/blk/kernel/KernelDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KernelDevice : public BlockDevice,
aio_callback_t discard_callback;
void *discard_callback_priv;
bool aio_stop;
bool discard_stop;

ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
ceph::condition_variable discard_cond;
Expand All @@ -78,7 +79,6 @@ class KernelDevice : public BlockDevice,
}
};
std::vector<std::shared_ptr<DiscardThread>> discard_threads;
uint64_t target_discard_threads = 0;

std::atomic_int injecting_crash;

Expand All @@ -93,7 +93,7 @@ class KernelDevice : public BlockDevice,
int _aio_start();
void _aio_stop();

void _discard_start();
void _discard_update_threads();
void _discard_stop();
bool _discard_started();

Expand Down

0 comments on commit 0ab5214

Please sign in to comment.