Thread|Document|2013-07-13
译自:C++11 threads, locks and condition variables。看到如此好文,原谅我的情不自禁,向原作者致谢!
std::thread 类代表一个可执行的线程,在 <thread>
下。std::thread
可以和普通函数,lambdas 函数,仿函数(实现了 opertor()
的类)一起工作。此外,它允许你为你的线程函数传入任意数量的参数。
#include <thread>
void func()
{
// do some work
}
int main()
{
std::thread t(func);
t.join();
return 0;
}
t
是一个执行 func
的线程对象,join
阻塞调用线程(这里是住线程),直到该线程运行结束。线程函数的返回值将被忽略的,但是,线程函数可以传入任意数量的参数。
void func(int i, double d, const std::string& s)
{
std::cout << i << ", " << d << ", " << s << std::endl;
}
int main()
{
std::thread t(func, 1, 12.50, "sample");
t.join();
return 0;
}
虽然可以给线程函数传递任意多的参数,但是都是以值传递的方式传参的。如果需要引用传参,传递的参数必须使用 std::ref 或者std::cref 进行转换。eg:
void func(int& a)
{
a++;
}
int main()
{
int a = 42;
std::thread t(func, std::ref(a));
t.join();
std::cout << a << std::endl;
return 0;
}
参数输出为 43, 如果没有 std::ref
的转换,输出的值应为 42。
除 join
之外,线程类也提供了其它的方法:
-
swap : 把两个线程的相关操作(underlying handles)互换。
-
detach : 允许线程对象继续独立的运行。Detach 的线程不再可连接(你不用等它们了)。
int main() { std::thread t(funct); t.detach(); return 0; }
值得注意的是,如果线程函数抛出一个异常,用普通的 try-catch
块是捕捉不到异常的。换句话说,下面这样是不行的:
try
{
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
}
catch(const std::exception& ex)
{
std::cout << ex.what() << std::endl;
}
放大异常可以在线程内部捕捉,然后把它存到之后可以访问到的地方。
std::mutex g_mutex;
std::vector<std::exception_ptr> g_exceptions;
void throw_function()
{
throw std::exception("Something wrong happened");
}
void func()
{
try
{
throw_function();
}
catch(...)
{
std::lock_guard<std::mutex> lock(g_mutex);
g_exceptions.push_back(std::current_exception());
}
}
int main()
{
g_exceptions.clear();
std::thread t(func);
t.join();
for(auto & e : g_exceptions)
{
try
{
if (e != nullptr)
{
std::rethrow_exception(e);
}
}
catch(const std::exception & e)
{
std::cout << e.what() << std::endl;
}
}
return 0;
}
关于捕捉和放大异常更多资料可以阅读这里:Handling C++ exceptions thrown from worker thread in the main thread 和 How can I propagate exceptions between threads?
另外,<thread>
头文件在 std::this_thread
提供了很多有用的函数:
- get_id: 返回当前线程的 ID;
- yield: 告诉调度程序,运行其他线程(在你处于忙等待的时候非常有用);
- sleep_for: 阻塞当前线程直到指定的时段(sleep_duration);
- sleep_util: 阻塞当前线程直到指定的时间(sleep_time);
在最后一个例子中,访问 g_exceptions
向量我需要进行同步来确保在同一时间只有一个线程在进行 push 操作。因此,我使用了 mutex 。C++11 在 <mutex>
头文件中提供了四种 mutex 来做同步操作。
- mutex: 提供了核心函数 lock(), unlock() 和 非阻塞的 try_lock() 函数(判断 mutex 是否可用);
- recursive_mutex: 允许相同线程多次获得 mutex;
- timed_mutex: 和 mutex 类相似,但是它有自己的两个核心方法 try_lock_for() 和 ry_lock_until() 用来尝试在指定的时间段或者时间点获取 mutex ;
- recursive_timed_mutex: timed_mutex 和 recursive_mutex 的综合体。
下面是使用 std::mutex
的例子(注意 get_id()
和 sleep_id()
的用法):
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex g_lock;
void func()
{
g_lock.lock();
std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));
std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
g_lock.unlock();
}
int main()
{
srand((unsigned int)time(0));
std::thread t1(func);
std::thread t2(func);
std::thread t3(func);
t1.join();
t2.join();
t3.join();
return 0;
}
输出可能是这样:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424
lock()
和 unlock
方法简单明了,第一次锁住 mutex, 如果 mutex 不可用的话进行阻塞操作,之后对 mutex 进行解锁。
下面的例子展示了单个的线程安全容器(内部实际用的是 std::vector
)。这个容器有类似于 add()
的操作,添加单个元素和 addrange
添加多个多个元素(其实是多次调用 add()
)。
标注:下面的例子其实并不是真正线程安全的,有几个原因包括 va_args 的使用。并且,dump()
方法不应该属于 container。例子的目的仅仅在于讲解关于 mutex 的概念,而不是一个完整的,无错的,线程安全的容器。
template <typename T>
class container
{
std::mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
_lock.lock();
_elements.push_back(element);
_lock.unlock();
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
_lock.lock();
add(va_arg(arguments, T));
_lock.unlock();
}
va_end(arguments);
}
void dump()
{
_lock.lock();
for(auto e : _elements)
std::cout << e << std::endl;
_lock.unlock();
}
};
void func(container<int>& cont)
{
cont.addrange(3, rand(), rand(), rand());
}
int main()
{
srand((unsigned int)time(0));
container<int> cont;
std::thread t1(func, std::ref(cont));
std::thread t2(func, std::ref(cont));
std::thread t3(func, std::ref(cont));
t1.join();
t2.join();
t3.join();
cont.dump();
return 0;
}
当你运行上面程序的时候,会发现会进入死锁。原因是容器在释放 mutex 之前请求了多次。这时候你就需要使用 std::recusive_mutex
了,它允许线程请求同一个 mutex 多次。可以请求的最大次数没有指定,但是假如到达了请求上限,调用 lock 会抛出一个 std::system_error
的异常。修改上面的代码比较简单,只需要使用 std::recursive_mutex
代替 std::mutex
。
template <typename T>
class container
{
std::recursive_mutex _lock;
// ...
};
输出类似于:
6334
18467
41
6334
18467
41
6334
18467
41
机智的你可能注意到了每一次 func
每一次调用都生成了相同的数字序列。这是因为种子是局部线程的,调用 srand()
只能从主线程上初始化种子。其他的工作线程没有被初始化,所以你每次得到的种子都是一样的。
显式的锁或者解锁可能会导致一些问题,比如忘了解锁或者和请求顺序不同的解锁可能会导致死锁。标准提供了几个类和函数帮助你解决这个问题。包装类(wrapper classes)允许使用 RAII 风格(在代码块中自动加锁和解锁)把 mutexs 一致化。这些包装器有:
- lock_guard : 当对象构造它的时候尝试去请求自己的 mutex (调用 lock),当对象析构它的时候会自动释放 mutex (调用 unlock()),这是一个不能拷贝的类;
- unique_lock : 和 lock_guard不同,它是一个通用的 mutex 包装器。提供了延迟锁(defferd locking), 时间锁(time locking), 递归锁(recursive locking), 转移所有权而用条件变量。这个类也是不可拷贝的类,但是他支持 move 操作。
使用这些包装器重写上面的容器类之后是这样的:
template <typename T>
class container
{
std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
_elements.push_back(element);
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
add(va_arg(arguments, T));
}
va_end(arguments);
}
void dump()
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e : _elements)
std::cout << e << std::endl;
}
};
虽然 dump()
方法应该声明为 const , 因为它并没有修改容器的状态。但是如果你声明为 const 以后,编译器会报如下错误:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
一个 mutex(regardless which implement is used) 必须被请求和释放,实现上调用了非常量的函数 lock()
和 unlock()
。因此 lock_guard
的参数逻辑上不应该是常量。解决这个问题的方法是使用 mutable
声明 mutex 。Mutable 允许在常量方法中使用。
template <typename T>
class container
{
mutable std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void dump() const
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e : _elements)
std::cout << e << std::endl;
}
};
这些 wrapper guards 的构造函数已经负载(overloads)参数指示锁的策略。可用的策略有:
defer_lock_t
类型的defer_lock
: 不请求 mutex ;try_to_lock_t
类型的try_to_lock
: 试图请求 mutex ,不阻塞 ;adopt_lock_t
类型的adopt_lock
: 用 mutex 唤醒调用线程 ;
这些策略的声明像这样:
struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了这些 mutex 的包装器之外,标准也提供了几个对一个或者多个 mutex 加锁的方法:
- lock: 使用避免死锁算法对 mutexes 加锁(通过调用
locks()
,try_locks
和unlock()
)。 - try_lock: 按照指定的 mutexes 顺序调用
try_lock()
尝试调用 mutex 。
下面是一个死锁的例子:我们有一个元素容器并且有一个从一个容器和另外一个容器交换的方法 exchange
。 为了达到线程安全,在两个容器中同步存取,请求不同容器的 mutex 。
template <typename T>
class container
{
public:
std::mutex _lock;
std::set<T> _elements;
void add(T element)
{
_elements.insert(element);
}
void remove(T element)
{
_elements.erase(element);
}
};
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
cont1._lock.lock();
std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock
cont2._lock.lock();
cont1.remove(value);
cont2.add(value);
cont1._lock.unlock();
cont2._lock.unlock();
}
假定这个函数被不同的线程访问,从 容器1 中移除一个元素添加到 容器2 中;然后把移除 容器2 中的元素添加到 容器1 中。这样会导致死锁(如果线程上下文仅仅在第一次请求的时候从一个线程到另外一个线程切换)。
int main()
{
srand((unsigned int)time(NULL));
container<int> cont1;
cont1.add(1);
cont1.add(2);
cont1.add(3);
container<int> cont2;
cont2.add(4);
cont2.add(5);
cont2.add(6);
std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);
std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);
t1.join();
t2.join();
return 0;
}
为了修正这个问题,你可以使用 std::lock
保证在 deadlock-free 的方式下请求 mutex :
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
std::lock(cont1._lock, cont2._lock);
cont1.remove(value);
cont2.add(value);
cont1._lock.unlock();
cont2._lock.unlock();
}