C++一些新的特性的理解(二)

1 C++11多執行緒thread

重點:

  • join和detach的使用場景
  • thread構造函數參數
  • 綁定c函數
  • 綁定類函數
  • 執行緒封裝基礎類
  • 互斥鎖mutex
  • condition notify、wait
  • lock_guard/unique_lock
  • function和bind
  • 非同步future/packaged_task/promise
  • 執行緒池的實現,執行緒池涉及的技術點

1.1 thread

std::thread 在 #include 頭文件中聲明,因此使用 std::thread 時需要包含 #include 頭文件。

1.1.1語法

構造函數

  • 默認構造函數
//創建一個空的thread執行對象。
thread() _NOEXCEPT
{ // construct with no thread
_Thr_set_null(_Thr);
}
  • 初始化構造函數
//創建std::thread執行對象,該thread對象可被joinable,新產生的執行緒會調用threadFun函數,該函
數的參數由 args 給出
template<class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
  • 拷貝構造函數
// 拷貝構造函數(被禁用),意味著 thread 不可被拷貝構造。
thread(const thread&) = delete;
  • Move構造函數
/move 構造函數,調用成功之後 x 不代表任何 thread 執行對象。
注意:可被 joinable 的 thread 對象必須在他們銷毀之前被主執行緒 join 或者將其設置為
detached。
thread(thread&& x)noexcept
#include<iostream>
#include<thread>
using namespace std;
void threadFun(int& a) // 引用傳遞
{
	cout << "this is thread fun !" << endl;
	cout << " a = " << (a += 10) << endl;
}
int main()
{
	int x = 10;
	thread t1(threadFun, std::ref(x));
	thread t2(std::move(t1)); // t1 執行緒失去所有權
	thread t3;
	t3 = std::move(t2); // t2 執行緒失去所有權
	//t1.join(); // ?
	t3.join();
	cout << "Main End " << "x = " << x << endl;
	return 0;
}

主要函數

1.1.2 簡單執行緒的創建

  1. 傳入0個值
  2. 傳入2個值
  3. 傳入引用
  4. 傳入類函數
  5. detach
  6. move
#include<iostream>
#include<thread>
using namespace std;
//1 傳入0個值
void func1()
{
	cout << "func1 into" << endl;
}
//2 傳入2個值
void func2(int a,int b)
{
	cout << "func2 a + b =" << a + b << endl;
}

//3 傳入引用
void func3(int &c)
{
	cout << "func3 c = " << &c << endl;
}

class A
{
public:
	//4.傳入類函數
	void func4(int a)
	{
		cout << "thread:" << _name << " ,func a=" << a << endl;
	}
	void setName(string name)
	{
		_name = name;
	}
	void displayName()
	{
		cout << "this:" << this << ",name:" << _name << endl;
	}
	void play()
	{
		std::cout << "play call!" << std::endl;
	}
private:
	string _name;
};
//5.detach
void func5()
{
	cout << "func5 into sleep" << endl;
	std::this_thread::sleep_for(std::chrono::seconds(1));
	cout << "func 5 leave" << endl;
}

// 6. move
void func6()
{
	cout << "this is func6 !" << endl;
}

int main()
{
	//1.傳入0個值
	cout << "\n\n main1-----------------------------------\n";
	std::thread t1(&func1);
	t1.join();
	//2.傳入2個值
	cout << "\n\n main2-----------------------------------\n";
	int a = 10;
	int b = 20;
	std::thread t2(&func2, a, b);
	t2.join();
	//3.傳入引用
	cout << "\n\n main3-----------------------------------\n";
	int c = 10;
	std::thread t3(func3, std::ref(c));
	t3.join();
	cout << "main c =" << &c << ", " << c << endl;
	//4.傳入類函數
	cout << "\n\n main4-----------------------------------\n";
	std::unique_ptr<A> a4_ptr = make_unique<A>();
	a4_ptr->setName("daren");
	std::thread t4(&A::func4, a4_ptr.get(), 10);
	t4.join();
	//5.detach
	cout << "\n\n main5-----------------------------------\n";
	std::thread t5(&func5);
	t5.detach();
	cout << "\n main5 end\n";
	// 6.move
	cout << "\n\n main6--------------------------\n";
	int x = 10;
	thread t6_1(func6);
	thread t6_2(std::move(t6_1)); // t6_1 執行緒失去所有權

	t6_2.join();


	return 0;
}

1.1.3 執行緒封裝

zero_thread.h

#ifndef ZERO_THREAD_H
#define ZERO_THREAD_H
#include <thread>
class ZERO_Thread	
{
public:
ZERO_Thread(); // 構造函數
virtual ~ZERO_Thread(); // 析構函數
bool start();
void stop();
bool isAlive() const; // 執行緒是否存活.
std::thread::id id() { return th_->get_id(); }
std::thread* getThread() { return th_; }
void join(); // 等待當前執行緒結束, 不能在當前執行緒上調用
void detach(); //能在當前執行緒上調用
static size_t CURRENT_THREADID();
protected:
void threadEntry();
virtual void run() = 0; // 運行
protected:
bool running_; //是否在運行
std::thread* th_;
};
#endif // ZERO_THREAD_H

zero_thread.cpp

#include "ZERO_Thread.h"
#include "zero_thread.h"
#include <sstream>
#include <iostream>
#include <exception>
ZERO_Thread::ZERO_Thread() :
	running_(false), th_(NULL)
{
}
ZERO_Thread::~ZERO_Thread()
{
	if (th_ != NULL)
	{
		//如果到調用析構函數的時候,調用者還沒有調用join則觸發detach,此時是一個比較危險的動作,用戶必須知道他在做什麼
			if (th_->joinable())
			{
				std::cout << "~ZERO_Thread detach\n";
				th_->detach();
			}
		delete th_;
		th_ = NULL;
	}
	std::cout << "~ZERO_Thread()" << std::endl;
}
bool ZERO_Thread::start()
{
	if (running_)
	{
		return false;
	}
	try
	{
		th_ = new std::thread(& ZERO_Thread::threadEntry, this);
	}
	catch (...)
	{
		throw "[ZERO_Thread::start] thread start error";
	}
	return true;
}
void ZERO_Thread::stop()
{
	running_ = false;
}
bool ZERO_Thread::isAlive() const
{
	return running_;
}
void ZERO_Thread::join()
{
	if (th_->joinable())
	{
		th_->join(); // 不是detach才去join
	}
}
void ZERO_Thread::detach()
{
	th_->detach();
}
size_t ZERO_Thread::CURRENT_THREADID()
{
	// 聲明為thread_local的本地變數在執行緒中是持續存在的,不同於普通臨時變數的生命周期,
	// 它具有static變數一樣的初始化特徵和生命周期,即使它不被聲明為static。
	static thread_local size_t threadId = 0;
	if (threadId == 0)
	{
		std::stringstream ss;
		ss << std::this_thread::get_id();
		threadId = strtol(ss.str().c_str(), NULL, 0);
	}
	return threadId;
}
void ZERO_Thread::threadEntry()
{
	running_ = true;
	try
	{
		run(); // 函數運行所在
	}	
	catch (std::exception& ex)
	{
		running_ = false;
		throw ex;
	}
	catch (...)
	{
		running_ = false;
		throw;
	}
	running_ = false;
}

main.cpp

#include <iostream>
#include <chrono>
#include "zero_thread.h"
using namespace std;
class A : public ZERO_Thread
{
public:
	void run()
	{
		while (running_)
		{
			cout << "print A " << endl;
			std::this_thread::sleep_for(std::chrono::seconds(5));
		}
		cout << "----- leave A " << endl;
	}
};
class B : public ZERO_Thread
{
public:
	void run()
	{
		while (running_)
		{
			cout << "print B " << endl;
			std::this_thread::sleep_for(std::chrono::seconds(2));
		}
		cout << "----- leave B " << endl;
	}
};
int main()
{
	{
		A a;
		a.start();
		B b;
		b.start();
		std::this_thread::sleep_for(std::chrono::seconds(5));
		a.stop();
		a.join();
		b.stop();
		b.join();
	}
	cout << "Hello World!" << endl;
	return 0;
}

1.1.4 進一步閱讀

//cplusplus.com/reference/thread/this_thread/

  • get_id
    Get thread id (function )
  • yield
    Yield to other threads (function )
  • sleep_until
    Sleep until time point (function )
  • sleep_for
    Sleep for time span (function )

1.2互斥量

mutex又稱互斥量,C++ 11中與 mutex相關的類(包括鎖類型)和函數都聲明在 頭文件中,所以如果
你需要使用 std::mutex,就必須包含 頭文件。
C++11提供如下4種語義的互斥量(mutex)

  • std::mutex,獨佔的互斥量,不能遞歸使用。
  • std::time_mutex,帶超時的獨佔互斥量,不能遞歸使用。
  • std::recursive_mutex,遞歸互斥量,不帶超時功能。
  • std::recursive_timed_mutex,帶超時的遞歸互斥量。

1.2.1 獨佔互斥量std::mutex

std::mutex介紹

下面以 std::mutex 為例介紹 C++11 中的互斥量用法。
std::mutex 是C++11 中最基本的互斥量,std::mutex 對象提供了獨佔所有權的特性——即不支援遞歸地
對 std::mutex 對象上鎖,而 std::recursive_lock 則可以遞歸地對互斥量對象上鎖。

std::mutex 的成員函數

  • 構造函數,std::mutex不允許拷貝構造,也不允許 move 拷貝,最初產生的mutex對象是處於unlocked 狀態的。
  • lock(),調用執行緒將鎖住該互斥量。執行緒調用該函數會發生下面 3 種情況:(1). 如果該互斥量當前沒有被鎖住,則調用執行緒將該互斥量鎖住,直到調用 unlock之前,該執行緒一直擁有該鎖。(2). 如果當前互斥量被其他執行緒鎖住,則當前的調用執行緒被阻塞住。(3). 如果當前互斥量被當前調用執行緒鎖住,則會產生死鎖(deadlock)。
  • unlock(), 解鎖,釋放對互斥量的所有權。
  • try_lock(),嘗試鎖住互斥量,如果互斥量被其他執行緒佔有,則當前執行緒也不會被阻塞。執行緒調用該函數也會出現下面 3 種情況,(1). 如果當前互斥量沒有被其他執行緒佔有,則該執行緒鎖住互斥量,直到該執行緒調用 unlock 釋放互斥量。(2). 如果當前互斥量被其他執行緒鎖住,則當前調用執行緒返回false,而並不會被阻塞掉。(3). 如果當前互斥量被當前調用執行緒鎖住,則會產生死鎖(deadlock)。

範例1-2-mutex1

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

volatile int counter(0); // non-atomic counter
std::mutex mtx;           // locks access to counter

void increases_10k()
{
    for (int i=0; i<10000; ++i) {
        // 1. 使用try_lock的情況
        if (mtx.try_lock()) {   // only increase if currently not locked:
            ++counter;
            mtx.unlock();
        }
        // 2. 使用lock的情況
        //        {
        //            mtx.lock();
        //            ++counter;
        //            mtx.unlock();
        //        }
    }
}

int main()
{
    std::thread threads[10];
    for (int i=0; i<10; ++i)
        threads[i] = std::thread(increases_10k);

    for (auto& th : threads) th.join();
    std::cout << " successful increases of the counter "  << counter << std::endl;

    return 0;
}

1.2.2 遞歸互斥量std::recursive_mutex

遞歸鎖允許同一個執行緒多次獲取該互斥鎖,可以用來解決同一執行緒需要多次獲取互斥量時死鎖的問題。

死鎖範例1-2-mutex2-dead-lock

//死鎖範例1-2-mutex2-dead-lock
#include <iostream>
#include <thread>
#include <mutex>

struct Complex
{
    std::mutex mutex;
    int i;

    Complex() : i(0){}

    void mul(int x)
    {
        std::lock_guard<std::mutex> lock(mutex);
        i *= x;
    }

    void div(int x)
    {
        std::lock_guard<std::mutex> lock(mutex);
        i /= x;
    }

    void both(int x, int y)
    {
        //lock_guard 構造函數加鎖, 析構函數釋放鎖
        std::lock_guard<std::mutex> lock(mutex);
        mul(x); // 獲取不了鎖
        div(y);
    }

    void init()
    {
        //lock_guard 構造函數加鎖, 析構函數釋放鎖
        std::lock_guard<std::mutex> lock(mutex);
        sub_init();
    }
    void sub_init()
    {
        std::lock_guard<std::mutex> lock(mutex);
    }
};

int main(void)
{
    Complex complex;

    complex.both(32, 23);
    std::cout << "main finish\n";
    return 0;
}

運行後出現死鎖的情況。在調用both時獲取了互斥量,在調用mul時又要獲取互斥量,但both的並沒有
釋放,從而產生死鎖。
使用遞歸鎖

//遞歸鎖1-2-recursive_mutex1
#include <iostream>
#include <thread>
#include <mutex>

struct Complex
{
    std::recursive_mutex mutex;
    int i;

    Complex() : i(0){}

    void mul(int x)
    {
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i *= x;
    }

    void div(int x)
    {
        std::unique_lock<std::recursive_mutex> lock(mutex);


        i /= x;
    }

    void both(int x, int y)
    {
        std::lock_guard<std::recursive_mutex> lock(mutex);
        mul(x);
        div(y);
    }
};

int main(void)
{
    Complex complex;

    complex.both(32, 23);  //因為同一執行緒可以多次獲取同一互斥量,不會發生死鎖

    std::cout << "main finish\n";
    return 0;
}

雖然遞歸鎖能解決這種情況的死鎖問題,但是盡量不要使用遞歸鎖,主要原因如下:

  1. 需要用到遞歸鎖的多執行緒互斥處理本身就是可以簡化的,允許遞歸很容易放縱複雜邏輯的產生,並
    且產生晦澀,當要使用遞歸鎖的時候應該重新審視自己的程式碼是否一定要使用遞歸鎖;
  2. 遞歸鎖比起非遞歸鎖,效率會低;
  3. 遞歸鎖雖然允許同一個執行緒多次獲得同一個互斥量,但可重複獲得的最大次數並未具體說明,一旦
    超過一定的次數,再對lock進行調用就會拋出std::system錯誤。

1.2.3 帶超時的互斥量std::timed_mutex和std::recursive_timed_mutex

std::timed_mutex比std::mutex多了兩個超時獲取鎖的介面:try_lock_for和try_lock_until

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::timed_mutex mutex;
void work()
{
std::chrono::milliseconds timeout(100);
while (true)
{
if (mutex.try_lock_for(timeout))
{
std::cout << std::this_thread::get_id() << ": do work with the
mutex" << std::endl;
std::chrono::milliseconds sleepDuration(250);
std::this_thread::sleep_for(sleepDuration);
mutex.unlock();
std::this_thread::sleep_for(sleepDuration);
}
else
{
std::cout << std::this_thread::get_id() << ": do work without the
mutex" << std::endl;
std::chrono::milliseconds sleepDuration(100);
std::this_thread::sleep_for(sleepDuration);
}
}
}
int main(void)
{
std::thread t1(work);
std::thread t2(work);
t1.join();
t2.join();
std::cout << "main finish\n";
return 0;
}

1.2.4 lock_guard和unique_lock的使用和區別

相對於手動lock和unlock,我們可以使用RAII(通過類的構造析構)來實現更好的編碼方式。
RAII:也稱為「資源獲取就是初始化」,是c++等程式語言常用的管理資源、避免記憶體泄露的方法。它保證
在任何情況下,使用對象時先構造對象,最後析構對象。

1 unique_lock,lock_guard的使用

這裡涉及到unique_lock,lock_guard的使用。

#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard
#include <stdexcept>      // std::logic_error

std::mutex mtx;

void print_even (int x) {
    if (x%2==0) std::cout << x << " is even\n";
    else throw (std::logic_error("not even"));
}

void print_thread_id (int id) {
    try {
//        這裡的lock_guard換成unique_lock是一樣的。
        // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
        std::lock_guard<std::mutex> lck (mtx);
        print_even(id);
    }
    catch (std::logic_error&) {
        std::cout << "[exception caught]\n";
    }
}

int main ()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i=0; i<10; ++i)
        threads[i] = std::thread(print_thread_id,i+1);

    for (auto& th : threads) th.join();

    return 0;
}

2 unique_lock,lock_guard的區別

  • unique_lock與lock_guard都能實現自動加鎖和解鎖,但是前者更加靈活,能實現更多的功能。
  • unique_lock可以進行臨時解鎖和再上鎖,如在構造對象之後使用lck.unlock()就可以進行解鎖,lck.lock()進行上鎖,而不必等到析構時自動解鎖。
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;
int count = 0;

void fun1() {
    while (true) {
//        {
        std::unique_lock<std::mutex> locker(mu); // 能否換成lock_guard
        q.push_front(count++);
        locker.unlock();        // 這裡是不是必須的?
        cond.notify_one();
//        }
        sleep(1);
    }
}

void fun2() {
    while (true) {
        std::unique_lock<std::mutex> locker(mu);
        cond.wait(locker, [](){return !q.empty();});
        auto data = q.back();
        q.pop_back();
//        locker.unlock(); // 這裡是不是必須的?
        std::cout << "thread2 get value form thread1: " << data << std::endl;
    }
}
int main() {
    std::thread t1(fun1);
    std::thread t2(fun2);
    t1.join();
    t2.join();
    return 0;
}

條件變數的目的就是為了,在沒有獲得某種提醒時長時間休眠; 如果正常情況下, 我們需要一直循環
(+sleep), 這樣的問題就是CPU消耗+時延問題,條件變數的意思是在cond.wait這裡一直休眠直到
cond.notify_one喚醒才開始執行下一句; 還有cond.notify_all()介面用於喚醒所有等待的執行緒。

那麼為什麼必須使用unique_lock呢?

原因: 條件變數在wait時會進行unlock再進入休眠, lock_guard並無該操作介面
wait: 如果執行緒被喚醒或者超時那麼會先進行lock獲取鎖, 再判斷條件(傳入的參數)是否成立, 如果成立則wait函數返回否則釋放鎖繼續休眠
notify:進行notify動作並不需要獲取鎖
使用場景:需要結合notify+wait的場景使用unique_lock; 如果只是單純的互斥使用lock_guard

3 總結

lock_guard

  1. std::lock_guard 在構造函數中進行加鎖,析構函數中進行解鎖。
  2. 鎖在多執行緒編程中,使用較多,因此c++11提供了lock_guard模板類;在實際編程中,我們也可以根
    據自己的場景編寫resource_guard RAII類,避免忘掉釋放資源。

std::unique_lock

  1. unique_lock 是通用互斥包裝器,允許延遲鎖定、鎖定的有時限嘗試、遞歸鎖定、所有權轉移和與條件變數一同使用。
  2. unique_lock比lock_guard使用更加靈活,功能更加強大。
  3. 使用unique_lock需要付出更多的時間、性能成本。

1.3 條件變數

互斥量是多執行緒間同時訪問某一共享變數時,保證變數可被安全訪問的手段。但單靠互斥量無法實現線
程的同步。執行緒同步是指執行緒間需要按照預定的先後次序順序進行的行為。C++11對這種行為也提供了
有力的支援,這就是條件變數。條件變數位於頭文件condition_variable下。
//www.cplusplus.com/reference/condition_variable/condition_variable
條件變數使用過程:

  1. 擁有條件變數的執行緒獲取互斥量;
  2. 循環檢查某個條件,如果條件不滿足則阻塞直到條件滿足;如果條件滿足則向下執行;
  3. 某個執行緒滿足條件執行完之後調用notify_one或notify_all喚醒一個或者所有等待執行緒。

1.3.1 成員函數

1 wait函數

函數原型

void wait (unique_lock<mutex>& lck);
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);

包含兩種重載,第一種只包含unique_lock對象,另外一個Predicate 對象(等待條件),這裡必須使用
unique_lock,因為wait函數的工作原理:

  • 當前執行緒調用wait()後將被阻塞並且函數會解鎖互斥量,直到另外某個執行緒調用notify_one或者
    notify_all喚醒當前執行緒;一旦當前執行緒獲得通知(notify),wait()函數也是自動調用lock(),同理不
    能使用lock_guard對象。
  • 如果wait沒有第二個參數,第一次調用默認條件不成立,直接解鎖互斥量並阻塞到本行,直到某一
    個執行緒調用notify_one或notify_all為止,被喚醒後,wait重新嘗試獲取互斥量,如果得不到,執行緒
    會卡在這裡,直到獲取到互斥量,然後無條件地繼續進行後面的操作。
  • 如果wait包含第二個參數,如果第二個參數不滿足,那麼wait將解鎖互斥量並堵塞到本行,直到某
    一個執行緒調用notify_one或notify_all為止,被喚醒後,wait重新嘗試獲取互斥量,如果得不到,線
    程會卡在這裡,直到獲取到互斥量,然後繼續判斷第二個參數,如果表達式為false,wait對互斥
    量解鎖,然後休眠,如果為true,則進行後面的操作。

2 wait_for函數

函數原型:

template <class Rep, class Period>
cv_status wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time);
template <class Rep, class Period, class Predicate>
bool wait_for (unique_lock<mutex>& lck,
const chrono::duration<Rep,Period>& rel_time, Predicate
pred);

和wait不同的是,wait_for可以執行一個時間段,在執行緒收到喚醒通知或者時間超時之前,該執行緒都會
處於阻塞狀態,如果收到喚醒通知或者時間超時,wait_for返回,剩下操作和wait類似。

3 wait_until函數

函數原型:

template <class Clock, class Duration>
cv_status wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time);
template <class Clock, class Duration, class Predicate>
bool wait_until (unique_lock<mutex>& lck,
const chrono::time_point<Clock,Duration>& abs_time,
Predicate pred);

與wait_for類似,只是wait_until可以指定一個時間點,在當前執行緒收到通知或者指定的時間點超時之
前,該執行緒都會處於阻塞狀態。如果超時或者收到喚醒通知,wait_until返回,剩下操作和wait類似

4 notify_one函數

函數原型:

void notify_one() noexcept;

解鎖正在等待當前條件的執行緒中的一個,如果沒有執行緒在等待,則函數不執行任何操作,如果正在等待
的執行緒多餘一個,則喚醒的執行緒是不確定的。

5 notify_all函數

函數原型:

void notify_all() noexcept;

解鎖正在等待當前條件的所有執行緒,如果沒有正在等待的執行緒,則函數不執行任何操作.

1.3.2 範例

使用條件變數實現一個同步隊列,同步隊列作為一個執行緒安全的數據共享區,經常用於執行緒之間數據讀
取。

sync_queue.h

//同步隊列的實現1-3-condition-sync-queue
#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
template<typename T>
class SyncQueue
{
private:
bool IsFull() const
{
return _queue.size() == _maxSize;
}
bool IsEmpty() const
{
return _queue.empty();
}
public:
SyncQueue(int maxSize) : _maxSize(maxSize)
{
}
void Put(const T& x)
{
std::lock_guard<std::mutex> locker(_mutex);
while (IsFull())
{
std::cout << "full wait..." << std::endl;
_notFull.wait(_mutex);
}
_queue.push_back(x);
_notFull.notify_one();
}
void Take(T& x)
{
std::lock_guard<std::mutex> locker(_mutex);
while (IsEmpty())
{
std::cout << "empty wait.." << std::endl;
_notEmpty.wait(_mutex);
}
x = _queue.front();
_queue.pop_front();
_notFull.notify_one();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.size() == _maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(_mutex);
return _queue.size();
}
int Count()
{
return _queue.size();
}
private:
std::list<T> _queue; //緩衝區
std::mutex _mutex; //互斥量和條件變數結合起來使用
std::condition_variable_any _notEmpty;//不為空的條件變數
std::condition_variable_any _notFull; //沒有滿的條件變數
int _maxSize; //同步隊列最大的size
};
#endif // SYNC_QUEUE_H

main.cpp

#include <iostream>
#include "sync_queue.h"
#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
SyncQueue<int> syncQueue(5);
void PutDatas()
{
for (int i = 0; i < 20; ++i)
{
syncQueue.Put(888);
}
std::cout << "PutDatas finish\n";
}
void TakeDatas()
{
int x = 0;
for (int i = 0; i < 20; ++i)
{
syncQueue.Take(x);
std::cout << x << std::endl;
}
std::cout << "TakeDatas finish\n";
}
int main(void)
{
std::thread t1(PutDatas);
std::thread t2(TakeDatas);
t1.join();
t2.join();
std::cout << "main finish\n";
return 0;
}

程式碼中用到了std::lock_guard,它利用RAII機制可以保證安全釋放mutex。

std::lock_guard<std::mutex> locker(_mutex);
while (IsFull())
{
std::cout << "full wait..." << std::endl;
_notFull.wait(_mutex);
}

可以改成

std::lock_guard<std::mutex> locker(_mutex);
_notFull.wait(_mutex, [this] {return !IsFull();});

兩種寫法效果是一樣的,但是後者更簡潔,條件變數會先檢查判斷式是否滿足條件,如果滿足條件則重
新獲取mutex,然後結束wait繼續往下執行;如果不滿足條件則釋放mutex,然後將執行緒置為waiting狀
態繼續等待。
這裡需要注意的是,wait函數中會釋放mutex,而lock_guard這時還擁有mutex,它只會在出了作用域
之後才會釋放mutex,所以這時它並不會釋放,但執行wait時會提前釋放mutex

從語義上看這裡使用lock_guard會產生矛盾,但是實際上並不會出問題,因為wait提前釋放鎖之後會處
於等待狀態,在被notify_one或者notify_all喚醒後會先獲取mutex,這相當於lock_guard的mutex在
釋放之後又獲取到了
,因此,在出了作用域之後lock_guard自動釋放mutex不會有問題。
這裡應該用unique_lock,因為unique_lock不像lock_guard一樣只能在析構時才釋放鎖,它可以隨時釋
放鎖,因此在wait時讓unique_lock釋放鎖從語義上更加準確。
使用unique_lock和condition_variable_variable改寫condition-sync-queue,改寫為用等待一個判斷式的方法來實現一個簡單的隊列。

condition-sync-queue2

#ifndef SIMPLE_SYNC_QUEUE_H
#define SIMPLE_SYNC_QUEUE_H
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>

template<typename T>
class SimpleSyncQueue
{
public:
    SimpleSyncQueue(){}

    void Put(const T& x)
    {
        std::lock_guard<std::mutex> locker(_mutex);
        _queue.push_back(x);
        _notEmpty.notify_one();
    }

    void Take(T& x)
    {
        std::unique_lock<std::mutex> locker(_mutex);
        _notEmpty.wait(locker, [this]{return !_queue.empty(); });

        x = _queue.front();
        _queue.pop_front();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(_mutex);
        return _queue.empty();
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(_mutex);
        return _queue.size();
    }

private:
    std::list<T> _queue;
    std::mutex _mutex;
    std::condition_variable _notEmpty;
};
#endif // SIMPLE_SYNC_QUEUE_H

main.cpp

#include <iostream>
#include "sync_queue.h"
#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
SimpleSyncQueue<int> syncQueue;

void PutDatas()
{
    for (int i = 0; i < 20; ++i)
    {
        syncQueue.Put(888);
    }
}

void TakeDatas()
{
    int x = 0;

    for (int i = 0; i < 20; ++i)
    {
        syncQueue.Take(x);
        std::cout << x << std::endl;
    }
}

int main(void)
{
    std::thread t1(PutDatas);
    std::thread t2(TakeDatas);

    t1.join();
    t2.join();

     std::cout << "main finish\n";
    return 0;
}

1.4 原子變數

具體參考://www.cplusplus.com/reference/atomic/atomic/

// atomic::load/store example
#include <iostream> // std::cout
#include <atomic> // std::atomic, std::memory_order_relaxed
#include <thread> // std::thread
//std::atomic<int> count = 0;//錯誤初始化
std::atomic<int> count(0); // 準確初始化
void set_count(int x)
{
std::cout << "set_count:" << x << std::endl;
count.store(x, std::memory_order_relaxed); // set value atomically
}
void print_count()
{
int x;
do {
x = count.load(std::memory_order_relaxed); // get value atomically
} while (x==0);
std::cout << "count: " << x << '\n';
}
int main ()
{
std::thread t1 (print_count);
std::thread t2 (set_count, 10);
t1.join();
t2.join();
std::cout << "main finish\n";
return 0;
}

1.5. 非同步操作

  • std::future : 非同步指向某個任務,然後通過future特性去獲取任務函數的返回結果。
  • std::aysnc: 非同步運行某個任務函數
  • std::packaged_task :將任務和feature綁定在一起的模板,是一種封裝對任務的封裝。
  • std::promise
    參考C++官方手冊的範例。

1.5.1 std::aysnc和std::future

std::future期待一個返回,從一個非同步調用的角度來說,future更像是執行函數的返回值,C++標準庫使用std::future為一次性事件建模,如果一個事件需要等待特定的一次性事件,那麼這執行緒可以獲取一
個future對象來代表這個事件。
非同步調用往往不知道何時返回,但是如果非同步調用的過程需要同步,或者說後一個非同步調用需要使用前
一個非同步調用的結果。這個時候就要用到future。
執行緒可以周期性的在這個future上等待一小段時間,檢查future是否已經ready,如果沒有,該執行緒可以
先去做另一個任務,一旦future就緒,該future就無法複位(無法再次使用這個future等待這個事
件),所以future代表的是一次性事件。

future的類型

在庫的頭文件中聲明了兩種future,唯一future(std::future)和共享future(std::shared_future)這
兩個是參照。
std::unique_ptr和std::shared_ptr設立的,前者的實例是僅有的一個指向其關聯事件的實例,而後者可
以有多個實例指向同一個關聯事件,當事件就緒時,所有指向同一事件的std::shared_future實例會變成
就緒。

future的使用

std::future是一個模板,例如std::future,模板參數就是期待返回的類型,雖然future被用於執行緒間通
信,但其本身卻並不提供同步訪問,熱門必須通過互斥元或其他同步機制來保護訪問。
future使用的時機是當你不需要立刻得到一個結果的時候,你可以開啟一個執行緒幫你去做一項任務,並
期待這個任務的返回,但是std::thread並沒有提供這樣的機制,這就需要用到std::async和std::future
(都在頭文件中聲明)
std::async返回一個std::future對象,而不是給你一個確定的值(所以當你不需要立刻使用此值的時候才
需要用到這個機制)。當你需要使用這個值的時候,對future使用get(),執行緒就會阻塞直到future就
緒,然後返回該值。

#include<iostream>
#include<future>
#include<thread>
using namespace std;

int find_result_to_add()
{
	//std::this_thread::sleep_for(std::chrono::seconds(2));
	std::cout << "find_result_to add" << std::endl;
	return 1 + 1;
}

int find_result_to_add2(int a,int b)
{
	//std::this_thread::sleep_for(std::chrono::seconds(5));
	return a + b;
}

void do_other_thing()
{
	std::cout << "do_other_things" << std::endl;
}
int main()
{
	std::future<int> result = std::async(find_result_to_add);
	do_other_thing();
	std::cout << "result: " << result.get() << std::endl;
	auto result2 = std::async(find_result_to_add2, 10, 20);
	std::cout << "result2:" << result2.get() << std::endl;
	return 0;
}

跟thread類似,async允許你通過將額外的參數添加到調用中,來將附加參數傳遞給函數。如果傳入的
函數指針是某個類的成員函數,則還需要將類對象指針傳入(直接傳入,傳入指針,或者是std::ref封
裝)。
默認情況下,std::async是否啟動一個新執行緒,或者在等待future時,任務是否同步運行都取決於你給的
參數。這個參數為std::launch類型

  • std::launch::defered表明該函數會被延遲調用,直到在future上調用get()或者wait()為止
  • std::launch::async,表明函數會在自己創建的執行緒上運行
  • std::launch::any = std::launch::defered | std::launch::async
  • std::launch::sync = std::launch::defered
enum class launch
{
async,deferred,sync=deferred,any=async|deferred
};

PS:默認選項參數被設置為std::launch::any。如果函數被延遲運行可能永遠都不會運行。

1.5.2 std::packaged_task

如果說std::async和std::feature還是分開看的關係的話,那麼std::packaged_task就是將任務和feature
綁定在一起的模板,是一種封裝對任務的封裝。
The class template std::packaged_task wraps any Callable target (function, lambda expression,bind expression, or another function object) so that it can be invoked asynchronously. Its return value or exception thrown is stored in a shared state which can be accessed through std::future objects.

可以通過std::packaged_task對象獲取任務相關聯的feature,調用get_future()方法可以獲得
std::packaged_task對象綁定的函數的返回值類型的future。std::packaged_task的模板參數是函數簽
名。

PS:例如int add(int a, intb)的函數簽名就是int(int, int)

#include <iostream>
#include <future>
using namespace std;
int add(int a, int b, int c)
{
std::cout << "call add\n";
return a + b + c;
}
void do_other_things()
{
std::cout << "do_other_things" << std::endl;
}
int main()
{
std::packaged_task<int(int, int, int)> task(add); // 封裝任務
do_other_things();
std::future<int> result = task.get_future();
task(1, 1, 2); //必須要讓任務執行,否則在get()獲取future的值時會一直阻塞
std::cout << "result:" << result.get() << std::endl;
return 0;
}

1.5.3 std::promise

std::promise提供了一種設置值的方式,它可以在這之後通過相關聯的std::future對象進行讀取。換種說法,之前已經說過std::future可以讀取一個非同步函數的返回值了,那麼這個std::promise就提供一種方式手動讓future就緒。

//1-6-promise
// std::promise和std::future配合,可以在執行緒之間傳遞數據。
#include <future>
#include <string>
#include <thread>
#include <iostream>
using namespace std;
void print(std::promise<std::string>& p)
{
    p.set_value("There is the result whitch you want.");
}

void print2(std::promise<int>& p)
{
    p.set_value(1);
}

void do_some_other_things()
{
    std::cout << "Hello World" << std::endl;
}

int main()
{
    std::promise<std::string> promise;

    std::future<std::string> result = promise.get_future();
    std::thread t(print, std::ref(promise));
    do_some_other_things();
    std::cout <<"result " << result.get() << std::endl;
    t.join();

    std::promise<int> promise2;

    std::future<int> result2 = promise2.get_future();
    std::thread t2(print2, std::ref(promise2));
    do_some_other_things();
    std::cout << "result2 " << result2.get() << std::endl;
    t2.join();
    return 0;
}

由此可以看出在promise創建好的時候future也已經創建好了
執行緒在創建promise的同時會獲得一個future,然後將promise傳遞給設置他的執行緒,當前執行緒則持有
future,以便隨時檢查是否可以取值.

1.5.4 總結

future的表現為期望,當前執行緒持有future時,期望從future獲取到想要的結果和返回,可以把future當
做非同步函數的返回值。而promise是一個承諾,當執行緒創建了promise對象後,這個promise對象向執行緒承諾他必定會被人設置一個值,和promise相關聯的future就是獲取其返回的手段。

2 Function和bind用法

在設計回調函數的時候,無可避免地會接觸到可回調對象。在C++11中,提供了std::function和std::bind兩個方法來對可回調函數進行統一和封裝。
C++語言中有幾種可調用對象:lambda表達式、bind創建的對象以及重載了函數調用運算符的類。
和其他對象一樣,可調用對象也有類型。例如,每個lambda有它自己唯一的(未命名)類類型;函數及函數指針的類型則由其返回值類型和實參類型決定。

2.1 function的用法

包含頭文件:#include

  1. 保存普通函數
//保存普通函數
void func1(int a)
{
cout << a << endl;
}
//1. 保存普通函數
std::function<void(int a)> func;
func = func1;
func(2); //2

2.保存lambda表達式

std::function<void()> func_1 = [](){cout << "hello world" << endl;};
func_1(); //hello world

3.保存成員函數

//保存成員函數
class A{
public:
A(string name) : name_(name){}
void func3(int i) const {cout <<name_ << ", " << i << endl;}
private:
string name_;
};
//3 保存成員函數
std::function<void(const A&,int)> func3_ = &A::func3;
A a("darren");
func3_(a, 1);

完整程式碼:範例:2-1-function

#include <iostream>
#include <functional>
using namespace std;
//保存普通函數
void func1(int a)
{
cout << a << endl;
}
//保存成員函數
class A{
public:
A(string name) : name_(name){}
void func3(int i) const {cout <<name_ << ", " << i << endl;}
private:
string name_;
};
int main()
{
cout << "main1 -----------------" << endl;
//1. 保存普通函數
std::function<void(int a)> func1_;
func1_ = func1;
func1_(2); //2
cout << "\n\nmain2 -----------------" << endl;
//2. 保存lambda表達式
std::function<void()> func2_ = [](){cout << "hello lambda" << endl;};
func2_(); //hello world
cout << "\n\nmain3 -----------------" << endl;
//3 保存成員函數
std::function<void(const A&,int)> func3_ = &A::func3;
A a("darren");
func3_(a, 1);
return 0;
}

2.2bind用法

可將bind函數看作是一個通用的函數適配器,它接受一個可調用對象,生成一個新的可調用對象來「適
應」原對象的參數列表。
調用bind的一般形式:auto newCallable = bind(callable, arg_list);
其中,newCallable本身是一個可調用對象,arg_list是一個逗號分隔的參數列表,對應給定的callable的
參數。即,當我們調用newCallable時,newCallable會調用callable,並傳給它arg_list中的參數。

3 可變模板參數

C++11的新特性–可變模版參數(variadic templates)是C++11新增的最強大的特性之一,它對參數進
行了高度泛化,它能表示0到任意個數、任意類型的參數

3.1 可變模版參數的展開

可變參數模板語法

template <class... T>
void f(T... args);

上面的可變模版參數的定義當中,省略號的作用有兩個:

  1. 聲明一個參數包T… args,這個參數包中可以包含0到任意個模板參數;
  2. 在模板定義的右邊,可以將參數包展開成一個一個獨立的參數。

上面的參數args前面有省略號,所以它就是一個可變模版參數,我們把帶省略號的參數稱為「參數包」,
它裡面包含了0到N(N>=0)個模版參數。我們無法直接獲取參數包args中的每個參數的,只能通過展
開參數包的方式來獲取參數包中的每個參數,這是使用可變模版參數的一個主要特點,也是最大的難
點,即如何展開可變模版參數。
可變模版參數和普通的模版參數語義是一致的,所以可以應用於函數和類,即可變模版參數函數和可變
模版參數類,然而,模版函數不支援偏特化,所以可變模版參數函數和可變模版參數類展開可變模版參
數的方法還不盡相同,下面我們來分別看看他們展開可變模版參數的方法。

3.1.1 可變模版參數函數

//3-1-variable-parameter 一個簡單的可變模版參數函數
#include <iostream>
using namespace std;
template <class... T>
void f(T... args)
{
cout << sizeof...(args) << endl; //列印變參的個數
}
int main()
{
f(); //0
f(1, 2); //2
f(1, 2.5, ""); //3
return 0;
}

上面的例子中,f()沒有傳入參數,所以參數包為空,輸出的size為0,後面兩次調用分別傳入兩個和三個
參數,故輸出的size分別為2和3。由於可變模版參數的類型和個數是不固定的,所以我們可以傳任意類
型和個數的參數給函數f。這個例子只是簡單的將可變模版參數的個數列印出來,如果我們需要將參數包
中的每個參數列印出來的話就需要通過一些方法了。
展開可變模版參數函數的方法一般有兩種:

  1. 通過遞歸函數來展開參數包,
  2. 是通過逗號表達式來展開參數包。
    下面來看看如何用這兩種方法來展開參數包。

遞歸函數方式展開參數包

通過遞歸函數展開參數包,需要提供一個參數包展開的函數和一個遞歸終止函數,遞歸終止函數正是用
來終止遞歸的.

//3-1-variable-parameter2 遞歸函數方式展開參數包
#include <iostream>
using namespace std;
//遞歸終止函數
void print()
{
cout << "empty" << endl;
}
//展開函數
template <class T, class ...Args>
void print(T head, Args... rest)
{
cout << "parameter " << head << endl;
print(rest...);
}
int main(void)
{
print(1,2,3,4);
return 0;
}

上例會輸出每一個參數,直到為空時輸出empty。展開參數包的函數有兩個,一個是遞歸函數,另外一
個是遞歸終止函數,參數包Args…在展開的過程中遞歸調用自己,每調用一次參數包中的參數就會少一
個,直到所有的參數都展開為止,當沒有參數時,則調用非模板函數print終止遞歸過程。
上面的遞歸終止函數還可以寫成這樣:

template <class T>
void print(T t)
{
cout << t << endl;
}

逗號表達式展開參數包

遞歸函數展開參數包是一種標準做法,也比較好理解,但也有一個缺點,就是必須要一個重載的遞歸終止
函數,即必須要有一個同名的終止函數來終止遞歸,這樣可能會感覺稍有不便。有沒有一種更簡單的方
式呢?其實還有一種方法可以不通過遞歸方式來展開參數包,這種方式需要藉助逗號表達式和初始化列
表。比如前面print的例子可以改成這樣:

#include <iostream>
using namespace std;
template <class T>
void printarg(T t)
{
cout << t << endl;
}
template <class ...Args>
void expand(Args... args)
{
int arr[] = {(printarg(args), 0)...};
}
int main()
{
expand(1,2,3,4);
return 0;
}

這個例子將分別列印出1,2,3,4四個數字。這種展開參數包的方式,不需要通過遞歸終止函數,是直接在expand函數體中展開的, printarg不是一個遞歸終止函數,只是一個處理參數包中每一個參數的函數。
expand函數中的逗號表達式:(printarg(args), 0),先執行printarg(args),再得到逗號表達式的結果0。
同時還用到了C++11的另外一個特性——初始化列表,通過初始化列表來初始化一個變長數組,{(printarg(args), 0)…}將會展開成((printarg(arg1),0), (printarg(arg2),0), (printarg(arg3),0), etc… ),最終會創建一個元素值都為0的數組int arr[sizeof…(Args)]。由於是逗號表達式,在創建數組的過程中會先執行逗號表達式前面的部分printarg(args)列印出參數,也就是說在構造int數組的過程中就將參數包展開了,這個數組的目的純粹是為了在數組構造的過程展開參數包。我們可以把上面的例子再進一步改進一下,將函數作為參數,就可以支援lambda表達式了,從而可以少寫一個遞歸終止函數了,具體程式碼如下:

#include <iostream>
using namespace std;
template<class F, class... Args>void expand(const F& f, Args&&...args)
{
//這裡用到了完美轉發
initializer_list<int>{(f(std::forward< Args>(args)),0)...};
}
int main()
{
expand([](int i){cout<<i<<endl;}, 1,2,3);
return 0;
}

4 實現C++執行緒池

見課上分析
重點

  • 可變參數
  • std::future
  • decltype
  • packaged_task
  • bind
  • 支援可變參數列表
  • 支援獲取任務返回值

zero_threadpool.h

//zero_threadpool.h
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H

#include <future>
#include <functional>
#include <iostream>
#include <queue>
#include <mutex>
#include <memory>
#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif
using namespace std;



void getNow(timeval *tv);
int64_t getNowMs();

#define TNOW      getNow()
#define TNOWMS    getNowMs()

/////////////////////////////////////////////////
/**
 * @file zero_thread_pool.h
 * @brief 執行緒池類,採用c++11來實現了,
 * 使用說明:
 * ZERO_ThreadPool tpool;
 * tpool.init(5);   //初始化執行緒池執行緒數
 * //啟動執行緒方式
 * tpool.start();
 * //將任務丟到執行緒池中
 * tpool.exec(testFunction, 10);    //參數和start相同
 * //等待執行緒池結束
 * tpool.waitForAllDone(1000);      //參數<0時, 表示無限等待(注意有人調用stop也會推出)
 * //此時: 外部需要結束執行緒池是調用
 * tpool.stop();
 * 注意:
 * ZERO_ThreadPool::exec執行任務返回的是個future, 因此可以通過future非同步獲取結果, 比如:
 * int testInt(int i)
 * {
 *     return i;
 * }
 * auto f = tpool.exec(testInt, 5);
 * cout << f.get() << endl;   //當testInt在執行緒池中執行後, f.get()會返回數值5
 *
 * class Test
 * {
 * public:
 *     int test(int i);
 * };
 * Test t;
 * auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
 * //返回的future對象, 可以檢查是否執行
 * cout << f.get() << endl;
 */

class ZERO_ThreadPool
{
protected:
    struct TaskFunc
    {
        TaskFunc(uint64_t expireTime) : _expireTime(expireTime)
        { }

        std::function<void()>   _func;
        int64_t                _expireTime = 0;	//超時的絕對時間
    };
    typedef shared_ptr<TaskFunc> TaskFuncPtr;
public:
    /**
    * @brief 構造函數
    *
    */
    ZERO_ThreadPool();

    /**
    * @brief 析構, 會停止所有執行緒
    */
    virtual ~ZERO_ThreadPool();

    /**
    * @brief 初始化.
    *
    * @param num 工作執行緒個數
    */
    bool init(size_t num);

    /**
    * @brief 獲取執行緒個數.
    *
    * @return size_t 執行緒個數
    */
    size_t getThreadNum()
    {
        std::unique_lock<std::mutex> lock(mutex_);

        return threads_.size();
    }

    /**
    * @brief 獲取當前執行緒池的任務數
    *
    * @return size_t 執行緒池的任務數
    */
    size_t getJobNum()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return tasks_.size();
    }

    /**
    * @brief 停止所有執行緒, 會等待所有執行緒結束
    */
    void stop();

    /**
    * @brief 啟動所有執行緒
    */
    bool start(); // 創建執行緒

    /**
    * @brief 用執行緒池啟用任務(F是function, Args是參數)
    *
    * @param ParentFunctor
    * @param tf
    * @return 返回任務的future對象, 可以通過這個對象來獲取返回值
    */
    template <class F, class... Args>
    auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

    /**
    * @brief 用執行緒池啟用任務(F是function, Args是參數)
    *
    * @param 超時時間 ,單位ms (為0時不做超時控制) ;若任務超時,此任務將被丟棄
    * @param bind function
    * @return 返回任務的future對象, 可以通過這個對象來獲取返回值
    */
    /*
    template <class F, class... Args>
    它是c++里新增的最強大的特性之一,它對參數進行了高度泛化,它能表示0到任意個數、任意類型的參數
    auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
    std::future<decltype(f(args...))>:返回future,調用者可以通過future獲取返回值
    返回值後置
    */
    template <class F, class... Args>
    auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        int64_t expireTime =  (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 獲取現在時間
        //定義返回值類型
        using RetType = decltype(f(args...));  // 推導返回值
        // 封裝任務
        auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封裝任務指針,設置過期時間
        fPtr->_func = [task]() {  // 具體執行的函數
            (*task)();
        };

        std::unique_lock<std::mutex> lock(mutex_);
        tasks_.push(fPtr);              // 插入任務
        condition_.notify_one();        // 喚醒阻塞的執行緒,可以考慮只有任務隊列為空的情況再去notify

        return task->get_future();;
    }

    /**
    * @brief 等待當前任務隊列中, 所有工作全部結束(隊列無任務).
    *
    * @param millsecond 等待的時間(ms), -1:永遠等待
    * @return           true, 所有工作都處理完畢
    *                   false,超時退出
    */
    bool waitForAllDone(int millsecond = -1);

protected:
    /**
    * @brief 獲取任務
    *
    * @return TaskFuncPtr
    */
    bool get(TaskFuncPtr&task);

    /**
    * @brief 執行緒池是否退出
    */
    bool isTerminate() { return bTerminate_; }

    /**
    * @brief 執行緒運行態
    */
    void run();

protected:

    /**
    * 任務隊列
    */
    queue<TaskFuncPtr> tasks_;

    /**
    * 工作執行緒
    */
    std::vector<std::thread*> threads_;

    std::mutex                mutex_;

    std::condition_variable   condition_;

    size_t                    threadNum_;

    bool                      bTerminate_;

    std::atomic<int>          atomic_{ 0 };
};

#endif // ZERO_THREADPOOL_H

zero_threadpool.cpp

#include "zero_threadpool.h"

ZERO_ThreadPool::ZERO_ThreadPool()
    :  threadNum_(1), bTerminate_(false)
{
}

ZERO_ThreadPool::~ZERO_ThreadPool()
{
    stop();
}

bool ZERO_ThreadPool::init(size_t num)
{
    std::unique_lock<std::mutex> lock(mutex_);

    if (!threads_.empty())
    {
        return false;
    }

    threadNum_ = num;
    return true;
}

void ZERO_ThreadPool::stop()
{
    {
        std::unique_lock<std::mutex> lock(mutex_);

        bTerminate_ = true;

        condition_.notify_all();
    }

    for (size_t i = 0; i < threads_.size(); i++)
    {
        if(threads_[i]->joinable())
        {
            threads_[i]->join();
        }
        delete threads_[i];
        threads_[i] = NULL;
    }

    std::unique_lock<std::mutex> lock(mutex_);
    threads_.clear();
}

bool ZERO_ThreadPool::start()
{
    std::unique_lock<std::mutex> lock(mutex_);

    if (!threads_.empty())
    {
        return false;
    }

    for (size_t i = 0; i < threadNum_; i++)
    {
        threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
    }
    return true;
}

bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
    std::unique_lock<std::mutex> lock(mutex_);

    if (tasks_.empty())
    {
        condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
    }

    if (bTerminate_)
        return false;

    if (!tasks_.empty())
    {
        task = std::move(tasks_.front());  // 使用了移動語義

        tasks_.pop();

        return true;
    }

    return false;
}

void ZERO_ThreadPool::run()  // 執行任務的執行緒
{
    //調用處理部分
    while (!isTerminate()) // 判斷是不是要停止
    {
        TaskFuncPtr task;
        bool ok = get(task);        // 讀取任務
        if (ok)
        {
            ++atomic_;
            try
            {
                if (task->_expireTime != 0 && task->_expireTime  < TNOWMS )
                {
                    //超時任務,是否需要處理?
                }
                else
                {
                    task->_func();  // 執行任務
                }
            }
            catch (...)
            {
            }

            --atomic_;

            //任務都執行完畢了
            std::unique_lock<std::mutex> lock(mutex_);
            if (atomic_ == 0 && tasks_.empty())
            {
                condition_.notify_all();  // 這裡只是為了通知waitForAllDone
            }
        }
    }
}

bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
    std::unique_lock<std::mutex> lock(mutex_);

    if (tasks_.empty())
        return true;

    if (millsecond < 0)
    {
        condition_.wait(lock, [this] { return tasks_.empty(); });
        return true;
    }
    else
    {
        return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });
    }
}


int gettimeofday(struct timeval &tv)
{
#if WIN32
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tv.tv_sec = clock;
    tv.tv_usec = wtm.wMilliseconds * 1000;

    return 0;
#else
    return ::gettimeofday(&tv, 0);
#endif
}

void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX

    int idx = _buf_idx;
    *tv = _t[idx];
    if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc)
    {
        addTimeOffset(*tv, idx);
    }
    else
    {
        TC_Common::gettimeofday(*tv);
    }
#else
    gettimeofday(*tv);
#endif
}

int64_t getNowMs()
{
    struct timeval tv;
    getNow(&tv);

    return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}

main.cpp

#include <iostream>
#include <zero_threadpool.h>
using namespace std;

void func0()
{
    cout << "func0()" << endl;
}

void func1(int a)
{
    cout << "func1() a=" << a << endl;
}

void func2(int a, string b)
{
    cout << "func2() a=" << a << ", b=" << b<< endl;
}


void test1() // 簡單測試執行緒池
{
    ZERO_ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 啟動執行緒池
    // 假如要執行的任務
    threadpool.exec(1000,func0);
    threadpool.exec(func1, 10);
    threadpool.exec(func2, 20, "darren");
    threadpool.waitForAllDone();
    threadpool.stop();
}

int func1_future(int a)
{
    cout << "func1() a=" << a << endl;
    return a;
}

string func2_future(int a, string b)
{
    cout << "func1() a=" << a << ", b=" << b<< endl;
    return b;
}

void test2() // 測試任務函數返回值
{
    ZERO_ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 啟動執行緒池
    // 假如要執行的任務
    std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
    std::future<string> result2 = threadpool.exec(func2_future, 20, "darren");
//  auto result2 = threadpool.exec(func2_future, 20, "darren");

    std::cout << "result1: " << result1.get() << std::endl;
    std::cout << "result2: " << result2.get() << std::endl;
    threadpool.waitForAllDone();
    threadpool.stop();
}

class Test
{
public:
    int test(int i){
        cout << _name << ", i = " << i << endl;
        return i;
    }
    void setName(string name){
        _name = name;
    }
    string _name;
};

void test3() // 測試類對象函數的綁定
{
    ZERO_ThreadPool threadpool;
    threadpool.init(1);
    threadpool.start(); // 啟動執行緒池
    Test t1;
    Test t2;
    t1.setName("Test1");
    t2.setName("Test2");
    auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
    auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
    threadpool.waitForAllDone();
    cout << "t1 " << f1.get() << endl;
    cout << "t2 " << f2.get() << endl;
}
int main()
{
//    test1(); // 簡單測試執行緒池
//    test2(); // 測試任務函數返回值
    test3(); // 測試類對象函數的綁定
    cout << "main finish!" << endl;
    return 0;
}

5異常處理

C++ Core Guidelines (isocpp.github.io)
std::exception_ptr
Make exception_ptr
重點參考:MSVC 中的異常處理

5.1 異常處理基本語法

C++的提供的關於異常的三個關鍵字: try{ throw } catch{ }

#include <stdexcept>
#include <limits>
#include <iostream>
using namespace std;
void MyFunc(int c)
{
if (c > numeric_limits< char> ::max())
throw invalid_argument("throw MyFunc argument too large.");
//...
}
int main()
{
try
{
MyFunc(256); //cause an exception to throw
}
catch (invalid_argument& e)
{
cerr << "catch " << e.what() << endl;
return -1;
}
//...
return 0;
}

try在塊中,如果引發異常,則它將被其類型與異常匹配的第一個關聯catch塊捕獲。 換言之,執行
throw語句跳轉到catch語句。 如果未找到可用的 catch 塊,std::terminate則將調用並退出
程式。 在 c + + 中,可能會引發任何類型;但是,我們建議你引發直接或間接從std::exception派生
的類型。 在上面的示例中,異常類型invalid_argument 在標頭文件的標準庫 中定義。
語法比較簡單:throw(拋出)一個數據,然後再用catch(捕獲)接收。throw的數據類型可以是任意
的,所以當然也可以是一個對象:

struct Test
{
Test(const char* s, int i, double d)
: s(s)
, i(i)
, d(d) {};
const char* s;
int i;
double d;
void print() const
{
printf("%s %d %.2f\n", s, i, d);
}
};
int main()
{
try
{
throw Test("LLF", 520, 13.14);
}
catch (const Test& e)
{
e.print();
}
}

5.2 基本指導原則

強大的錯誤處理對於任何程式語言都很有挑戰性。 儘管異常提供了多個支援良好錯誤處理的功能,但它
們無法為你完成所有工作。 若要實現異常機制的優點,請在設計程式碼時記住異常。

  • 使用斷言來檢查絕不應發生的錯誤。 使用異常來檢查可能出現的錯誤,例如,公共函數參數的輸入
    驗證中的錯誤。 有關詳細資訊,請參閱 異常與斷言 部分。
  • 當處理錯誤的程式碼與通過一個或多個干預函數調用檢測到錯誤的程式碼分離時,使用異常。 當處理錯誤的程式碼與檢測到錯誤的程式碼緊密耦合時,考慮是否使用錯誤程式碼而不是在性能關鍵循環中。
  • 對於可能引發或傳播異常的每個函數,請提供以下三種異常保證之一:強保障、基本保證或nothrow (noexcept) 保證。 有關詳細資訊,請參閱 如何:設計異常安全性
  • 按值引發異常,按引用來捕獲異常。不要捕獲無法處理的內容。
  • 不要使用 c + + 11 中已棄用的異常規範。 有關詳細資訊,請參閱異常規範和 noexcept 部分。
  • 應用時使用標準庫異常類型。 從exception 類層次結構派生自定義異常類型。
  • 不允許對析構函數或記憶體釋放函數進行轉義。

5.3 Exception 類

對上面程式碼的分析,可以看到,發生異常時拋出一個對象而不是一個簡單的數據類型,可以傳遞更多的
錯誤資訊,但是這樣的話,我們需要針對不同的異常情況定義不同的類。有沒有統一的解決方法?
C++給出來了一個標準的異常類Exception。
看一下定義:

/**
* @brief Base class for all library exceptions.
*
* This is the base class for all exceptions thrown by the standard
* library, and by certain language expressions. You are free to derive
* your own %exception classes, or use a different hierarchy, or to
* throw non-class data (e.g., fundamental types).
*/
class exception
{
public:
exception() noexcept { }
virtual ~exception() noexcept;
exception(const exception&) = default;
exception& operator=(const exception&) = default;
exception(exception&&) = default;
exception& operator=(exception&&) = default;
/** Returns a C-style character string describing the general cause
* of the current error. */
virtual const char* what() const noexcept;
};

主要就是定義了一個what的虛函數,返回C_style的字元串,主要作用就是描述發生一場的原因。在使用
的時候,往往需要自定義一個異常類:

#include<exception>
#include<iostream>
using namespace std;
class MyException:public exception
{
public:
const char* what()const throw(){ //throw () 表示不允許任何異常產生
return "ERROR! Don't divide a number by integer zero.\n";
}
};
void check(int y) throw(MyException)
{ //throw (MyException)表示只允許myException的異常發生
if(y==0) throw MyException();
}
int main()
{
int x=100,y=0;
try{
check(y);
cout<<x/y;
}catch(MyException& me){
cout<<me.what();
cout << "finish exception\n";
return -1;
}
cout << "finish ok\n";
return 0;
}

5.4 標準異常擴展

C++定義了一些標準的異常,用於各種場景,他們都是繼承自std::exception的:

下表是對上面層次結構中出現的每個異常的說明:

異常 描述
std::exception 該異常是所有標準 C++ 異常的父類。
std::bad_alloc 該異常可以通過 new 拋出。
std::bad_cast 該異常可以通過 dynamic_cast 拋出。
std::bad_exception 這在處理 C++ 程式中無法預期的異常時非常有用。
std::bad_typeid 該異常可以通過 typeid 拋出。
std::logic_error 理論上可以通過讀取程式碼來檢測到的異常。
std::domain_error 當使用了一個無效的數學域時,會拋出該異常
std::invalid_argument 當使用了無效的參數時,會拋出該異常。
std::length_error 當創建了太長的 std::string 時,會拋出該異常。
std::out_of_range 該異常可以通過方法拋出,例如 std::vector 和std::bitset<>::operator。
std::runtime_error 理論上不可以通過讀取程式碼來檢測到的異常。
std::overflow_error 當發生數學上溢時,會拋出該異常。
std::range_error 當嘗試存儲超出範圍的值時,會拋出該異常。
std::underflow_error 當發生數學下溢時,會拋出該異常。

5.5 std::exception_ptr

根據官方文檔的介紹 std::exception_ptr是一個指向 exception object 的共享智慧指針。
關鍵在於理解 「exception object」 是什麼,是std::exception類的對象嗎?這種理解是不準的,按我的理
解,所謂「exception object」 應該是被throw拋出的對象,根據我們上面的學習,塔既可以是int、double
等簡單的數據類型、也可以是自定義的類對象,當然也可以是std::exception類對象。
有四個操作std::exception_ptr的函數:

//5-5-exception_ptr exception_ptr example
#include <iostream> // std::cout
#include <exception> // std::exception_ptr, std::current_exception,
std::rethrow_exception
#include <stdexcept> // std::logic_error
int main ()
{
std::exception_ptr p;
try {
throw std::logic_error("some logic_error exception"); // throws
} catch(const std::exception& e) {
p = std::current_exception();
std::cout << "exception caught, but continuing...\n";
}
std::cout << "(after exception)\n";
try {
std::rethrow_exception (p);
} catch (const std::exception& e) {
std::cout << "exception caught: " << e.what() << '\n';
}
return 0;
}

  • 首先定義了一個 std::exception_ptr變數p
  • 然後在第一個try中,拋出了一個標準異常(見上)
  • 在第一個catch中,調用 current_exception() ,這樣就讓p指向了捕獲的異常對象
  • 然後在第二個try中,調用 rethrow_exception ,將異常重新拋出
  • 然後在第二個catch中,依然正常的捕獲到了這個異常對象
// 5-5-make_exception_ptr make_exception_ptr example
#include <iostream> // std::cout
#include <exception> // std::make_exception_ptr, std::rethrow_exception
#include <stdexcept> // std::logic_error
int main ()
{
auto p = std::make_exception_ptr(std::logic_error("logic_error"));
try {
std::rethrow_exception (p); // 重新拋出異常
} catch (const std::exception& e) {
std::cout << "exception caught: " << e.what() << '\n'; // 捕獲異常
}
return 0;
}

首先創建了一個異常make_exception_ptr
然後再try中拋出該異常
接著在catch捕獲拋出的異常。

//5-5-nested_exception nested_exception example
#include <iostream> // std::cerr
#include <exception> // std::exception, std::throw_with_nested,
std::rethrow_if_nested
#include <stdexcept> // std::logic_error
// recursively print exception whats:
void print_what (const std::exception& e)
{
std::cout << __FUNCTION__ << ", L"<< __LINE__ << ", what:" << e.what() <<
'\n';
try {
std::rethrow_if_nested(e);
} catch (const std::exception& nested) {
std::cerr << "nested: ";
print_what(nested);
}
}
// throws an exception nested in another:
void throw_nested()
{
try {
throw std::logic_error ("first");
} catch (const std::exception& e) {
std::throw_with_nested(std::logic_error("second"));
}
}
int main ()
{
try {
std::cout << __FUNCTION__ << ", L"<< __LINE__ << std::endl;
throw_nested();
} catch (std::exception& e) {
std::cout << __FUNCTION__ << ", L"<< __LINE__ << std::endl;
print_what(e);
}
return 0;
}

推薦一個零聲學院免費教程,個人覺得老師講得不錯,
分享給大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,
TCP/IP,協程,DPDK等技術內容,點擊立即學習:
伺服器
音影片
dpdk
Linux內核