C++ 互斥锁和条件变量的性能比较
介绍
本文以最简单生产者消费者模型,通过运行程序,观察该进程的cpu使用率,来对比使用互斥锁 和 互斥锁+条件变量的性能比较。
本例子的生产者消费者模型,1个生产者,5个消费者。
生产者线程往队列里放入数据,5个消费者线程从队列取数据,取数据前需要判断一下队列中是否有数据,这个队列是全局队列,是线程间共享的数据,所以需要使用互斥锁进行保护。即生产者在往队列里放入数据时,其余消费者不能取,反之亦然。
互斥锁实现的代码
#include <iostream> // std::cout#include <deque> // std::deque#include <thread> // std::thread#include <chrono> // std::chrono#include <mutex> // std::mutex// 全局队列std::deque<int> g_deque;// 全局锁std::mutex g_mutex;// 生产者运行标记bool producer_is_running = true;// 生产者线程函数void Producer(){ // 库存个数 int count = 8; do { // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁 // 可以手动解锁,从而控制互斥锁的细粒度 std::unique_lock<std::mutex> locker( g_mutex ); // 入队一个数据 g_deque.push_front( count ); // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护 locker.unlock(); std::cout << "生产者 :我现在库存有 :" << count << std::endl; // 放慢生产者生产速度,睡1秒 std::this_thread::sleep_for( std::chrono::seconds( 1 ) ); // 库存自减少 count--; } while( count > 0 ); // 标记生产者打样了 producer_is_running = false; std::cout << "生产者 : 我的库存没有了,我要打样了!" << std::endl;}// 消费者线程函数void Consumer(int id){ int data = 0; do { std::unique_lock<std::mutex> locker( g_mutex ); if( !g_deque.empty() ) { data = g_deque.back(); g_deque.pop_back(); locker.unlock(); std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl; } else { locker.unlock(); } } while( producer_is_running ); std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!" << std::endl;}int main(void){ std::cout << "1 producer start ..." << std::endl; std::thread producer( Producer ); std::cout << "5 consumer start ..." << std::endl; std::thread consumer[ 5 ]; for(int i = 0; i < 5; i++) { consumer[i] = std::thread(Consumer, i + 1); } producer.join(); for(int i = 0; i < 5; i++) { consumer[i].join(); } std::cout << "All threads joined." << std::endl; return 0;}
互斥锁实现运行结果:
结果输出
[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o main[root@lincoding condition]# ./main1 producer start ...5 consumer start ...生产者 :我现在库存有 :8消费者[1] : 我抢到货的编号是 :8消费者[1] : 我抢到货的编号是 :7生产者 :我现在库存有 :7生产者 :我现在库存有 :6消费者[3] : 我抢到货的编号是 :6生产者 :我现在库存有 :5消费者[1] : 我抢到货的编号是 :5生产者 :我现在库存有 :4消费者[2] : 我抢到货的编号是 :4生产者 :我现在库存有 :3消费者[5] : 我抢到货的编号是 :3生产者 :我现在库存有 :2消费者[2] : 我抢到货的编号是 :2生产者 :我现在库存有 :1消费者[1] : 我抢到货的编号是 :1生产者 : 我的库存没有了,我要打样了!消费者[5] :卖家没有货打样了,真可惜,下次再来抢!消费者[2] :卖家没有货打样了,真可惜,下次再来抢!消费者[3] :卖家没有货打样了,真可惜,下次再来抢!消费者[4] :卖家没有货打样了,真可惜,下次再来抢!消费者[1] :卖家没有货打样了,真可惜,下次再来抢!All threads joined.
可以看到,互斥锁其实可以完成这个任务,但是却存在着性能问题。
Producer
是生产者线程,在生产者数据过程中,会休息1秒
,所以这个生产过程是很慢的;
Consumer
是消费者线程,存在着一个while
循环,只有判断到生产者不运行了,才会退出while
循环,那么每次在循环体内,都是会先加锁,判断队列不空,然后从列队取出一个数据,最后解锁。所以说,在生产者休息1秒
的时候,消费者线程实际上会做很多无用功,导致CPU使用率非常高!
运行的环境是4核cpu
[root@lincoding ~]# grep 'model name' /proc/cpuinfo | wc -l4
top命令查看cpu使用情况,可见使用纯互斥锁cpu的开销是很大的,main
进程的cpu使用率达到了357.5%CPU
,系统开销的cpu为54.5%sy
,用户开销的cpu为18.2%us
[root@lincoding ~]# toptop - 19:13:41 up 36 min, 3 users, load average: 0.06, 0.05, 0.01Tasks: 179 total, 1 running, 178 sleeping, 0 stopped, 0 zombieCpu(s): 18.2%us, 54.5%sy, 0.0%ni, 27.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%stMem: 1004412k total, 313492k used, 690920k free, 41424k buffersSwap: 2031608k total, 0k used, 2031608k free, 79968k cached PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 35346 root 20 0 137m 3288 1024 S 357.5 0.3 0:05.92 main 1 root 20 0 19232 1492 1224 S 0.0 0.1 0:02.16 init 2 root 20 0 0 0 0 S 0.0 0.0 0:00.01 kthreadd 3 root RT 0 0 0 0 S 0.0 0.0 0:00.68 migration/0
解决的办法之一就是给消费者也加一个小延时,当消费者没取到数据时,就休息一下500毫秒
,这样可以减少互斥锁给cpu带来的开销。
// 消费者线程函数void Consumer(int id){ int data = 0; do { std::unique_lock<std::mutex> locker( g_mutex ); if( !g_deque.empty() ) { data = g_deque.back(); g_deque.pop_back(); locker.unlock(); std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl; } else { locker.unlock(); // 当消费者没取到数据时,就休息一下500毫秒 std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) ); } } while( producer_is_running ); std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!" << std::endl;}
从运行结果可知,cpu使用率大大降低了
[root@lincoding ~]# ps aux | grep -v grep |grep mainUSER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMANDroot 61296 0.0 0.1 141068 1244 pts/1 Sl+ 19:40 0:00 ./main
条件变量+互斥锁实现的代码
那么问题来了,如何确定消费者延时(休息)多久呢?
如果生产者生产的非常快,消费者却延时了
500毫秒
,也不是很好如果生产者生产的更慢,那么消费延时
500毫秒
,也会有无用功,占用了CPU
这就需要引入条件变量std::condition_variable
,应用于消费者生产模型中,就是生产者生产完一个数据后,通过notify_one()
唤醒正在wait()
消费者线程,使得消费者从队列取出一个数据。
#include <iostream> // std::cout#include <deque> // std::deque#include <thread> // std::thread#include <chrono> // std::chrono#include <mutex> // std::mutex#include <condition_variable> // std::condition_variable// 全局队列std::deque<int> g_deque;// 全局锁std::mutex g_mutex;// 全局条件变量std::condition_variable g_cond;// 生产者运行标记bool producer_is_running = true;// 生产者线程函数void Producer(){ // 库存个数 int count = 8; do { // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁 // 可以手动解锁,从而控制互斥锁的细粒度 std::unique_lock<std::mutex> locker( g_mutex ); // 入队一个数据 g_deque.push_front( count ); // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护 locker.unlock(); std::cout << "生产者 :我现在库存有 :" << count << std::endl; // 唤醒一个线程 g_cond.notify_one(); // 睡1秒 std::this_thread::sleep_for( std::chrono::seconds( 1 ) ); // 库存自减少 count--; } while( count > 0 ); // 标记生产者打样了 producer_is_running = false; // 唤醒所有消费线程 g_cond.notify_all(); std::cout << "生产者 : 我的库存没有了,我要打样了!" << std::endl;}// 消费者线程函数void Consumer(int id){ // 购买的货品编号 int data = 0; do { // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁 // 可以手动解锁,从而控制互斥锁的细粒度 std::unique_lock<std::mutex> locker( g_mutex ); // wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作 // 必须使用unique_lock,不能使用lock_guard,因为lock_guard没有lock和unlock接口,而unique_lock则都提供了 g_cond.wait(locker); // 队列不为空 if( !g_deque.empty() ) { // 取出队列里最后一个数据 data = g_deque.back(); // 删除队列里最后一个数据 g_deque.pop_back(); // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护 locker.unlock(); std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl; } // 队列为空 else { locker.unlock(); } } while( producer_is_running ); std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!" << std::endl;}int main(void){ std::cout << "1 producer start ..." << std::endl; std::thread producer( Producer ); std::cout << "5 consumer start ..." << std::endl; std::thread consumer[ 5 ]; for(int i = 0; i < 5; i++) { consumer[i] = std::thread(Consumer, i + 1); } producer.join(); for(int i = 0; i < 5; i++) { consumer[i].join(); } std::cout << "All threads joined." << std::endl; return 0;}
条件变量+互斥锁运行结果
[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o main[root@lincoding condition]# [root@lincoding condition]# ./main 1 producer start ...5 consumer start ...生产者 :我现在库存有 :8消费者[4] : 我抢到货的编号是 :8生产者 :我现在库存有 :7消费者[2] : 我抢到货的编号是 :7生产者 :我现在库存有 :6消费者[3] : 我抢到货的编号是 :6生产者 :我现在库存有 :5消费者[5] : 我抢到货的编号是 :5生产者 :我现在库存有 :4消费者[1] : 我抢到货的编号是 :4生产者 :我现在库存有 :3消费者[4] : 我抢到货的编号是 :3生产者 :我现在库存有 :2消费者[2] : 我抢到货的编号是 :2生产者 :我现在库存有 :1消费者[3] : 我抢到货的编号是 :1生产者 : 我的库存没有了,我要打样了!消费者[5] :卖家没有货打样了,真可惜,下次再来抢!消费者[1] :卖家没有货打样了,真可惜,下次再来抢!消费者[4] :卖家没有货打样了,真可惜,下次再来抢!消费者[2] :卖家没有货打样了,真可惜,下次再来抢!消费者[3] :卖家没有货打样了,真可惜,下次再来抢!All threads joined.
CPU开销非常的小
[root@lincoding ~]# ps aux | grep -v grep |grep mainUSER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMANDroot 73838 0.0 0.1 141068 1256 pts/1 Sl+ 19:54 0:00 ./main
总结
在不确定生产者的生产速度是快还是慢的场景里,不能只使用互斥锁保护共享的数据,这样会对CPU的性能开销非常大,可以使用互斥锁+条件变量的方式,当生产者线程生产了一个数据,就唤醒消费者线程进行消费,避免一些无用功的性能开销。
版权声明: 本文为 InfoQ 作者【小林coding】的原创文章。
原文链接:【http://xie.infoq.cn/article/1666f0ff713bc9eac4edc520e】。文章转载请联系作者。
小林coding
别问,问就是图解 2018.11.12 加入
公众号:小林coding,爱图解的小林
评论