Linux網路通訊(執行緒池和執行緒池版本的伺服器程式碼)

執行緒池

介紹

執行緒池: 一種執行緒使用模式。執行緒過多會帶來調度開銷,進而影響快取局部性和整體性能。而執行緒池維護著多個執行緒,等待著監督管理者分配可並發執行的任務。這避免了在處理短時間任務時創建與銷毀執行緒的代價。執行緒池不僅能夠保證內核的充分利用,還能防止過分調度。可用執行緒數量應該取決於可用的並發處理器、處理器內核、記憶體、網路sockets等的數量

執行緒池的價值:

  1. 需要大量的執行緒來完成任務,且完成任務的時間比較短。可同時處理多任務,多請求。
  2. 有任務可以立即從執行緒池中調取執行緒取處理,節省了執行緒創建的時間
  3. 有效防止服務端執行緒過多而導致系統過載的問題

實現

執行緒池中首先需要有很多個執行緒,用戶可以自己選擇創建多少個執行緒。為了實現執行緒間的同步與互斥,還需要增加兩個變數——互斥量和條件變數。我們還需要一個任務隊列,主執行緒不斷往裡面塞任務,執行緒池的執行緒不斷去處理。需要注意的是:這裡的任務隊列可以為空,但不能滿,所以任務隊列的容量不限定(實際場景中,任務隊列容量不夠就需要考慮換一台更大的伺服器)

執行緒池的四個成員變數:

  • 一個隊列: 存放任務
  • 執行緒池中執行緒數: 記錄執行緒池中創建的執行緒數
  • 互斥量: 一個互斥鎖
  • 條件變數: 兩個條件變數

執行緒池:首先需要創建幾個執行緒,還有一個任務隊列,當任務隊列有任務的時候就喚醒一個正在等待的執行緒,讓執行緒去執行任務,執行緒池中的執行緒執行完任務不會銷毀,大大減少的cpu的消耗。

需要兩個條件變數和一個互斥鎖,這個互斥鎖用來鎖住任務隊列,因為任務隊列是公共資源,其次還需要兩個條件變數,一個條件變數用來阻塞取任務的執行緒,當隊列中有任務的時候,直接取任務,然後解鎖,當任務隊列中沒有任務的時候,解鎖等待條件,條件滿足搶鎖,取任務,解鎖。另一個條件變數用來阻塞添加者進程,當任務隊列滿了,會讓添加者進程等待,當有執行緒取走一個任務的時候,會喚醒添加者進程。

版本一

任務函數

這裡的任務函數採用的時回調函數的方式,提高了程式碼的通用性,可以根據自己的需求改寫任務函數

//任務回調函數
void taskRun(void *arg)
{
      PoolTask *task = (PoolTask*)arg;
      int num = task->tasknum;
      printf("task %d is runing %lu\n",num,pthread_self());
  
      sleep(1);
      printf("task %d is done %lu\n",num,pthread_self());
}

執行緒池的主要程式碼框架

class ThreadPool
{
public:
    //構造函數,初始化執行緒池
    ThreadPool(int thrnum, int maxtasknum)
    {
    }
   static void* thrRun(void* arg)
    {     
    }
   //析構函數,摧毀執行緒池
   ~ThreadPool()
   {
   }
public:
   //添加任務到執行緒池
   void addtask(){};
private:
   //任務隊列相關的參數
   int max_job_num;//最大任務個數
   int job_num;//實際任務個數
   PoolTask *tasks;//任務隊列數組
   int job_push;//入隊位置
   int job_pop;// 出隊位置
 
   //執行緒相關參數
   int thr_num;//執行緒池內執行緒個數
   pthread_t *threads;//執行緒池內執行緒數組
   int shutdown;//是否關閉執行緒池
   pthread_mutex_t pool_lock;//執行緒池的鎖
   pthread_cond_t empty_task;//任務隊列為空的條件
   pthread_cond_t not_empty_task;//任務隊列不為空的條件
};

放任務: 主執行緒無腦往任務隊列中塞任務,塞任務之前進行加鎖,塞完任務解鎖,如果任務隊列已經滿了,等待執行緒取任務,然後喚醒在條件變數下等待的隊列;放入了任務就給執行緒發送訊號,喚醒執行緒來取
取任務: 執行緒池中的執行緒從任務隊列中取任務,需要對任務隊列上鎖,因為對公共資源的操作都需要上鎖,如果沒有任務就阻塞,等待放任務喚醒;如果取完了一個任務,就喚醒添加任務

這就是兩個條件變數和一個互斥鎖的用法

//添加任務到執行緒池
   void addtask()
   {
     pthread_mutex_lock(&(this->pool_lock));
     //實際任務總數大於最大任務個數則阻塞等待(等待任務被處理)
     while(this->max_job_num <= this->job_num)
     {
          pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
     }
 
     int taskpos = (this->job_push++)%this->max_job_num;

     this->tasks[taskpos].tasknum = beginnum++;
     this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
     this->tasks[taskpos].task_func = taskRun;
     this->job_num++;

     pthread_mutex_unlock(&(this->pool_lock));
 
     pthread_cond_signal(&(this->not_empty_task));//通知包身工
   }
  //取任務
  static void* thrRun(void* arg)
    {
      ThreadPool *pool = (ThreadPool*)arg;
      int taskpos = 0;//任務位置
      PoolTask *task = new PoolTask();
      while(1)
      {
          //獲取任務,先要嘗試加鎖
          pthread_mutex_lock(&pool->pool_lock);
 
          //無任務並且執行緒池不是要摧毀
          while(pool->job_num <= 0 && !pool->shutdown )
          {
              //如果沒有任務,執行緒會阻塞
              pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
          }
          if(pool->job_num)
          {
              //有任務需要處理
              taskpos = (pool->job_pop++)%pool->max_job_num;
              //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
              //為什麼要拷貝?避免任務被修改,生產者會添加任務
              memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
              task->arg = task;
              pool->job_num--;
              //task = &thrPool->tasks[taskpos];
              pthread_cond_signal(&pool->empty_task);//通知生產者
          }
  
          if(pool->shutdown)
          {
              //代表要摧毀執行緒池,此時執行緒退出即可
              //pthread_detach(pthread_self());//臨死前分家
              pthread_mutex_unlock(&pool->pool_lock);
              delete(task);
              pthread_exit(NULL);
          }
       //釋放鎖
       pthread_mutex_unlock(&pool->pool_lock);
       task->task_func(task->arg);//執行回調函數
       } 
     }

整體程式碼:

#include<iostream>
#include<string.h>
#include<pthread.h>
#include<sys/types.h>
#include<stdio.h>
#include<unistd.h>
using namespace std;
int beginnum = 1;

class PoolTask
{
public:
    int tasknum;//模擬任務編號
    void *arg;//回調函數參數
    void (*task_func)(void *arg);//任務的回調函數
};
//任務回調函數
void taskRun(void *arg)
{
      PoolTask *task = (PoolTask*)arg;
      int num = task->tasknum;
      printf("task %d is runing %lu\n",num,pthread_self());
  
      sleep(1);
      printf("task %d is done %lu\n",num,pthread_self());
}
class ThreadPool
{
public:
    //構造函數,初始化執行緒池
    ThreadPool(int thrnum, int maxtasknum)
    {
      this->thr_num = thrnum;
      this->max_job_num = maxtasknum;
      this->shutdown = 0;//是否摧毀執行緒池,1代表摧毀
      this->job_push = 0;//任務隊列添加的位置
      this->job_pop = 0;//任務隊列出隊的位置
      this->job_num = 0;//初始化的任務個數為0
  
      //申請最大的任務隊列
      this->tasks = new PoolTask[thrnum];
 
      //初始化鎖和條件變數
      pthread_mutex_init(&(this->pool_lock),NULL);
      pthread_cond_init(&(this->empty_task),NULL);
      pthread_cond_init(&(this->not_empty_task),NULL);
  
      int i = 0;
      this->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申請n個執行緒id的空間
  
      pthread_attr_t attr;
      pthread_attr_init(&attr);
      pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
      for(i = 0;i < thrnum;i++)
      {
          pthread_create(&(this->threads[i]),&attr,thrRun,this);//創建多個執行緒
      }
    }
   static void* thrRun(void* arg)
    {
      ThreadPool *pool = (ThreadPool*)arg;
      int taskpos = 0;//任務位置
      PoolTask *task = new PoolTask();
      while(1)
      {
          //獲取任務,先要嘗試加鎖
          pthread_mutex_lock(&pool->pool_lock);
 
          //無任務並且執行緒池不是要摧毀
          while(pool->job_num <= 0 && !pool->shutdown )
          {
              //如果沒有任務,執行緒會阻塞
              pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
          }
          if(pool->job_num)
          {
              //有任務需要處理
              taskpos = (pool->job_pop++)%pool->max_job_num;
              //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
              //為什麼要拷貝?避免任務被修改,生產者會添加任務
              memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
              task->arg = task;
              pool->job_num--;
              //task = &thrPool->tasks[taskpos];
              pthread_cond_signal(&pool->empty_task);//通知生產者
          }
  
          if(pool->shutdown)
          {
              //代表要摧毀執行緒池,此時執行緒退出即可
              //pthread_detach(pthread_self());//臨死前分家
              pthread_mutex_unlock(&pool->pool_lock);
              delete(task);
              pthread_exit(NULL);
          }
       //釋放鎖
       pthread_mutex_unlock(&pool->pool_lock);
       task->task_func(task->arg);//執行回調函數
       }
     }
   //析構函數,摧毀執行緒池
   ~ThreadPool()
   {
     this->shutdown = 1;//關閉執行緒池
     pthread_cond_broadcast(&(this->not_empty_task));//誘殺
 
     int i = 0;
     for(i = 0; i<this->thr_num ; i++)
     {
         pthread_join(this->threads[i],NULL);
     }
 
     pthread_cond_destroy(&(this->not_empty_task));
     pthread_cond_destroy(&(this->empty_task));
     pthread_mutex_destroy(&(this->pool_lock));
     delete []tasks;
     tasks = NULL;
     free(this->threads);
   }
 
public:
   //添加任務到執行緒池
   void addtask()
   {
     pthread_mutex_lock(&(this->pool_lock));
     cout << "當前任務隊列中任務的個數是: " <<this-> job_num <<endl; 
     //實際任務總數大於最大任務個數則阻塞等待(等待任務被處理)
     while(this->max_job_num <= this->job_num)
     {
          pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
     }
 
     int taskpos = (this->job_push++)%this->max_job_num;

     this->tasks[taskpos].tasknum = beginnum++;
     this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
     this->tasks[taskpos].task_func = taskRun;
     this->job_num++;

     pthread_mutex_unlock(&(this->pool_lock));
 
     pthread_cond_signal(&(this->not_empty_task));//通知包身工
   }
private:
   //任務隊列相關的參數
   int max_job_num;//最大任務個數
   int job_num;//實際任務個數
   PoolTask *tasks;//任務隊列數組
   int job_push;//入隊位置
   int job_pop;// 出隊位置
 
   //執行緒相關參數
   int thr_num;//執行緒池內執行緒個數
   pthread_t *threads;//執行緒池內執行緒數組
   int shutdown;//是否關閉執行緒池
   pthread_mutex_t pool_lock;//執行緒池的鎖
   pthread_cond_t empty_task;//任務隊列為空的條件
   pthread_cond_t not_empty_task;//任務隊列不為空的條件
};
int main()
{
   ThreadPool *m = new ThreadPool(3,20);
   int j = 0;
   for(j=0;j<20;j++)
   {
     m->addtask();
   }
   sleep(20);
   delete m;
   m = NULL;
   system("pause");
   return EXIT_SUCCESS;
}

運行結果如下:

可以看到執行緒最多處理三個任務,而任務隊列中最多可以存在20個任務,當執行緒取走了任務之後,喚醒生產者繼續添加任務。

版本二

首先封裝一個任務:

class Task
{
public:
  Task(int a = 0, int b = 0)
    :_a(a)
    ,_b(b)
  {}
  void Run()
  {
    //執行的任務可以自己編寫
  }
private:
  int _a;
  int _b;
};

執行緒池的主要程式碼框架(喚醒和等待操作都已經封裝好):

#define DEFAULT_MAX_PTHREAD 5

class ThreadPool
{
public:
  ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
    :_max_thread(max_pthread)
  {}
  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
  }
public:
  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }
  void UnlockQueue()
  {
    pthread_mutex_unlock(&_mutex);
  }
  void ThreadWait()
  {
    pthread_cond_wait(&_cond, &_mutex);
  }
  void WakeUpThread()
  {
    pthread_cond_signal(&_cond);
    //pthread_cond_broadcast(&_cond);
  }
  bool IsEmpty()
  {
    return _q.empty();
  } 
private:
  queue<Task*>  _q;
  int  _max_thread;
  pthread_mutex_t _mutex;
  pthread_cond_t  _cond;
};

創建多個執行緒
創建多個執行緒可以用一個循環進行創建。需要注意的是,創建一個執行緒還需要提供一個執行緒啟動後要執行的函數,這個啟動函數只能有一個參數。如果把這個函數設置為成員函數,那麼這個函數的第一個參數默認是this指針,這樣顯然是不可行的,所以這裡我們考慮把這個啟動函數設置為靜態的。但是設置為靜態的成員函數又會面臨一個問題:如何調用其他成員函數和成員變數? 所以這裡我們考慮創建執行緒的時候,把this指針傳過去,讓啟動函數的arg 參數去接收即可

static void* Runtine(void* arg)
{
  pthread_detach(pthread_self());
  ThreadPool* this_p = (ThreadPool*)arg;

  while (1){
    this_p->LockQueue();
    while (this_p->IsEmpty()){
      this_p->ThreadWait();
    }
    Task* t;
    this_p->Get(t);
    this_p->UnlockQueue();
    // 解鎖後處理任務
    t->Run();
    delete t;
  }
}
void ThreadPoolInit()
{
  pthread_mutex_init(&_mutex, nullptr);
  pthread_cond_init(&_cond, nullptr);
  pthread_t t[_max_thread];
  for(int i = 0; i < _max_thread; ++i)
  {
    pthread_create(t + i, nullptr, Runtine, this);
  }
}

注意: 執行緒創建後,執行啟動函數,在這個函數中,執行緒會去任務隊列中取任務並處理,取任務前需要進行加鎖的操作(如果隊列為空需要掛起等待),取完任務然後進行解鎖,然後處理任務,讓其它執行緒去任務隊列中取任務

放任務: 主執行緒無腦往任務隊列中塞任務,塞任務之前進行加鎖,塞完任務解鎖,然後喚醒在條件變數下等待的隊列
取任務: 執行緒池中的執行緒從任務隊列中取任務,這裡不需要加鎖,因為這個動作在啟動函數中加鎖的那一段區間中被調用的,其實已經上鎖了

// 放任務
void Put(Task* data)
{
  LockQueue();
  _q.push(data);
  UnlockQueue();
  WakeUpThread();
}
// 取任務
void Get(Task*& data)
{
  data = _q.front();
  _q.pop();
}

這兩個版本都可以實現簡易的執行緒池,下面執行緒池版本的伺服器主要是用版本二來實現,因為版本一要修改的內容有點多,小夥伴們可以自己修改一下

執行緒池版本伺服器

多執行緒版本效果看起來還不錯,但是來一個連接就創建一個執行緒,斷開一個連接就釋放一個執行緒,這樣頻繁地創建和釋放執行緒資源,對OS來說是一種負擔,同時也帶來資源的浪費,如果我們使用執行緒池,把每一個客戶端連接封裝成一個任務,讓執行緒池去處理,這樣就不需要頻繁地創建和銷毀消除,效率也能提升很多。
執行緒池採用版本二,程式碼如下:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Task.hpp"

#define DEFAULT_MAX_PTHREAD 5

class ThreadPool
{
public:
  ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
    :_max_thread(max_pthread)
  {}
  static void* Runtine(void* arg)
  {
    pthread_detach(pthread_self());
    ThreadPool* this_p = (ThreadPool*)arg;

    while (1){
      this_p->LockQueue();
      while (this_p->IsEmpty()){
        this_p->ThreadWait();
      }
      Task* t;
      this_p->Get(t);
      this_p->UnlockQueue();
      // 解鎖後處理任務
      t->Run();
      delete t;
    }
  }
  void ThreadPoolInit()
  {
    pthread_mutex_init(&_mutex, nullptr);
    pthread_cond_init(&_cond, nullptr);
    pthread_t t[_max_thread];
    for(int i = 0; i < _max_thread; ++i)
    {
      pthread_create(t + i, nullptr, Runtine, this);
    }
  }
  void Put(Task* data)
  {
    LockQueue();
    _q.push(data);
    UnlockQueue();
    WakeUpThread();
  }
  void Get(Task*& data)
  {
    data = _q.front();
    _q.pop();
  }
  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
  }
public:
  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }
  void UnlockQueue()
  {
    pthread_mutex_unlock(&_mutex);
  }
  void ThreadWait()
  {
    pthread_cond_wait(&_cond, &_mutex);
  }
  void WakeUpThread()
  {
    pthread_cond_signal(&_cond);
    //pthread_cond_broadcast(&_cond);
  }
  bool IsEmpty()
  {
    return _q.empty();
  } 
private:
  std::queue<Task*>  _q;
  int             _max_thread;
  pthread_mutex_t _mutex;
  pthread_cond_t  _cond;
};

這裡我們單獨寫一個頭文件——Task.hpp,其中有任務類,任務類裡面有三個成員變數,也就是埠號,IP和套接字,其中有一個成員方法——Run,裡面封裝了一個Service函數,也就是前面寫的,把它放在Task.hpp這個頭文件下,執行緒池裡面的執行緒執行run函數即可,頭文件內容如下:

#pragma once
#include <iostream>
#include <unistd.h>

static void Service(std::string ip, int port, int sock)
{
  while (1){
    char buf[256];
    ssize_t size = read(sock, buf, sizeof(buf)-1);
    if (size > 0){
      // 正常讀取size位元組的數據
      buf[size] = 0;
      std::cout << "[" << ip << "]:[" << port  << "]# "<< buf << std::endl;
      std::string msg = "server get!-> ";
      msg += buf;
      write(sock, msg.c_str(), msg.size());
    }
    else if (size == 0){
      // 對端關閉
      std::cout << "[" << ip << "]:[" << port  << "]# close" << std::endl;
      break;
    }
    else{
      // 出錯
      std::cerr << sock << "read error" << std::endl; 
      break;
    }
  }

  close(sock);
  std::cout << "service done" << std::endl;
}

struct Task
{
  int _port;
  std::string _ip;
  int _sock;

  Task(int port, std::string ip, int sock)
    :_port(port)
    ,_ip(ip)
     ,_sock(sock)
  {}
  void Run()
  {
      Service(_ip, _port, _sock);
  }
};

伺服器類的核心程式碼如下:

void loop()
{
  struct sockaddr_in peer;// 獲取遠端埠號和ip資訊
  socklen_t len = sizeof(peer);
  _tp = new ThreadPool(THREAD_NUM); 
  _tp->ThreadPoolInit();
  while (1){
    // 獲取鏈接
    // sock 是進行通訊的一個套接字  _listen_sock 是進行監聽獲取鏈接的一個套接字
    int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);
    if (sock < 0){
      std::cout << "accept fail, continue accept" << std::endl;
      continue;
    }
    int peerPort = ntohs(peer.sin_port);
    std::string peerIp = inet_ntoa(peer.sin_addr);
    std::cout << "get a new link, [" << peerIp << "]:[" << peerPort  << "]"<< std::endl;
    Task* task = new Task(peerPort, peerIp, sock);
    _tp->Put(task);

  }
}

注意幾點變化:

  1. 伺服器類增加一個執行緒池成員變數,初始化函數裡面增加執行緒池創建(在堆上申請)
  2. 析構函數增加釋放執行緒池資源一步
  3. loop函數中只需要封裝任務,並把任務丟進執行緒池中即可