设为首页 加入收藏

TOP

c++11实现一个半同步半异步线程池(一)
2016-08-19 17:03:36 】 浏览:505
Tags:实现 一个 同步 异步 线程

在处理大量并发任务的时候,如果按照传统的方式,一个请求一个线程来处理请求任务,大量的线程创建和销毁将消耗过多的系统资源,还增加了线程上下文切换的开销,而通过线程池技术就可以很好的解决这些问题,线程池技术通过在系统中预先创建一定数量的线程,当任务请求到来时从线程池中分配一个预先创建的线程去处理任务,线程在完成任务之后还可以重用,不会销毁,而是等待下次任务的到来.

分层

半同步半异步线程池分为三层:

同步服务层: 它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理的,而是将这些任务放到一个同步排队层中,等待处理.

同步排队层: 来自上层的任务请求都会加到排队层中等待处理.

异步服务层: 这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理.

这里写图片描述

线程池实现

#include 
  
   
#include 
   
     #include 
    
      #include 
     
       #include 
      
        #include 
       
         #include 
        
          #include 
         
           using namespace std; /********************************同步队列******************************/ template 
          
            class SyncQueue { public: SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) { } //添加事件 void Put(const T& x) { Add(x); } //添加事件 void Put(T && x) { //调用内部接口,进行完美转发 Add(std::forward
           
            (x)); } //从队列中取事件,取所有事件 void Take(std::list
            
              &list) { std::unique_lock
             
               locker(m_mutex); //当不满足任何一个则等待,但是若m_needStop为true是因为任务要终止了所以不阻塞 m_notEmpty.wait(locker, [this]{return (m_needStop || NotEmpty()); }); if (m_needStop) { return; } list = std::move(m_queue); m_notFull.notify_one(); } //取一个事件 void Take(T &t) { std::unique_lock
              
                locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); if (m_needStop) { return; } t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } //终止同步队列 void Stop() { { //锁作用域就在这对大括号内 std::lock_guard
               
                 locker(m_mutex); //将终止标志设为true m_needStop = true; } //唤醒所有进程一一终止 m_notFull.notify_all(); m_notEmpty.notify_all(); } //队列为空 bool Empty() { std::lock_guard
                
                  locker(m_mutex); return m_queue.empty(); } //队列为满 bool Full() { std::lock_guard
                 
                   locker(m_mutex); return m_queue.size() == m_maxSize; } //队列大小 size_t Size() { std::lock_guard
                  
                    locker(m_mutex); return m_queue.size(); } //队列大小 int Count() { return m_queue.size(); } private: //队列不为满 bool NotFull() const { bool full = (m_queue.size() >= m_maxSize); if (full) { cout << "the queue is full, need wait..." << endl; } return !full; } //队列不为空 bool NotEmpty() const { bool empty = m_queue.empty(); if (empty) { cout << "the queue is empty, need wait..., 异步层的线程ID: " << this_thread::get_id() << endl; } return !empty; } //向队列中添加事件,若不为满且终止标志为false则添加事件 template 
                   
                     void Add(F && x) { std::unique_lock
                    
                      locker(m_mutex); //当不满足任何一个则等待,但是若m_needStop为true是因为任务要终止了所以不阻塞 m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); if (m_needStop) { return; } m_queue.push_back(std::forward
                     
                      (x)); m_notEmpty.notify_one(); } private: //缓冲区 std::list
                      
                        m_queue; //互斥量 std::mutex m_mutex; //队列不为空的条件变量 std::condition_variable m_notEmpty; //队列不为满的条件变量 std::condition_variable m_notFull; //任务队列最大长度 int m_maxSize; //终止的标识,当为true时代表同步队列要终止 bool m_needStop; }; /**************************线程池********************************/ //传递给同步队列的最大个数 const int MaxTaskCount = 100; class ThreadPool { public: using Task = std::function
                       
                        ; //构造函数,默认参数hardware_concurrency()获取CPU核心数量 ThreadPool(int numThreads = std::thread::hardware_concurrency()):m_queue(MaxTaskCount) { cout << "numThreads: " << numThreads << endl; Start(numThreads); } ~ThreadPool() { Stop(); } //保证多线程环境下只调用一次StopThreadGroup函数 void Stop() { std::call_once(m_flag, [this]{ StopThreadGroup(); }); } //添加任务,右值完美转发 void AddTask(Task && task) { m_queue.Put(std::forward
                        
                          (task)); } //添加任务 void AddTask(const Task && task) { m_queue.Put(task); } private: //建立numThreads个数的线程组 void Start(int numThreads) { m_running = true; for (int i = 0; i < numThreads; i++) { //多个线程依次的处理 m_threadgroup.push_back(std::make_shared
                         
                          (&ThreadPool::RunInThread, this)); } } //取出任务队列中的全部,依次执行 void RunInThread() { while (m_running) { std::list
                          
                            list; m_queue.Take(list); for (auto & task : list) { if (!m_running) { return ; } //执行任务 task(); } } } //终止所有任务的执行 void StopThreadGroup() { //终止同步队列 m_queue.Stop(); m_running = false; for (auto thread : m_threadgroup) { if (thread) { thread->join(); } } m_threadgroup.clear(); } private: //处理任务的线程组 std::list
                           
                            > m_threadgroup; //同步队列 SyncQueue
                            
                              m_queue; //运行的标志,flase代表终止 atomic_bool m_running; //
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇图的深度优先搜索和广度优先搜索.. 下一篇C++泛型编程

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目