C++11/14 新特性 (多线程)
		
		目录
在 C++11 之前 ,C++ 标准并没有提供统一的并发编程标准,也没有提供语言级别的支持。这导致我们在编写可移植的多线程程序时很不方便,往往需要面向不同的平台进行不同的实现,或者引入一些第三方平台,如Boost,pthread_win32 等。 从C++11开始 ,对并发编程进行了语言级别的支持,使用使用C++进行并发编程方便了很多。这里介绍C++11并发编程的相关特性。
1 线程
1.1 线程的创建
std::thread 的构造函数如下:
| 
					 1 2  | 
						template< class Function, class... Args >  explicit thread( Function&& f, Args&&... args );  | 
					
我们只需要提供线程函数或函数对象,即可以创建线程,并可以同时指定线程函数的参数。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26  | 
						#include <iostream> #include <thread> using namespace std; void func(const string& str){     cout<<str<<endl; } class FuncCls{ public:     void operator()(){         cout<<"Funcls called"<<endl;     } }; int main(void){     thread t1(func,"Hello World");     FuncCls fcls;     thread t2(fcls);     if(t1.joinable()){         t1.join();     }     t2.join();     return 0; }  | 
					
join函数将会阻塞线程,直到线程函数执行完毕,主线程才会接着执行。如果线程函数有返回值,返回值将被忽略。 在使用线程对象的过程中,我们需要注意线程对象的生命周期。如果线程对象先于线程函数结束,那么将会出现不可预料的错误。可以通过线程阻塞的方式来等待线程函数执行完(join),或让线程在后台执行。 如果不希望线程被阻塞,可以调用线程的 detach() 函数,将线程与线程对象分离。但需要注意的是,detach 之后的线程无法再使用join来进行阻塞了,即detach之后的线程,我们无法控制了。当我们不确定一个线程是否可以join时,可以先使用 thead::joinable() 来进行判断。
另外,我们还可以通过 std::bind, lambda 来创建线程(其实就是使用函数对象创建线程)。
线程不可以被复制,但是可以被移动:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17  | 
						#include <iostream> #include <thread> using namespace std; void func(const string& str){     cout<<str<<endl; } int main(void){     thread t1(func,"Hello World");     thread t2(move(t1));     //t1.join();  //ERROR libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: thread::join failed: No such process     t2.join();     return 0; }  | 
					
线程被移动之后,原来的线程对象将不再代表任何线程。
1.1.1 thread 的一般性用法
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13  | 
						void func(int nSec){     std::this_thread::sleep_for(std::chrono::seconds(nSec));     cout<<"time out"<<endl; } int main(void){     thread t1(func,3);  //阻塞休眠3s     t1.join();     cout<<std::thread::hardware_concurrency()<<endl;    //获取cpu核心数     return 0; }  | 
					
2 互斥量和锁
互斥时用来保护被多个线程同时访问的共享数据 c++11提供了4种语义的互斥量:
- std::mutex 独占的互斥量
 - std::timed_mutex 带超时的独占互斥量,不可递归使用
 - std::recursive_mutex 递归互斥量,不带超时功能
 - std::recursive_timed_mutex 带超时的递归互斥量
 
2.1 独占互斥量
一般的用法是通过 lock() 函数来阻塞线程,直到获得互斥量的所有权,然后开始执行任务。在完成任务后,使用 unlock() 来释放互斥量。lock() 和 unlock() 必须成对出现,
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33  | 
						#include <iostream> #include <thread> #include <string> using namespace std; std::mutex g_mu; void func(const string& str,int i){     g_mu.lock();     try{         cout<<i<<"  "<<str<<endl;         //do something     }     catch(...){         g_mu.unlock();     }     g_mu.unlock(); } void sharefunc(const string& str){     for(int i = 0; i<20; i++){         func(str,i);     } } int main(void){     string str1("str_1"),str2("str_2");     thread t1(sharefunc,str1),t2(sharefunc,str2);     t1.join();     t2.join();     return 0; }  | 
					
这里需要注意的是,在使用 lock() 占用互斥量后,必须使用 unlock() 来解除占用,否则互斥量会一直被占用。 我们推荐使用另一种更安全更简单的方法: lock_guard 。因为 lock_guard 在构造时会自动锁定互斥量,而在退出作用域时会自动解锁,从而避免没有更新 unlock 。
那么上面的的例子中的func可以这么写:
| 
					 1 2 3 4  | 
						void func(const string& str,int i){     std::lock_guard<std::mutex> locker(g_mu);     cout<<i<<" "<<str<<endl; }  | 
					
2.2 递归的独占互斥量
std::recursive_mutex,递归的独占互斥量,它允许同一线程多次获得该互斥锁,可以用来解决同一线程需要多次获取互斥量时的死锁问题。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37  | 
						class MyPoint { public:     MyPoint(int x,int y):         m_x(x),m_y(y){} public:     void moveX(int i){         std::lock_guard<std::mutex> locker(m_mu);         m_x += i;     }     void moveY(int i){         std::lock_guard<std::mutex> locker(m_mu);         m_y += i;     }     void move(int x,int y){         std::lock_guard<std::mutex> locker(m_mu);         moveX(x);         moveY(y);     } private:     std::mutex m_mu;     int m_x,m_y; }; void func(){     MyPoint pt(0,0);     pt.move(-1, 1); } int main(void){     std::thread t1(func);     t1.join();     return 0; }  | 
					
上例会发生死锁。因为我们在调用 move() 时已经锁住了互斥量,move() 内调用 moveX() 时又请求互斥量,但这个时候互斥量被当前线程独占,未出作用域无法释放。这样就发生了死锁。要解决这个问题,一个简单的方法是使用std::recursive_mutex,它允许同一线程多次获得互斥量。
2.3 带超时的互斥量
time_mutex 和 recursive_mutex 是带超时的互斥锁。与前面两种互斥量的区别是它们带有超时等待功能。它们多了两个超时获取锁的接口, try_lock_for 和 try_lock_until 。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  | 
						std::timed_mutex g_tmu; void func(){     while (true) {         if(g_tmu.try_lock_for(std::chrono::seconds(3))){             cout<<"get mutex"<<endl;             g_tmu.unlock();         }         else{             break;         }     } } int main(void){     std::thread t1(func);     t1.join();     return 0; }  | 
					
2.4 lock_guard 和 unique_lock
在上面的例子中,我们提到了 lock_guard ,它可以在一个作用域内为互斥量提供锁,当 lock_guard 对象析构的时候,互斥量的锁同时被解除。 unique_lock 相比 lock_gurad 更加灵活,可以减小锁的粒度。
同 lock_guard 一样, unique_lock 在构造时立即加锁。但它也可以在构造时不加锁,而等到合适的时候再加锁,这需要为构造函数 unique_lock() 提供 std::defer_lock 参数,当需要加锁的时候,再执行 unique_lock:: lock() 函数来加锁,并可以在合适的时候调用 unique_lock::unlock() 来解锁。unique_lock 可以在其作用域内重复的加解锁,而 lock_guard 不可以。
2.5 call_once 和 once_flag
std::call_once 可以保证函数在多线程环境中只被调用一次。 once_flag 是 call_once 的一个参数。 例如在日志系统中,需要确保日志只被打开一次,我们可以使用 mutex 并加锁,但这并不是线程安全的:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12  | 
						class LogFile{ public:     LogFile(){         if(!m_file.is_open()){             std::unique_lock<std::mutex> ulocker(m_mu);             m_file.open("log.txt");         }     } private:     std::ofstream m_file;     std::mutex m_mu;  //用于锁定 m_file };  | 
					
这个类并不是线程安全的。当第一个线程得到互斥时 m_mu ,并正在对文件进行 open() 操作时,第二个线程可能已经在等待互斥量了,当第一个线程执行完后,第二个线程立刻获得互斥量进行 open() 操作。那么我们进行如下的改进:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14  | 
						class LogFile{ public:     LogFile(){         {             std::unique_lock<std::mutex> ulocker(m_mu);             if(!m_file.is_open()){                 m_file.open("log.txt");             }         }     } private:     std::ofstream m_file;     std::mutex m_mu;  //用于锁定 m_file };  | 
					
这段代码确实是线程安全的了。但是有另外一个问题,每次构造LogFile的实例的时候,都要执行加锁的代码,这造成了资源的浪费。C++11提供了有效的解决方案:使用 call_once:
| 
					 1 2 3 4 5 6 7 8 9  | 
						class LogFile{ public:     LogFile(){         std::call_once(m_fopenOnce,[&](){m_file.open("log.txt");});     } private:     std::once_flag m_fopenOnce;     std::ofstream m_file; };  | 
					
这段代码, m_file 文件只会被多线程打开一次,而且更高效更简洁更方便。
3 条件变量
条件变量是c++11 提供的另种用于等待的同步机制,它能够阻塞一个或多个线程,直到收到另一个线程发出的通知或者超时才会唤醒当前阻塞的线程。C++11 两种条件就是:
- condition_variable ,配合 unique_lock 进行 wait 操作
 - condition_variable_any ,配合任何带有 lock/unlock 语义的mutex 使用,比较灵活,但效率比 condition_variable 略低。
 
条件变量经常用于不同线程间共享数据的读取。例如一个简单的生产者消息者问题:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54  | 
						#include <iostream> #include <thread> #include <queue> std::queue<int> g_que; //产品池 std::mutex g_mu; const int g_maxSize = 10; //产品池最大容量 // 生产者 void producer(){     std::unique_lock<std::mutex> locker(g_mu,std::defer_lock);     int i = 0;     while (true) {         if(g_maxSize == g_que.size()){ //当产品池满时,暂停生产并等待消费者减少库存             std::cout<<"  full and wait for consumer"<<std::endl;             std::this_thread::sleep_for(std::chrono::milliseconds(200));         }         else{             i++;             std::this_thread::sleep_for(std::chrono::milliseconds(50));             locker.lock();             g_que.push(i);             std::cout<<"  push "<<i<<std::endl;             locker.unlock();         }     } } //消费者 void consumer(){     std::unique_lock<std::mutex> locker(g_mu,std::defer_lock);     while (true) {         if(g_que.empty()){ //当无产品时,暂停消费并等待生产者生产产品             std::cout<<"   empty and wait for producer"<<std::endl;             std::this_thread::sleep_for(std::chrono::milliseconds(200));         }         else{             locker.lock();             std::cout<<g_que.front()<<"  and left size = "<<g_que.size()<<std::endl;             g_que.pop();             locker.unlock();             std::this_thread::sleep_for(std::chrono::milliseconds(200));         }     } } int main(void){     std::thread t1(producer);     std::thread t2(consumer);     t1.join();     t2.join();     return 0; }  | 
					
在这段代码中,生产者和消费者都会去检查产品池的容量:对消费者来说,如果没有产品,则等待生产者向产品池中添加产品;对生产者来说,如果产品池已满,则等待消费者清理产品池空间。在这里,我们让线程 sleep 一段时间来达到这种效果,那么问题来了:等待的时长如何确定?如果等待的时间过长,必然会造成资源的浪费,如果等待的时间过短,则需要更多次的循环才能找到生产或消费的时机。这个时候我们可以使用条件变量。 使用条件变量修改后的代码:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54  | 
						#include <iostream> #include <thread> #include <queue> #include <condition_variable> std::queue<int> g_que; //产品池 std::mutex g_mu; //保护产品池的互斥量 std::condition_variable g_notEmpty; //产品池不为空的条件 std::condition_variable g_notFull;  //产品池不满的条件 const int g_maxSize = 10; //产品池最大容量 // 生产者 void producer(){     int i = 0;     while (true) {         std::unique_lock<std::mutex> locker(g_mu);         if(g_maxSize == g_que.size()){             std::cout<<"  full and wait for consumer"<<std::endl;             g_notFull.wait(locker);  //当产品池满时,阻塞当前线程,暂停生产并等待 notify 提醒         }         i++;         std::this_thread::sleep_for(std::chrono::milliseconds(50));         g_que.push(i);         std::cout<<"  push "<<i<<std::endl;         locker.unlock();         g_notEmpty.notify_one();     } } //消费者 void consumer(){     while (true) {         std::unique_lock<std::mutex> locker(g_mu);         if(g_que.empty()){             std::cout<<"   empty and wait for producer"<<std::endl;             g_notEmpty.wait(locker); //当无产品时,阻塞线程,暂停消费并等待 生产者添加产品后发送 notify 提醒         }         std::cout<<g_que.front()<<"  and left size = "<<g_que.size()<<std::endl;         g_que.pop();         locker.unlock();         g_notFull.notify_one();         std::this_thread::sleep_for(std::chrono::milliseconds(200));     } } int main(void){     std::thread t1(producer);     std::thread t2(consumer);     t1.join();     t2.join();     return 0; }  | 
					
条件变量使用的一般流程是:
- 拥有条件变量的线程获取互斥量
 - 检查条件,如果条件不满足则 wait 阻塞,直到 notify 提醒;如果条件满足,则向下执行
 - 某个线程使(2.)的条件满足后,调用 notify_one() 或 notify_all() 唤醒一个或所有等待的线程。
 
wait 还有一个重载的方法,可以接受一个条件函数f,如果满足条件(f return true),则重新获取mutex,然后结束wait;如果不满足条件(f return false),则释放 mutex,将线程置于 waitting状态,直到收到notify,上面的生产者部分的代码还可以这么写:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22  | 
						// 生产者 void producer(){     int i = 0;     while (true) {         std::unique_lock<std::mutex> locker(g_mu);         g_notFull.wait(locker,[](){             if(g_maxSize == g_que.size()){                 std::cout<<"  full and wait for consumer"<<std::endl;                 return false;             }             else{                 return true;             }         });         i++;         std::this_thread::sleep_for(std::chrono::milliseconds(50));         g_que.push(i);         std::cout<<"  push "<<i<<std::endl;         locker.unlock();         g_notEmpty.notify_one();     } }  | 
					
需要注意,条件 wait 时,会释放 mutex.当条件 wait 结束时,会重新获取 mutex.所以需要在条件wait 之后就获取互斥量,否则可能会引发死锁问题。
4 线程异步操作
4.1 std::future
在前面几节我们知道,通过 thread.join() 不能直接获取线程函数的返回值,需要定义一个变量,通过引用传递给线程函数,再在线程函数中给这个变量赋值。这个过程比较繁琐。 C++11提供了 future 来访问异步操作的结果(因为一个异步操作的结果不能马上获取,而不需要在未来的某个时间来获取,所以叫做future),它是一种获取异步操作结果的通道。
future 有3种状态:
- Defeered. 异步操作还没有开始
 - Ready. 异步操作已经完成
 - Timeout. 异步操作超时
 
我们可以通过 future 的状态来了解线程内部的执行情况。 获取 future 的状态有3种方式:
- get. 等待异步操作结束并返回结果
 - wait. 只是等待异步操作完成,没有返回值
 - wait_for. 超时等待返回结果
 
使用 std::async() 函数可以直接创建异步任务,并将结果保存在 future 中:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | 
						int func(){     std::this_thread::sleep_for(std::chrono::seconds(3));     return  9; } int main(void){     std::future<int> fn = std::async(func);     std::future_status stat;     do{         stat = fn.wait_for(std::chrono::milliseconds(500));         switch (stat) {             case std::future_status::deferred:                 std::cout<<"deferred"<<std::endl;                 break;             case std::future_status::ready:                 std::cout<<"ready"<<std::endl;                 break;             case std::future_status::timeout:                 std::cout<<"timeout"<<std::endl;                 break;             default:                 break;         }     }while(stat != std::future_status::ready );     int n = fn.get();     std::cout<<n<<std::endl;     return 0; }  | 
					
如果我们对异步执行的过程不关心,则直接使用 future.get() 函数来等待异步任务的结果就可以了。
例如,我们需要创建一个子线程让其执行任务,接下来可以在主线程里执行其它的任务。在某个时间上,我们需要用到子线程的返回值。如果此时子线程执行完毕,则可以直接获取结果,如果此时子线程还没有执行完毕,则主线程阻塞直到子线程执行完毕。 这个过程使用 mutex 和 condition 的实现是这样的:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32  | 
						#include <iostream> #include <thread> #include <future> #include <condition_variable> std::mutex g_mu; std::condition_variable g_val; std::unique_lock<std::mutex> ulocker(g_mu,std::defer_lock); void func(int& n){     ulocker.lock();     std::this_thread::sleep_for(std::chrono::seconds(3));     n = 9;     ulocker.unlock();     g_val.notify_one(); } int main(void){     int n = 0;     std::thread t1(func,std::ref(n));     t1.detach();     //do something     std::this_thread::sleep_for(std::chrono::seconds(5));     if(n == 0)         g_val.wait(ulocker);     std::cout<<n<<std::endl;     return 0; }  | 
					
这个过程显得繁琐,时间处理不得当,还有可能引发死锁。这个时候就是 future 上场的时候了:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15  | 
						int func(){     std::this_thread::sleep_for(std::chrono::seconds(3));     return  9; } int main(void){     std::future<int> fn = std::async(func);     //do something ...     std::this_thread::sleep_for(std::chrono::seconds(5));     int n = fn.get();     std::cout<<n<<std::endl;     return 0; }  | 
					
这个例子中,主线程在开始执行子线程之后立即开始主线程里的工作,而不用关心子线程的执行过程。在某个时间点(主线程执行 5s 之后)需要用到子线程的返回结果,这个时候使用 future.get() 来获取结果。
4.2 std::promise
future 可以实现子线程向主线程传递数据。而 promise 也可以实现从一个线程向另一个线程传递数据。在一个使用 promise 包装过的 future 做参数的线程函数中,可以在一个线程里阻塞,直到另一个线程向其赋值。有没有觉得很浪漫 :-) 。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | 
						#include <iostream> #include <thread> #include <future> int func(std::future<int>& f){     //do something     std::this_thread::sleep_for(std::chrono::seconds(3));     int n = f.get(); //阻塞,直到另一个线程给赋值     n = n + 9;     return n; } int main(void){     std::promise<int> pr;     std::future<int> f = pr.get_future();     std::future<int> fn = std::async( func, std::ref(f));     //do something     pr.set_value(8); //赋值     //do something     std::this_thread::sleep_for(std::chrono::seconds(5));     int n = fn.get();     std::cout<<n<<std::endl;     return 0; }  | 
					
这个“承诺”是双向的。可以在主线程里赋值子线程里取值,也可以在子线程里赋值主线程里取值:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12  | 
						void fun(std::promise<int>& pr){     pr.set_value(9); } int main(){     std::promise<int> pr;     std::thread t1(fun,std::ref(pr));     t1.detach();     std::future<int> fu = pr.get_future();     std::cout<<fu.get()<<std::endl;     return 0; }  | 
					
4.3 std::package_task
promise 可用于在线程间传递值,而 package_task 则可以在线程间传递可调用对象(如 function, lambda, bind):
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18  | 
						#include <thread> #include <future> void func(std::packaged_task<int(int)>& pt){     pt(5); } int main(){     auto af = [](int n){return n + 9;};     std::packaged_task<int(int)> task(af);     std::thread t1(func,std::ref(task));     t1.detach();     std::future<int> fu = task.get_future();     std::cout<<fu.get()<<std::endl;     return 0; }  | 
					
此例中,通过 task 向线程函数传递一个可调用对象,之后子线程中执行该调用。在主线程中,使用task.get_future().get() 来等待子线程调用执行并返回结果,从来实现异步的可调用对象传递。
4.4 std::async
线程异步操作函数在上几节中已经介绍过了。它的原型如下:
| 
					 1  | 
						async(launch __policy, _Fp&& __f, _Args&&... __args)  | 
					
launch 有三种定义:
- async, 立即创建线程并执行。
 - deferred, 延迟加载的方式创建线程(在调用 future 的 get(),wait() 函数时才创建线程并执行)
 - any, any = async | deferred, 是 async 函数的默认参数,立即创建线程并执行。
 
async 使我们不用过多的关注异步线程的细节,是创建异步操作的首选。