[util] use lighter locking scheme for ThreadMgr
Use the rw_spinlock primitive instead of Mutex to guard the ThreadMgr's
thread registry.  Also, use the shared lock pattern to handle
write and read access to the guarded entities of the ThreadMgr.
In addition, do not hold the lock for the whole duration of generating
the /threadz page for the embedded webserver.
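
As a rough illustration of the shared lock pattern and of releasing the
lock before rendering, here is a minimal sketch. It is not the code from
this patch: std::shared_mutex (C++17) stands in for Kudu's rw_spinlock,
and the Registry class with its Add/Remove/Size/Render methods is
hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the ThreadMgr registry. std::shared_mutex plays
// the role of Kudu's rw_spinlock (an assumption made for portability).
class Registry {
 public:
  void Add(int id, const std::string& name) {
    std::lock_guard<std::shared_mutex> l(lock_);  // exclusive: writer
    names_[id] = name;
  }

  // Returns the number of entries removed (0 or 1).
  std::size_t Remove(int id) {
    std::lock_guard<std::shared_mutex> l(lock_);  // exclusive: writer
    return names_.erase(id);
  }

  std::size_t Size() const {
    std::shared_lock<std::shared_mutex> l(lock_);  // shared: reader
    return names_.size();
  }

  // Copies the data under the shared lock, then sorts and formats the copy
  // after the lock has been released.
  std::string Render() const {
    std::vector<std::string> snapshot;
    {
      std::shared_lock<std::shared_mutex> l(lock_);
      snapshot.reserve(names_.size());
      for (const auto& elem : names_) {
        snapshot.push_back(elem.second);
      }
    }
    std::sort(snapshot.begin(), snapshot.end());
    std::ostringstream out;
    for (const auto& name : snapshot) {
      out << "<tr><td>" << name << "</td></tr>";
    }
    return out.str();
  }

 private:
  mutable std::shared_mutex lock_;  // mutable so const readers can lock it
  std::unordered_map<int, std::string> names_;
};

// Usage example: rows come back sorted by name regardless of insertion order.
void RegistryExample() {
  Registry r;
  r.Add(2, "worker");
  r.Add(1, "acceptor");
  assert(r.Render() == "<tr><td>acceptor</td></tr><tr><td>worker</td></tr>");
}
```

The trade-off is an extra copy of the entries in exchange for a shorter
critical section: slow work such as formatting never blocks writers.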

With this patch, ThreadMgr now uses std::unordered_map as a container
for category-specific thread information and enforces stricter
consistency rules while removing thread information from its registry.
Adding information about a thread to the registry is not guarded by
stricter consistency checks (i.e. it stays as it was before this
patch); see the note below for an explanation.
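
To make the asymmetry between adding and removing concrete, here is a
small sketch of an unordered_map keyed by pthread_t. The ThreadIdHash and
ThreadIdEqual functors mirror the ones in the diff; the ThreadNames alias
and the free functions AddThread/RemoveThread are hypothetical.
std::hash<pthread_t> is assumed to be available, which holds when
pthread_t is an integral or pointer type (as on Linux and macOS). Unlike
the patch, which uses CHECK_EQ and crashes, the sketch reports a failed
removal through its return value.

```cpp
#include <pthread.h>

#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>

struct ThreadIdHash {
  std::size_t operator()(pthread_t thread_id) const noexcept {
    // Assumption: pthread_t is an integral or pointer type on this platform.
    return std::hash<pthread_t>()(thread_id);
  }
};

struct ThreadIdEqual {
  bool operator()(pthread_t lhs, pthread_t rhs) const {
    return pthread_equal(lhs, rhs) != 0;
  }
};

// Hypothetical, simplified registry: thread handle -> thread name.
using ThreadNames =
    std::unordered_map<pthread_t, std::string, ThreadIdHash, ThreadIdEqual>;

// Lenient add: silently overwrites an existing entry with the same handle.
void AddThread(ThreadNames* names, pthread_t id, const std::string& name) {
  (*names)[id] = name;
}

// Strict remove: succeeds only if exactly one entry was erased.
bool RemoveThread(ThreadNames* names, pthread_t id) {
  return names->erase(id) == 1;
}

// Usage example: a duplicate add is tolerated, a duplicate remove is not.
void ThreadNamesExample() {
  ThreadNames names;
  AddThread(&names, pthread_self(), "first");
  AddThread(&names, pthread_self(), "second");  // overwrites, no error
  assert(names.size() == 1);
  assert(RemoveThread(&names, pthread_self()));
  assert(!RemoveThread(&names, pthread_self()));  // nothing left to erase
}
```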

NOTE:
  The stricter consistency around adding a thread into the ThreadMgr's
  registry caused issues with multiprocessing in Python tests,
  and I spent some time trying to work around that.  However, that was
  not fruitful.  I think the proper solution would be keeping the thread
  registry bound to some top-level object (like Kudu client or
  ServerBase object) and cleaning it up in accordance with the object's
  life cycle.

  In essence, the problem happens due to the combination of the
  following:
    * The way workers are spawned by multiprocessing.Pool, i.e. by
      calling fork() at the point where the Pool is instantiated.
    * The fact that pthread_t handles might be the same for threads
      in different processes.

  In detail, if multiprocessing.Pool is spawning worker processes after
  an instance of Kudu client has been created in the main test process,
  every worker gets a copy of the thread registry in its address space.
  The unexpected copy of the registry in a worker process is populated
  with the information on threads spawned due to the activity of the
  Kudu client in the main test process.  Later on, if a worker
  instantiates a Kudu client of its own, the threads newly spawned by
  the worker's Kudu client might have the same pthread_t handles
  as the threads whose information records were inadvertently inherited
  from the main test process.
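
  The inheritance part of this can be reproduced without Python. The
  sketch below is an illustration only: the local registry map and the
  function EntriesSeenByForkedChild() are hypothetical and have nothing
  to do with the actual ThreadMgr code. It only shows that a child
  created by fork() starts with a copy of whatever the parent had
  registered; it does not try to demonstrate the pthread_t collision
  itself, which is platform-dependent.

```cpp
#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cassert>
#include <map>
#include <string>

// Registers the calling thread in a local registry, forks, and returns the
// number of registry entries the child process sees, or -1 on error.
int EntriesSeenByForkedChild() {
  // Hypothetical registry: thread handle -> thread name.
  std::map<pthread_t, std::string> registry;
  registry[pthread_self()] = "parent-main";

  const pid_t pid = fork();
  if (pid < 0) {
    return -1;
  }
  if (pid == 0) {
    // Child: the registry was copied together with the address space, so it
    // already holds the parent's entry although the child registered nothing.
    _exit(static_cast<int>(registry.size()));
  }

  // Parent: collect the child's exit code, which carries the entry count.
  int status = 0;
  if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status)) {
    return -1;
  }
  return WEXITSTATUS(status);
}

// Usage example: the child sees the single entry added by the parent.
void ForkExample() {
  assert(EntriesSeenByForkedChild() == 1);
}
```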

  Also, it turned out to be impossible to handle a worker's crash in a
  multiprocessing.Pool, and that's by design; for details, see:
    https://stackoverflow.com/questions/24894682/

  BTW, in Python 3 the problem with the duplicated address space
  in worker processes has been resolved by introducing contexts and
  additional worker start methods (e.g., 'forkserver').

Change-Id: I4d49c1c39392e01c45019844430a4fe3d116c277
Reviewed-on: http://gerrit.cloudera.org:8080/12112
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
alexeyserbin committed Jan 3, 2019
1 parent 7337cf5 commit c7a2d69
Showing 2 changed files with 131 additions and 98 deletions.
23 changes: 7 additions & 16 deletions src/kudu/util/os-util.cc
@@ -29,7 +29,7 @@
#include <unistd.h>

#include <cstddef>
#include <fstream>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
@@ -45,10 +45,8 @@
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/logging.h"
#include "kudu/util/status.h"

using std::ifstream;
using std::istreambuf_iterator;
using std::ostringstream;
using std::string;
using std::vector;
using strings::Split;
@@ -75,7 +73,7 @@ static const int64_t kIoWait = 41 - 2;
// Largest offset we are interested in, to check we get a well formed stat file.
static const int64_t kMaxOffset = kIoWait;

Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats) {
Status ParseStat(const string& buffer, string* name, ThreadStats* stats) {
DCHECK(stats != nullptr);

// The thread name should be the only field with parentheses. But the name
@@ -117,18 +115,11 @@ Status GetThreadStats(int64_t tid, ThreadStats* stats) {
if (kTicksPerSec <= 0) {
return Status::NotSupported("ThreadStats not supported");
}
faststring buf;
RETURN_NOT_OK(ReadFileToString(
Env::Default(), Substitute("/proc/self/task/$0/stat", tid), &buf));

ostringstream proc_path;
proc_path << "/proc/self/task/" << tid << "/stat";
ifstream proc_file(proc_path.str().c_str());
if (!proc_file.is_open()) {
return Status::IOError("Could not open ifstream");
}

string buffer((istreambuf_iterator<char>(proc_file)),
istreambuf_iterator<char>());

return ParseStat(buffer, nullptr, stats); // don't want the name
return ParseStat(buf.ToString(), nullptr, stats);
}

void DisableCoreDumps() {
206 changes: 124 additions & 82 deletions src/kudu/util/thread.cc
@@ -30,6 +30,7 @@
#include <cstring>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>
@@ -44,17 +45,18 @@
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/os-util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
@@ -64,12 +66,12 @@

using boost::bind;
using boost::mem_fn;
using std::endl;
using std::map;
using std::ostringstream;
using std::shared_ptr;
using std::string;
using std::vector;
using std::unordered_map;
using strings::Substitute;

METRIC_DEFINE_gauge_uint64(server, threads_started,
@@ -162,13 +164,13 @@ class ThreadMgr {
}

~ThreadMgr() {
MutexLock l(lock_);
thread_categories_.clear();
}

static void SetThreadName(const std::string& name, int64_t tid);
static void SetThreadName(const string& name, int64_t tid);

Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web);
Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
WebCallbackRegistry* web) const;

// Registers a thread to the supplied category. The key is a pthread_t,
// not the system TID, since pthread_t is less prone to being recycled.
@@ -180,7 +182,7 @@ class ThreadMgr {
void RemoveThread(const pthread_t& pthread_id, const string& category);

// Metric callback for number of threads running. Also used for error messages.
uint64_t ReadThreadsRunning();
uint64_t ReadThreadsRunning() const;

private:
// Container class for any details we want to capture about a thread
@@ -204,18 +206,29 @@ class ThreadMgr {
int64_t thread_id_;
};

struct ThreadIdHash {
size_t operator()(pthread_t thread_id) const noexcept {
return std::hash<pthread_t>()(thread_id);
}
};

struct ThreadIdEqual {
bool operator()(pthread_t lhs, pthread_t rhs) const {
return pthread_equal(lhs, rhs) != 0;
}
};

// A ThreadCategory is a set of threads that are logically related.
// TODO: unordered_map is incompatible with pthread_t, but would be more
// efficient here.
typedef map<const pthread_t, ThreadDescriptor> ThreadCategory;
typedef unordered_map<const pthread_t, ThreadDescriptor,
ThreadIdHash, ThreadIdEqual> ThreadCategory;

// All thread categorys, keyed on the category name.
typedef map<string, ThreadCategory> ThreadCategoryMap;
// All thread categories, keyed on the category name.
typedef unordered_map<string, ThreadCategory> ThreadCategoryMap;

// Protects thread_categories_ and thread metrics.
Mutex lock_;
mutable rw_spinlock lock_;

// All thread categorys that ever contained a thread, even if empty
// All thread categories that ever contained a thread, even if empty.
ThreadCategoryMap thread_categories_;

// Counters to track all-time total number of threads, and the
@@ -224,12 +237,13 @@ class ThreadMgr {
uint64_t threads_running_metric_;

// Metric callback for number of threads started.
uint64_t ReadThreadsStarted();
uint64_t ReadThreadsStarted() const;

// Webpage callback; prints all threads by category.
void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
WebCallbackRegistry::PrerenderedWebResponse* resp);
void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* output);
WebCallbackRegistry::PrerenderedWebResponse* resp) const;
void PrintThreadDescriptorRow(const ThreadDescriptor& desc,
ostringstream* output) const;
};

void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
@@ -258,9 +272,7 @@ void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
}

Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
WebCallbackRegistry* web) {
MutexLock l(lock_);

WebCallbackRegistry* web) const {
// Use function gauges here so that we can register a unique copy of these metrics in
// multiple tservers, even though the ThreadMgr is itself a singleton.
metrics->NeverRetire(
@@ -292,13 +304,13 @@ Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metric
return Status::OK();
}

uint64_t ThreadMgr::ReadThreadsStarted() {
MutexLock l(lock_);
uint64_t ThreadMgr::ReadThreadsStarted() const {
shared_lock<decltype(lock_)> l(lock_);
return threads_started_metric_;
}

uint64_t ThreadMgr::ReadThreadsRunning() {
MutexLock l(lock_);
uint64_t ThreadMgr::ReadThreadsRunning() const {
shared_lock<decltype(lock_)> l(lock_);
return threads_running_metric_;
}

@@ -319,10 +331,23 @@ void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
ANNOTATE_IGNORE_SYNC_BEGIN();
ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
{
MutexLock l(lock_);
thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid);
threads_running_metric_++;
threads_started_metric_++;
// NOTE: Not using EmplaceOrDie() here -- that's because in environments
// where fork() is called after some threads have been spawned, child
// processes will inadvertently inherit the contents of the thread
// registry (i.e. the entries in the thread_categories_ container).
// For some platforms, pthread_t handles for threads in different
// processes might be the same, so using EmplaceOrDie() would induce
// a crash when ThreadMgr::AddThread() is called for a new thread
// in the child process.
//
// TODO(aserbin): maybe, keep the thread_categories_ registry not in a
// global static container, but bind the container with the life cycle
// of some top-level object that uses the ThreadMgr as a singleton.
std::lock_guard<decltype(lock_)> l(lock_);
thread_categories_[category][pthread_id] =
ThreadDescriptor(category, name, tid);
++threads_running_metric_;
++threads_started_metric_;
}
ANNOTATE_IGNORE_SYNC_END();
ANNOTATE_IGNORE_READS_AND_WRITES_END();
@@ -332,77 +357,94 @@ void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category
ANNOTATE_IGNORE_SYNC_BEGIN();
ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
{
MutexLock l(lock_);
auto category_it = thread_categories_.find(category);
DCHECK(category_it != thread_categories_.end());
category_it->second.erase(pthread_id);
threads_running_metric_--;
std::lock_guard<decltype(lock_)> l(lock_);
auto& threads = FindOrDie(thread_categories_, category);
auto num_erased = threads.erase(pthread_id);
CHECK_EQ(1, num_erased);
--threads_running_metric_;
}
ANNOTATE_IGNORE_SYNC_END();
ANNOTATE_IGNORE_READS_AND_WRITES_END();
}

void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category,
ostringstream* output) {
for (const ThreadCategory::value_type& thread : category) {
ThreadStats stats;
Status status = GetThreadStats(thread.second.thread_id(), &stats);
if (!status.ok()) {
KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
<< status.ToString();
}
(*output) << "<tr><td>" << thread.second.name() << "</td><td>"
<< (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
<< (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
<< (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
void ThreadMgr::PrintThreadDescriptorRow(const ThreadDescriptor& desc,
ostringstream* output) const {
ThreadStats stats;
Status status = GetThreadStats(desc.thread_id(), &stats);
if (!status.ok()) {
KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
<< status.ToString();
}
(*output) << "<tr><td>" << desc.name() << "</td><td>"
<< (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
<< (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
<< (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
}

void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
WebCallbackRegistry::PrerenderedWebResponse* resp) {
ostringstream* output = resp->output;
MutexLock l(lock_);
vector<const ThreadCategory*> categories_to_print;
auto category_name = req.parsed_args.find("group");
void ThreadMgr::ThreadPathHandler(
const WebCallbackRegistry::WebRequest& req,
WebCallbackRegistry::PrerenderedWebResponse* resp) const {
ostringstream& output = *(resp->output);
vector<ThreadDescriptor> descriptors_to_print;
const auto category_name = req.parsed_args.find("group");
if (category_name != req.parsed_args.end()) {
string group = EscapeForHtmlToString(category_name->second);
(*output) << "<h2>Thread Group: " << group << "</h2>" << endl;
const auto& group = category_name->second;
const auto& group_esc = EscapeForHtmlToString(group);
output << "<h2>Thread Group: " << group_esc << "</h2>";
if (group != "all") {
ThreadCategoryMap::const_iterator category = thread_categories_.find(group);
if (category == thread_categories_.end()) {
(*output) << "Thread group '" << group << "' not found" << endl;
shared_lock<decltype(lock_)> l(lock_);
const auto it = thread_categories_.find(group);
if (it == thread_categories_.end()) {
output << "Thread group '" << group_esc << "' not found";
return;
}
categories_to_print.push_back(&category->second);
(*output) << "<h3>" << category->first << " : " << category->second.size()
<< "</h3>";
for (const auto& elem : it->second) {
descriptors_to_print.push_back(elem.second);
}
output << "<h3>" << it->first << " : " << it->second.size() << "</h3>";
} else {
for (const ThreadCategoryMap::value_type& category : thread_categories_) {
categories_to_print.push_back(&category.second);
shared_lock<decltype(lock_)> l(lock_);
for (const auto& category : thread_categories_) {
for (const auto& elem : category.second) {
descriptors_to_print.push_back(elem.second);
}
}
(*output) << "<h3>All Threads : </h3>";
output << "<h3>All Threads : </h3>";
}

(*output) << "<table class='table table-hover table-border'>";
(*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
<< "<th>Cumulative Kernel CPU(s)</th>"
<< "<th>Cumulative IO-wait(s)</th></tr></thead>";
(*output) << "<tbody>\n";

for (const ThreadCategory* category : categories_to_print) {
PrintThreadCategoryRows(*category, output);
output << "<table class='table table-hover table-border'>"
"<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
"<th>Cumulative Kernel CPU(s)</th>"
"<th>Cumulative IO-wait(s)</th></tr></thead>"
"<tbody>\n";
// Sort the entries in the table by the name of a thread.
// TODO(aserbin): use "mustache + fancy table" instead.
std::sort(descriptors_to_print.begin(), descriptors_to_print.end(),
[](const ThreadDescriptor& lhs, const ThreadDescriptor& rhs) {
return lhs.name() < rhs.name();
});
for (const auto& desc : descriptors_to_print) {
PrintThreadDescriptorRow(desc, &output);
}
(*output) << "</tbody></table>";
output << "</tbody></table>";
} else {
(*output) << "<h2>Thread Groups</h2>";
(*output) << "<h4>" << threads_running_metric_ << " thread(s) running";
(*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>";

for (const ThreadCategoryMap::value_type& category : thread_categories_) {
// Using the tree map (std::map) to have the list of the thread categories
// at the '/threadz' page sorted alphabetically.
// TODO(aserbin): use "mustache + fancy table" instead.
map<string, size_t> thread_categories_info;
{
shared_lock<decltype(lock_)> l(lock_);
output << "<h2>Thread Groups</h2>"
"<h4>" << threads_running_metric_ << " thread(s) running"
"<a href='/threadz?group=all'><h3>All Threads</h3>";
for (const auto& category : thread_categories_) {
thread_categories_info.emplace(category.first, category.second.size());
}
}
for (const auto& elem : thread_categories_info) {
string category_arg;
UrlEncode(category.first, &category_arg);
(*output) << "<a href='/threadz?group=" << category_arg << "'><h3>"
<< category.first << " : " << category.second.size() << "</h3></a>";
UrlEncode(elem.first, &category_arg);
output << "<a href='/threadz?group=" << category_arg << "'><h3>"
<< elem.first << " : " << elem.second << "</h3></a>";
}
}
}
@@ -496,7 +538,7 @@ Thread::~Thread() {
}
}

std::string Thread::ToString() const {
string Thread::ToString() const {
return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_);
}

@@ -513,7 +555,7 @@ int64_t Thread::WaitForTid() const {
}


Status Thread::StartThread(const std::string& category, const std::string& name,
Status Thread::StartThread(const string& category, const string& name,
const ThreadFunctor& functor, uint64_t flags,
scoped_refptr<Thread> *holder) {
TRACE_COUNTER_INCREMENT("threads_started", 1);