无锁编程实战演练(一)

2014-11-24 12:55:01 · 作者: · 浏览: 5

前段时间研究过一阵子无锁化编程。刚写了几个简单的程序,来验证了下自己学到的一些概念。

测试场景:假设有一个应用:现在有一个全局变量,用来计数,再创建10个线程并发执行,每个线程中循环对这个全局变量进行++操作(i++),循环加2000000次。

所以很容易知道,这必然会涉及到并发互斥操作。下面通过三种方式来实现这种并发操作。并对比出其在效率上的不同之处。

这里先贴上代码,共5个文件:2个用于做时间统计的文件:timer.h timer.cpp。这两个文件是临时封装的,只用来计时,可以不必细看。

timer.h

#ifndef TIMER_H
#define TIMER_H

#include 
  
   
class Timer
{
public:	
	Timer();
	// 开始计时时间
	void Start();
	// 终止计时时间
	void Stop();
	// 重新设定
	void Reset();
	// 耗时时间
	void Cost_time();
private:
	struct timeva l t1;
	struct timeva l t2;
	bool b1,b2;
};
#endif
  


timer.cpp

#include "timer.h"
#include 
  
   

Timer::Timer()
{
	b1 = false;
	b2 = false;
}
void Timer::Start()
{
	gettimeofday(&t1,NULL);  
	b1 = true;
	b2 = false;
}

void Timer::Stop()
{	
	if (b1 == true)
	{
		gettimeofday(&t2,NULL);  
		b2 = true;
	}
}

void Timer::Reset()
{	
	b1 = false;
	b2 = false;
}

void Timer::Cost_time()
{
	if (b1 == false)
	{
		printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");
		return ;
	}
	else if (b2 == false)
	{
		printf("计时出错,应该执行完Stop(),再来执行Cost_time()");
		return ;
	}
	else
	{
		int usec,sec;
		bool borrow = false;
		if (t2.tv_usec > t1.tv_usec)
		{
			usec = t2.tv_usec - t1.tv_usec;
		}
		else
		{
			borrow = true;
			usec = t2.tv_usec+1000000 - t1.tv_usec;
		}

		if (borrow)
		{
			sec = t2.tv_sec-1 - t1.tv_sec;
		}
		else
		{
			sec = t2.tv_sec - t1.tv_sec;
		}
		printf("花费时间:%d秒 %d微秒\n",sec,usec);
	}
}


  


传统互斥量加锁方式 lock.cpp

#include 
  
   
#include 
   
     #include 
    
      #include 
     
       #include "timer.h" pthread_mutex_t mutex_lock; static volatile int count = 0; void *test_func(void *arg) { int i = 0; for(i = 0; i < 2000000; i++) { pthread_mutex_lock(&mutex_lock); count++; pthread_mutex_unlock(&mutex_lock); } return NULL; } int main(int argc, const char *argv[]) { Timer timer; // 为了计时,临时封装的一个类Timer。 timer.Start(); // 计时开始 pthread_mutex_init(&mutex_lock, NULL); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_join(thread_ids[i], NULL); } timer.Stop();// 计时结束 timer.Cost_time();// 打印花费时间 printf("结果:count = %d\n",count); return 0; } 
     
    
   
  


no lock 不加锁的形式 nolock.cpp

#include 
  
   
#include 
   
     #include 
    
      #include 
     
       #include 
      
        #include "timer.h" int mutex = 0; int lock = 0; int unlock = 1; static volatile int count = 0; void *test_func(void *arg) { int i = 0; for(i = 0; i < 2000000; i++) { while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000); count++; __sync_bool_compare_and_swap (&mutex, unlock, 0); } return NULL; } int main(int argc, const char *argv[]) { Timer timer; timer.Start(); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_join(thread_ids[i], NULL); } timer.Stop(); timer.Cost_time(); printf("结果:count = %d\n",count); return 0; } 
      
     
    
   
  


原子函数进行统计方式 atomic.cpp

#include 
  
   
#include 
   
     #include 
    
      #include 
     
       #include 
      
        #include "timer.h" static volatile int count = 0; void *test_func(void *arg) { int i = 0; for(i = 0; i < 2000000; i++) { __sync_fetch_and_add(&count, 1); } return NULL; } int main(int argc, const char *argv[]) { Timer timer; timer.Start(); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){ pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){ pthread_join(thread_ids[i], NULL); } timer.Stop(); timer.Cost_time(); printf("结果:count = %d\n",count); return 0; } 
      
     
    
   
  


#################################################################3

好,代码粘贴完毕。下面进入测试环节:

编译:

[adapter@ZHEJIANG test3]$ g++ lock.cpp ./timer.cpp -lpthread -o lock ;
[adapter@ZHEJIANG test3]$ g++ nolock.cpp ./timer.cpp -lpthread -o nolock ;
[adapter@ZHEJIANG test3]$ g++ atomic.cpp ./timer.cpp -lpthread -o atomic ;

每一个线程循环加2000000次。
第一组测验
[adapter@ZHEJIANG test3]$ ./lock
花费时间:3秒 109807微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:7秒 595784微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花费时间:0秒 381022微秒
结果:count = 20000000

结论:
可以看出,原子操作函数的速度是最快的,其他两种方式根本就没法比。而无锁操作是在原子操作函数的基础上形成的。
为什么无锁操作的效率会这么低 如果效率低的话,那还有什么意义,为什么现在大家都提倡无锁编程呢?为什么?咱先不
解释,先用数据说话。

第二组测验:
原子操作代码不变,加锁操作代码不变。改动一下无锁操作的代码。
将如下代码更改
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));
更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);
让他睡一微秒。
为什么要这样改代码?这样启不是会更慢?你的猜测是不无道理的,但是一个不休息的人干的活未必比有休息的人干的活多。

[adapter@ZHEJIANG test3]$ ./lock
花费时间:2秒 970773微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花费时间:0秒 685404微秒
结果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花费时间:0秒 380675微秒
结果:count = 20000000

结论:
不用明说,大家看到的结果是不是很诧异?是不是!有木有!怎么会是这样。无锁加上usleep(1),睡一会,反而会变得这么快。
虽和原子操作相比次了一点,但已经甩开有锁同步好几条街了,无锁比有锁快是应该的,但为什么睡一会会更快,不睡就比有锁
还慢那么多呢?怎么回事。是不是这个测试的时候cpu