簡單的線程池(八)

概要

筆者根據 Anthony Williams 在《C++並發編程實戰》中所述,

某個工作線程在任務隊列的頭部操作,而其它工作線程在任務隊列的尾部操作。這實際上意味着這個隊列對於擁有線程來說是一個後進先出的棧。最近被放到隊列中的任務會最先被取出來執行。從緩存的角度來說這可以提高性能,因為對比之前被放入隊列中的任務,被取出的任務的數據更加有可能在緩存中。

嘗試調整了 《簡單的線程池(七)》 中提及的非阻塞互助式線程池的實現方案,用 std::deque 替換 std::queue 作為存儲工作任務的底層數據結構,並新定義非阻塞線程安全的雙端隊列 Lockwise_Deque<>。

工作任務始終被添加至雙端隊列的尾部。為了驗證從緩存的角度是否可以提高性能,筆者用四種方式實現工作任務的獲取,

  • 當前工作線程從自己的任務隊列的頭部獲取任務,如無工作任務則從其他任務隊列的頭部獲取任務(A);

  • 當前工作線程從自己的任務隊列的尾部獲取任務,如無工作任務則從其他任務隊列的頭部獲取任務(B);

  • 當前工作線程從自己的任務隊列的頭部獲取任務,如無工作任務則從其他任務隊列的尾部獲取任務(C);

  • 當前工作線程從自己的任務隊列的尾部獲取任務,如無工作任務則從其他任務隊列的尾部獲取任務(D)。

A 方式即為最初的非阻塞互助式,差別在於底層數據結構由 std::queue 更改為 std::deque;B 方式即按照 Anthony 所述的實現;C 和 D 是筆者為了全面確認緩存對性能的影響而增加的方式。

本文不再贅訴與 《簡單的線程池(七)》 相同的內容。如有不明之處,請參考該博客。

實現

以下代碼給出了非阻塞線程安全雙端隊列的實現,(lockwise_deque.h)

template<class T>
class Lockwise_Deque {

  private:
    struct Spinlock_Mutex { ...
    
    } mutable _m_;
    deque<T> _q_;

  public:
    void push(T&& element) {
        lock_guard<Spinlock_Mutex> lk(_m_);
        _q_.push_back(std::move(element));        // #1
    }

    bool pop(T& element) {
        lock_guard<Spinlock_Mutex> lk(_m_);
        if (_q_.empty())
            return false;
        element = std::move(_q_.front());         // #2
        _q_.pop_front();
        return true;
    }

    bool pull(T& element) {
        lock_guard<Spinlock_Mutex> lk(_m_);
        if (_q_.empty())
            return false;
        element = std::move(_q_.back());          // #3
        _q_.pop_back();
        return true;
    }

    ...

};

push(T&&) 函數實現從雙端隊列尾部添加任務(#1),pop(T&) 函數實現從雙端隊列頭部獲取任務(#2),pull(T&) 函數實現從雙端隊列尾部獲取任務(#3)。

以下代碼給出 A 方式的實現,(lockwise_mutual_2a_pool.h)

class Thread_Pool {

   ...

	void work(unsigned index) {
	    Task_Wrapper task;
	    while (!_done_.load(memory_order_acquire)) {
	        if (_workerqueues_[index].pop(task))        // #1
	            task();
	        else
	            for (unsigned i = 0; i < _workersize_; ++i)
	                if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) {      // #2
	                    task();
	                    break;
	                }
	        while (_suspend_.load(memory_order_acquire))
	            std::this_thread::yield();
	    }
	}
	
	...
	
}

當前工作線程從自己的任務隊列的頭部獲取工作任務(#1),如無工作任務則從其他任務隊列的頭部獲取任務(#2)。

以下代碼給出 B 方式的實現,(lockwise_mutual_2b_pool.h)

class Thread_Pool {

   ...

	void work(unsigned index) {
	    Task_Wrapper task;
	    while (!_done_.load(memory_order_acquire)) {
	        if (_workerqueues_[index].pull(task))        // #1
	            task();
	        else
	            for (unsigned i = 0; i < _workersize_; ++i)
	                if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) {      // #2
	                    task();
	                    break;
	                }
	        while (_suspend_.load(memory_order_acquire))
	            std::this_thread::yield();
	    }
	}
	
	...
	
}

當前工作線程從自己的任務隊列的尾部獲取工作任務(#1),如無工作任務則從其他任務隊列的頭部獲取任務(#2)。

以下代碼給出 C 方式的實現,(lockwise_mutual_2c_pool.h)

class Thread_Pool {

   ...

	void work(unsigned index) {
	    Task_Wrapper task;
	    while (!_done_.load(memory_order_acquire)) {
	        if (_workerqueues_[index].pop(task))        // #1
	            task();
	        else
	            for (unsigned i = 0; i < _workersize_; ++i)
	                if (_workerqueues_[(index + i + 1) % _workersize_].pull(task)) {      // #2
	                    task();
	                    break;
	                }
	        while (_suspend_.load(memory_order_acquire))
	            std::this_thread::yield();
	    }
	}
	
	...
	
}

當前工作線程從自己的任務隊列的頭部獲取工作任務(#1),如無工作任務則從其他任務隊列的尾部獲取任務(#2)。

以下代碼給出 D 方式的實現,(lockwise_mutual_2d_pool.h)

class Thread_Pool {

    ...

    void work(unsigned index) {
	    Task_Wrapper task;
	    while (!_done_.load(memory_order_acquire)) {
	        if (_workerqueues_[index].pull(task))        // #1
	            task();
	        else
	            for (unsigned i = 0; i < _workersize_; ++i)
	 	            if (_workerqueues_[(index + i + 1) % _workersize_].pull(task)) {      // #2
	                    task();
	                    break;
	                }
	        while (_suspend_.load(memory_order_acquire))
	            std::this_thread::yield();
	    }
	}
	
	...
	
}

當前工作線程從自己的任務隊列的尾部獲取工作任務(#1),如無工作任務則從其他任務隊列的尾部獲取任務(#2)。

邏輯

以下類圖展現了此線程池的代碼主要邏輯結構。

線程池用戶提交任務與工作線程執行任務的並發過程與 《簡單的線程池(一)》 中的一致,此處略。

驗證

驗證過程採用了 《簡單的線程池(三)》 中定義的的測試用例。筆者對比了測試結果與 《簡單的線程池(七)》 的數據,結果如下,

圖1 列舉了 吞吐量1的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

【注】四種非阻塞線程安全的雙端隊列略稱為 LM2A、LM2B、LM2C 和 LM2D,下同。

圖1

圖2 列舉了 吞吐量2的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

圖2

圖3 列舉了 吞吐量3的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

圖3

可以看到,B 方式的非阻塞互助式線程池與 A、C、D 方式的以及普通隊列的非阻塞互助式在吞吐量上的表現近乎一致。

基於以上的對比分析,筆者認為,在非阻塞式線程池中採用雙端隊列,

從緩存的角度來說這可以提高性能

這樣的論述是值得討論的。

最後

完整的代碼示例和測試數據請參考 [github] cnblogs/15723078

作者參考了 C++並發編程實戰 / (美)威廉姆斯 (Williams, A.) 著; 周全等譯. – 北京: 人民郵電出版社, 2015.6 (2016.4重印) 一書中的部分設計思路。致 Anthony Williams、周全等譯者。