<返回更多

C++实现线程池

2021-05-14  微信公众号  小懵白
加入收藏

在上一篇文章C++使用socket实现与微信小程序通信(下)中,小懵白就给大家简要地讲解了线程池的原理。

C++实现线程池

 

今天呢,小懵白就给大家继续讲解C++如何实现封装线程池类。

第一步

首先,我们需要定义生产者、消费者的存储容器类型。

在这里呢,消费者的容器是通过使用SLT中的vector容器来实现的:std::vector<std::thread> pool;

而任务生产者容器则是由SLT的queue容器来实现:std::queue<Task>task ;

C++实现线程池

 

定义完生产者、消费者的容器类型后,接着就是要构建消费者容器:往pool里添加一定数量N的阻塞线程。

C++实现线程池

 

简要代码如下:

bool Init_ThreadPool(){
  for(int i=0;i<max_thread;i++)  //max_thread为初始化线程最大化量     
        pool.push_back(std::thread(&ThreadPool::runtask,this));
    return true;
}

在初始化中,每一个线程都绑定了runtask函数,也就是线程要执行的函数,该函数是用来实现获取task的。

 

简要代码如下:

void runtask(){
    
  while(!stop){
      
      std::unique_lock<std::mutex>lk(_mutex);
    
      //当任务队列为空或者stop为假时,阻塞当前线程,直到条件变量唤醒 
      condition.wait(lk,[this]{return (!task.empty()||stop);});  
        
      /**********************************************
       std::move是将对象的状态或者所有权从一个对象转移到另一个对象,
        只是转移,没有内存的搬迁或者内存拷贝。
      ***********************************************/ 
       Task ta=move(task.front());// 取一个 task
       task.pop();  //从队列移除正在执行的任务
       lk.unlock();   
       ta();  //执行task任务 
       condition.notify_one(); //通知等待一个线程
  }
  //此处补充销毁线程代码
}

在该代码中,bool型的stop变量是用来判定线程是否终止的,每当一次while循环后进行判断stop,若为假则继续阻塞线程然后获取任务,否则的话结束线程。

消费者实现后,接下来就是实现生产者了。在此之前,还需要做一件事情:同一规划任务变量。

using Task = std::function<void()>;

这是由于每一个task函数的返回类型都不可能是统一的,有的是void类型的,有的是bool类型的,更还有的是指针类型的。

为了能够实现代码的高效性,这里采用了std::function<void()>思想,定义了Task类型。

接着就是实现生产者了:

bool Add_task(const Task&t){
  
  if(task.size()==max_queue)Warn_LOG("添加任务,任务队列已满,准备阻塞");
  if(thread_run==max_thread)Warn_LOG("添加任务,但线程已经分配完");
  
  std::unique_lock<std::mutex>lk(_mutex);
            
    while(task.size()==max_queue||stop){ //要是任务数量到了最大,就等待处理完再添加
            condition.wait(lk);  
        }
    if(stop)return false;
      
    task.push(t);//给队列中添加任务
    task_numble=task.size();
        
    std::cout<<"添加成功。"<<std::endl<<std::endl;
    condition.notify_one(); //通知等待一个线程 
    return true;
}

为此,到这里的时候,生产者和消费者的线程池初步构建完成了

 

第二步:解说

在第一步中,我们通过使用STL中的vector容器存储了已初始化的线程后,那么接下来是如何实现生产者-消费者模式呢。

在生产者pool容器中,每一个线程都是绑定了runtask函数,也就是说每一个线程都是在执行runtask函数。

C++实现线程池

 

而在runtask函数代码块里,首先是判断!stop条件是否满足,若不满足,则结束线程。

否则的话,添加互斥锁,若任务队列为空或者stop为假时,使用条件锁std::condition_variable阻塞当前线程。

直到条件变量唤醒后,才从任务队列中获取一个task,解锁互斥锁并执行task,直至完成并通知等待一个线程,然后重新循环判断。

第三步:封装

实现完上面的功能后,接下来就是对功能进行封装了。代码如下:

class ThreadPool{
  using Task = std::function<void()>;


  public:    
      ThreadPool();
      ~ThreadPool();
      
      bool Init_ThreadPool();  //初始化线程       
      bool Add_task(const Task&t);  //添加任务 
      void End_threadpool();  //销毁线程            
      
      int thread_num;  //当前的线程数量
      int task_numble;   //任务队列 
    
  protected:
     void runtask(); 
     void Expand_thread();
     bool Destroy_thread();
     int id;
     
    private:
      
      int max_thread;//初始化线程数量 
      int max_queue;//初始化 任务队列数量 
    
      std::vector<std::thread> pool;// 线程池、任务队列 
      std::queue<Task>task ;
    
      std::mutex _mutex ;//互斥锁条件变量
      std::condition_variable condition; 
      
      bool stop;  //停止标志位 
    
      int key;  //目前正在执行任务的线程数量 
};

其中,~ThreadPool()的代码实现如下:

ThreadPool::~ThreadPool(){
  End_threadpool();
}
void ThreadPool::End_threadpool(){
  
  stop=true;  //停止读取任务 
  
  while(thread_run){}//等待线程还没有执行完的任务
  
  for(int i =0 ;i<pool.size();i++)  //销毁线程 
        pool[i].join() ;
   
  std::queue<Task> empty;
  swap(empty,task);//清空任务队列 
  pool.clear(); //清空线程池 
  
}

至此,线程池类封装就实现了,其余功能函数在这就不多讲了,有兴趣的同学可自行完成。

好了,今天关于C++实现线程池的话题,就讲到这里了,关于源代码的问题,我整理好后会放在公众号(小懵白生活小趣谈)后台里面。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>