C++11/14 新特性 (多线程)
目录
在 C++11 之前 ,C++ 标准并没有提供统一的并发编程标准,也没有提供语言级别的支持。这导致我们在编写可移植的多线程程序时很不方便,往往需要面向不同的平台进行不同的实现,或者引入一些第三方平台,如Boost,pthread_win32 等。 从C++11开始 ,对并发编程进行了语言级别的支持,使用使用C++进行并发编程方便了很多。这里介绍C++11并发编程的相关特性。
1 线程
1.1 线程的创建
std::thread 的构造函数如下:
1 2 |
template< class Function, class... Args > explicit thread( Function&& f, Args&&... args ); |
我们只需要提供线程函数或函数对象,即可以创建线程,并可以同时指定线程函数的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
#include <iostream> #include <thread> using namespace std; void func(const string& str){ cout<<str<<endl; } class FuncCls{ public: void operator()(){ cout<<"Funcls called"<<endl; } }; int main(void){ thread t1(func,"Hello World"); FuncCls fcls; thread t2(fcls); if(t1.joinable()){ t1.join(); } t2.join(); return 0; } |
join函数将会阻塞线程,直到线程函数执行完毕,主线程才会接着执行。如果线程函数有返回值,返回值将被忽略。 在使用线程对象的过程中,我们需要注意线程对象的生命周期。如果线程对象先于线程函数结束,那么将会出现不可预料的错误。可以通过线程阻塞的方式来等待线程函数执行完(join),或让线程在后台执行。 如果不希望线程被阻塞,可以调用线程的 detach() 函数,将线程与线程对象分离。但需要注意的是,detach 之后的线程无法再使用join来进行阻塞了,即detach之后的线程,我们无法控制了。当我们不确定一个线程是否可以join时,可以先使用 thead::joinable() 来进行判断。
另外,我们还可以通过 std::bind, lambda 来创建线程(其实就是使用函数对象创建线程)。
线程不可以被复制,但是可以被移动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#include <iostream> #include <thread> using namespace std; void func(const string& str){ cout<<str<<endl; } int main(void){ thread t1(func,"Hello World"); thread t2(move(t1)); //t1.join(); //ERROR libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: thread::join failed: No such process t2.join(); return 0; } |
线程被移动之后,原来的线程对象将不再代表任何线程。
1.1.1 thread 的一般性用法
1 2 3 4 5 6 7 8 9 10 11 12 13 |
void func(int nSec){ std::this_thread::sleep_for(std::chrono::seconds(nSec)); cout<<"time out"<<endl; } int main(void){ thread t1(func,3); //阻塞休眠3s t1.join(); cout<<std::thread::hardware_concurrency()<<endl; //获取cpu核心数 return 0; } |
2 互斥量和锁
互斥时用来保护被多个线程同时访问的共享数据 c++11提供了4种语义的互斥量:
- std::mutex 独占的互斥量
- std::timed_mutex 带超时的独占互斥量,不可递归使用
- std::recursive_mutex 递归互斥量,不带超时功能
- std::recursive_timed_mutex 带超时的递归互斥量
2.1 独占互斥量
一般的用法是通过 lock() 函数来阻塞线程,直到获得互斥量的所有权,然后开始执行任务。在完成任务后,使用 unlock() 来释放互斥量。lock() 和 unlock() 必须成对出现,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#include <iostream> #include <thread> #include <string> using namespace std; std::mutex g_mu; void func(const string& str,int i){ g_mu.lock(); try{ cout<<i<<" "<<str<<endl; //do something } catch(...){ g_mu.unlock(); } g_mu.unlock(); } void sharefunc(const string& str){ for(int i = 0; i<20; i++){ func(str,i); } } int main(void){ string str1("str_1"),str2("str_2"); thread t1(sharefunc,str1),t2(sharefunc,str2); t1.join(); t2.join(); return 0; } |
这里需要注意的是,在使用 lock() 占用互斥量后,必须使用 unlock() 来解除占用,否则互斥量会一直被占用。 我们推荐使用另一种更安全更简单的方法: lock_guard 。因为 lock_guard 在构造时会自动锁定互斥量,而在退出作用域时会自动解锁,从而避免没有更新 unlock 。
那么上面的的例子中的func可以这么写:
1 2 3 4 |
void func(const string& str,int i){ std::lock_guard<std::mutex> locker(g_mu); cout<<i<<" "<<str<<endl; } |
2.2 递归的独占互斥量
std::recursive_mutex,递归的独占互斥量,它允许同一线程多次获得该互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
class MyPoint { public: MyPoint(int x,int y): m_x(x),m_y(y){} public: void moveX(int i){ std::lock_guard<std::mutex> locker(m_mu); m_x += i; } void moveY(int i){ std::lock_guard<std::mutex> locker(m_mu); m_y += i; } void move(int x,int y){ std::lock_guard<std::mutex> locker(m_mu); moveX(x); moveY(y); } private: std::mutex m_mu; int m_x,m_y; }; void func(){ MyPoint pt(0,0); pt.move(-1, 1); } int main(void){ std::thread t1(func); t1.join(); return 0; } |
上例会发生死锁。因为我们在调用 move() 时已经锁住了互斥量,move() 内调用 moveX() 时又请求互斥量,但这个时候互斥量被当前线程独占,未出作用域无法释放。这样就发生了死锁。要解决这个问题,一个简单的方法是使用std::recursive_mutex,它允许同一线程多次获得互斥量。
2.3 带超时的互斥量
time_mutex 和 recursive_mutex 是带超时的互斥锁。与前面两种互斥量的区别是它们带有超时等待功能。它们多了两个超时获取锁的接口, try_lock_for 和 try_lock_until 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
std::timed_mutex g_tmu; void func(){ while (true) { if(g_tmu.try_lock_for(std::chrono::seconds(3))){ cout<<"get mutex"<<endl; g_tmu.unlock(); } else{ break; } } } int main(void){ std::thread t1(func); t1.join(); return 0; } |
2.4 lock_guard 和 unique_lock
在上面的例子中,我们提到了 lock_guard ,它可以在一个作用域内为互斥量提供锁,当 lock_guard 对象析构的时候,互斥量的锁同时被解除。 unique_lock 相比 lock_gurad 更加灵活,可以减小锁的粒度。
同 lock_guard 一样, unique_lock 在构造时立即加锁。但它也可以在构造时不加锁,而等到合适的时候再加锁,这需要为构造函数 unique_lock() 提供 std::defer_lock 参数,当需要加锁的时候,再执行 unique_lock:: lock() 函数来加锁,并可以在合适的时候调用 unique_lock::unlock() 来解锁。unique_lock 可以在其作用域内重复的加解锁,而 lock_guard 不可以。
2.5 call_once 和 once_flag
std::call_once 可以保证函数在多线程环境中只被调用一次。 once_flag 是 call_once 的一个参数。 例如在日志系统中,需要确保日志只被打开一次,我们可以使用 mutex 并加锁,但这并不是线程安全的:
1 2 3 4 5 6 7 8 9 10 11 12 |
class LogFile{ public: LogFile(){ if(!m_file.is_open()){ std::unique_lock<std::mutex> ulocker(m_mu); m_file.open("log.txt"); } } private: std::ofstream m_file; std::mutex m_mu; //用于锁定 m_file }; |
这个类并不是线程安全的。当第一个线程得到互斥时 m_mu ,并正在对文件进行 open() 操作时,第二个线程可能已经在等待互斥量了,当第一个线程执行完后,第二个线程立刻获得互斥量进行 open() 操作。那么我们进行如下的改进:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class LogFile{ public: LogFile(){ { std::unique_lock<std::mutex> ulocker(m_mu); if(!m_file.is_open()){ m_file.open("log.txt"); } } } private: std::ofstream m_file; std::mutex m_mu; //用于锁定 m_file }; |
这段代码确实是线程安全的了。但是有另外一个问题,每次构造LogFile的实例的时候,都要执行加锁的代码,这造成了资源的浪费。C++11提供了有效的解决方案:使用 call_once:
1 2 3 4 5 6 7 8 9 |
class LogFile{ public: LogFile(){ std::call_once(m_fopenOnce,[&](){m_file.open("log.txt");}); } private: std::once_flag m_fopenOnce; std::ofstream m_file; }; |
这段代码, m_file 文件只会被多线程打开一次,而且更高效更简洁更方便。
3 条件变量
条件变量是c++11 提供的另种用于等待的同步机制,它能够阻塞一个或多个线程,直到收到另一个线程发出的通知或者超时才会唤醒当前阻塞的线程。C++11 两种条件就是:
- condition_variable ,配合 unique_lock 进行 wait 操作
- condition_variable_any ,配合任何带有 lock/unlock 语义的mutex 使用,比较灵活,但效率比 condition_variable 略低。
条件变量经常用于不同线程间共享数据的读取。例如一个简单的生产者消息者问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#include <iostream> #include <thread> #include <queue> std::queue<int> g_que; //产品池 std::mutex g_mu; const int g_maxSize = 10; //产品池最大容量 // 生产者 void producer(){ std::unique_lock<std::mutex> locker(g_mu,std::defer_lock); int i = 0; while (true) { if(g_maxSize == g_que.size()){ //当产品池满时,暂停生产并等待消费者减少库存 std::cout<<" full and wait for consumer"<<std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(200)); } else{ i++; std::this_thread::sleep_for(std::chrono::milliseconds(50)); locker.lock(); g_que.push(i); std::cout<<" push "<<i<<std::endl; locker.unlock(); } } } //消费者 void consumer(){ std::unique_lock<std::mutex> locker(g_mu,std::defer_lock); while (true) { if(g_que.empty()){ //当无产品时,暂停消费并等待生产者生产产品 std::cout<<" empty and wait for producer"<<std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(200)); } else{ locker.lock(); std::cout<<g_que.front()<<" and left size = "<<g_que.size()<<std::endl; g_que.pop(); locker.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } } int main(void){ std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; } |
在这段代码中,生产者和消费者都会去检查产品池的容量:对消费者来说,如果没有产品,则等待生产者向产品池中添加产品;对生产者来说,如果产品池已满,则等待消费者清理产品池空间。在这里,我们让线程 sleep 一段时间来达到这种效果,那么问题来了:等待的时长如何确定?如果等待的时间过长,必然会造成资源的浪费,如果等待的时间过短,则需要更多次的循环才能找到生产或消费的时机。这个时候我们可以使用条件变量。 使用条件变量修改后的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#include <iostream> #include <thread> #include <queue> #include <condition_variable> std::queue<int> g_que; //产品池 std::mutex g_mu; //保护产品池的互斥量 std::condition_variable g_notEmpty; //产品池不为空的条件 std::condition_variable g_notFull; //产品池不满的条件 const int g_maxSize = 10; //产品池最大容量 // 生产者 void producer(){ int i = 0; while (true) { std::unique_lock<std::mutex> locker(g_mu); if(g_maxSize == g_que.size()){ std::cout<<" full and wait for consumer"<<std::endl; g_notFull.wait(locker); //当产品池满时,阻塞当前线程,暂停生产并等待 notify 提醒 } i++; std::this_thread::sleep_for(std::chrono::milliseconds(50)); g_que.push(i); std::cout<<" push "<<i<<std::endl; locker.unlock(); g_notEmpty.notify_one(); } } //消费者 void consumer(){ while (true) { std::unique_lock<std::mutex> locker(g_mu); if(g_que.empty()){ std::cout<<" empty and wait for producer"<<std::endl; g_notEmpty.wait(locker); //当无产品时,阻塞线程,暂停消费并等待 生产者添加产品后发送 notify 提醒 } std::cout<<g_que.front()<<" and left size = "<<g_que.size()<<std::endl; g_que.pop(); locker.unlock(); g_notFull.notify_one(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } int main(void){ std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; } |
条件变量使用的一般流程是:
- 拥有条件变量的线程获取互斥量
- 检查条件,如果条件不满足则 wait 阻塞,直到 notify 提醒;如果条件满足,则向下执行
- 某个线程使(2.)的条件满足后,调用 notify_one() 或 notify_all() 唤醒一个或所有等待的线程。
wait 还有一个重载的方法,可以接受一个条件函数f,如果满足条件(f return true),则重新获取mutex,然后结束wait;如果不满足条件(f return false),则释放 mutex,将线程置于 waitting状态,直到收到notify,上面的生产者部分的代码还可以这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 生产者 void producer(){ int i = 0; while (true) { std::unique_lock<std::mutex> locker(g_mu); g_notFull.wait(locker,[](){ if(g_maxSize == g_que.size()){ std::cout<<" full and wait for consumer"<<std::endl; return false; } else{ return true; } }); i++; std::this_thread::sleep_for(std::chrono::milliseconds(50)); g_que.push(i); std::cout<<" push "<<i<<std::endl; locker.unlock(); g_notEmpty.notify_one(); } } |
需要注意,条件 wait 时,会释放 mutex.当条件 wait 结束时,会重新获取 mutex.所以需要在条件wait 之后就获取互斥量,否则可能会引发死锁问题。
4 线程异步操作
4.1 std::future
在前面几节我们知道,通过 thread.join() 不能直接获取线程函数的返回值,需要定义一个变量,通过引用传递给线程函数,再在线程函数中给这个变量赋值。这个过程比较繁琐。 C++11提供了 future 来访问异步操作的结果(因为一个异步操作的结果不能马上获取,而不需要在未来的某个时间来获取,所以叫做future),它是一种获取异步操作结果的通道。
future 有3种状态:
- Defeered. 异步操作还没有开始
- Ready. 异步操作已经完成
- Timeout. 异步操作超时
我们可以通过 future 的状态来了解线程内部的执行情况。 获取 future 的状态有3种方式:
- get. 等待异步操作结束并返回结果
- wait. 只是等待异步操作完成,没有返回值
- wait_for. 超时等待返回结果
使用 std::async() 函数可以直接创建异步任务,并将结果保存在 future 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
int func(){ std::this_thread::sleep_for(std::chrono::seconds(3)); return 9; } int main(void){ std::future<int> fn = std::async(func); std::future_status stat; do{ stat = fn.wait_for(std::chrono::milliseconds(500)); switch (stat) { case std::future_status::deferred: std::cout<<"deferred"<<std::endl; break; case std::future_status::ready: std::cout<<"ready"<<std::endl; break; case std::future_status::timeout: std::cout<<"timeout"<<std::endl; break; default: break; } }while(stat != std::future_status::ready ); int n = fn.get(); std::cout<<n<<std::endl; return 0; } |
如果我们对异步执行的过程不关心,则直接使用 future.get() 函数来等待异步任务的结果就可以了。
例如,我们需要创建一个子线程让其执行任务,接下来可以在主线程里执行其它的任务。在某个时间上,我们需要用到子线程的返回值。如果此时子线程执行完毕,则可以直接获取结果,如果此时子线程还没有执行完毕,则主线程阻塞直到子线程执行完毕。 这个过程使用 mutex 和 condition 的实现是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
#include <iostream> #include <thread> #include <future> #include <condition_variable> std::mutex g_mu; std::condition_variable g_val; std::unique_lock<std::mutex> ulocker(g_mu,std::defer_lock); void func(int& n){ ulocker.lock(); std::this_thread::sleep_for(std::chrono::seconds(3)); n = 9; ulocker.unlock(); g_val.notify_one(); } int main(void){ int n = 0; std::thread t1(func,std::ref(n)); t1.detach(); //do something std::this_thread::sleep_for(std::chrono::seconds(5)); if(n == 0) g_val.wait(ulocker); std::cout<<n<<std::endl; return 0; } |
这个过程显得繁琐,时间处理不得当,还有可能引发死锁。这个时候就是 future 上场的时候了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
int func(){ std::this_thread::sleep_for(std::chrono::seconds(3)); return 9; } int main(void){ std::future<int> fn = std::async(func); //do something ... std::this_thread::sleep_for(std::chrono::seconds(5)); int n = fn.get(); std::cout<<n<<std::endl; return 0; } |
这个例子中,主线程在开始执行子线程之后立即开始主线程里的工作,而不用关心子线程的执行过程。在某个时间点(主线程执行 5s 之后)需要用到子线程的返回结果,这个时候使用 future.get() 来获取结果。
4.2 std::promise
future 可以实现子线程向主线程传递数据。而 promise 也可以实现从一个线程向另一个线程传递数据。在一个使用 promise 包装过的 future 做参数的线程函数中,可以在一个线程里阻塞,直到另一个线程向其赋值。有没有觉得很浪漫 :-) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
#include <iostream> #include <thread> #include <future> int func(std::future<int>& f){ //do something std::this_thread::sleep_for(std::chrono::seconds(3)); int n = f.get(); //阻塞,直到另一个线程给赋值 n = n + 9; return n; } int main(void){ std::promise<int> pr; std::future<int> f = pr.get_future(); std::future<int> fn = std::async( func, std::ref(f)); //do something pr.set_value(8); //赋值 //do something std::this_thread::sleep_for(std::chrono::seconds(5)); int n = fn.get(); std::cout<<n<<std::endl; return 0; } |
这个“承诺”是双向的。可以在主线程里赋值子线程里取值,也可以在子线程里赋值主线程里取值:
1 2 3 4 5 6 7 8 9 10 11 12 |
void fun(std::promise<int>& pr){ pr.set_value(9); } int main(){ std::promise<int> pr; std::thread t1(fun,std::ref(pr)); t1.detach(); std::future<int> fu = pr.get_future(); std::cout<<fu.get()<<std::endl; return 0; } |
4.3 std::package_task
promise 可用于在线程间传递值,而 package_task 则可以在线程间传递可调用对象(如 function, lambda, bind):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
#include <thread> #include <future> void func(std::packaged_task<int(int)>& pt){ pt(5); } int main(){ auto af = [](int n){return n + 9;}; std::packaged_task<int(int)> task(af); std::thread t1(func,std::ref(task)); t1.detach(); std::future<int> fu = task.get_future(); std::cout<<fu.get()<<std::endl; return 0; } |
此例中,通过 task 向线程函数传递一个可调用对象,之后子线程中执行该调用。在主线程中,使用task.get_future().get() 来等待子线程调用执行并返回结果,从来实现异步的可调用对象传递。
4.4 std::async
线程异步操作函数在上几节中已经介绍过了。它的原型如下:
1 |
async(launch __policy, _Fp&& __f, _Args&&... __args) |
launch 有三种定义:
- async, 立即创建线程并执行。
- deferred, 延迟加载的方式创建线程(在调用 future 的 get(),wait() 函数时才创建线程并执行)
- any, any = async | deferred, 是 async 函数的默认参数,立即创建线程并执行。
async 使我们不用过多的关注异步线程的细节,是创建异步操作的首选。