Skip to content

Commit

Permalink
crimson/thread: pin thread pool to given CPU
Browse files Browse the repository at this point in the history
to take the full advantage of seastar reactor, we need to confine the
alien threads to given CPU core(s). in current setting, it's assumed
that alien threads do not perform CPU-bound tasks, so a single core
would suffice. we can always change the interface to pass a cpu_set_t
or a std::bitset if one CPU core is not enough.

Signed-off-by: Kefu Chai <[email protected]>
  • Loading branch information
tchaikov committed Jun 29, 2018
1 parent fade1ef commit 460ad1f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
18 changes: 16 additions & 2 deletions src/crimson/thread/ThreadPool.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#include "ThreadPool.h"

#include <pthread.h>
#include "crimson/net/Config.h"
#include "include/intarith.h"

namespace ceph::thread {

ThreadPool::ThreadPool(size_t n_threads,
size_t queue_sz)
size_t queue_sz,
unsigned cpu_id)
: queue_size{round_up_to(queue_sz, seastar::smp::count)},
pending{queue_size}
{
for (size_t i = 0; i < n_threads; i++) {
threads.emplace_back([this] {
threads.emplace_back([this, cpu_id] {
pin(cpu_id);
loop();
});
}
Expand All @@ -23,6 +27,16 @@ ThreadPool::~ThreadPool()
}
}

void ThreadPool::pin(unsigned cpu_id)
{
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(cpu_id, &cs);
[[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
sizeof(cs), &cs);
assert(r == 0);
}

void ThreadPool::loop()
{
for (;;) {
Expand Down
6 changes: 4 additions & 2 deletions src/crimson/thread/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ThreadPool {
bool is_stopping() const {
return stopping.load(std::memory_order_relaxed);
}
static void pin(unsigned cpu_id);
seastar::semaphore& local_free_slots() {
return submit_queue.local().free_slots;
}
Expand All @@ -82,11 +83,12 @@ class ThreadPool {
* it waits in this queue. we will round this number to
* multiple of the number of cores.
* @param n_threads the number of threads in this thread pool.
* @note, each @c Task has its own ceph::thread::Condition, which possesses
* @param cpu the CPU core to which this thread pool is assigned
* @note each @c Task has its own ceph::thread::Condition, which possesses
* possesses an fd, so we should keep the size of queue under a resonable
* limit.
*/
ThreadPool(size_t n_threads, size_t queue_sz);
ThreadPool(size_t n_threads, size_t queue_sz, unsigned cpu);
~ThreadPool();
seastar::future<> start();
seastar::future<> stop();
Expand Down
2 changes: 1 addition & 1 deletion src/test/crimson/test_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ seastar::future<> test_accumulate(ThreadPool& tp) {

int main(int argc, char** argv)
{
ThreadPool tp{2, 128};
ThreadPool tp{2, 128, 0};
seastar::app_template app;
return app.run(argc, argv, [&tp] {
return tp.start().then([&tp] {
Expand Down

0 comments on commit 460ad1f

Please sign in to comment.