簡單的執行緒池(九)

◆ 概要

本文中,筆者嘗試組合了非阻塞式和阻塞式的隊列,成為新的組合式執行緒池。執行緒池有一個共享任務隊列,每個工作執行緒各有一個工作任務隊列。執行緒池用戶提交的任務,先被保存在共享任務隊列中。執行緒池的調度器執行緒將共享任務隊列中的任務分派給工作執行緒的工作任務隊列,工作執行緒從工作任務隊列中獲取任務並執行。

gist
【注】圖中 * 表示工作執行緒獲取任務的方式會因工作任務隊列類型的不同而變化。

筆者對共享任務隊列和工作任務隊列採用不同的類型,組合成了三種方案,

No 共享任務隊列類型 工作任務隊列類型
1 阻塞 阻塞獨佔
2 阻塞 非阻塞互助
3 阻塞 非阻塞互助2B

以下關於此執行緒池的說明中,將簡述與 阻塞共享任務隊列阻塞獨佔任務隊列非阻塞互助任務隊列非阻塞互助2B任務隊列 的內容。如有不明之處,請先參考鏈接對應的博文。

◆ 實現

以下程式碼給出了方案一的實現,(blocking_shared_blocking_unique_pool.h)

class Thread_Pool {

  private:

    struct Task_Wrapper { ...

    };

    atomic<bool> _suspend_;
    atomic<bool> _done_;
    Blocking_Queue<Task_Wrapper> _poolqueue_;          // #1
    thread _scheduler_;                           // #3
    unsigned _workersize_;
    thread* _workers_;
    Blocking_Queue<Task_Wrapper>* _workerqueues_;     // #2

    void work(unsigned index) {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            _workerqueues_[index].pop(task);              // #7
            task();
            while (_suspend_.load(memory_order_acquire))
                std::this_thread::yield();
        }
    }

    void stop() {
        size_t remaining = 0;
        _suspend_.store(true, memory_order_release);
        remaining = _poolqueue_.size();                  // #8
        for (unsigned i = 0; i < _workersize_; ++i)
            remaining += _workerqueues_[i].size();
        _suspend_.store(false, memory_order_release);
        while (!_poolqueue_.empty())
            std::this_thread::yield();
        for (unsigned i = 0; i < _workersize_; ++i)
            while (!_workerqueues_[i].empty())
                std::this_thread::yield();
        std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
        _done_.store(true, memory_order_release);
        _poolqueue_.push([] {});                            // #9
        for (unsigned i = 0; i < _workersize_; ++i)
            _workerqueues_[i].push([] {});
        for (unsigned i = 0; i < _workersize_; ++i)
            if (_workers_[i].joinable())
                _workers_[i].join();
        if (_scheduler_.joinable())
            _scheduler_.join();
        delete[] _workers_;
        delete[] _workerqueues_;
    }

    void schedule() {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            _poolqueue_.pop(task);                                        // #6
            _workerqueues_[rand() % _workersize_].push(std::move(task));
        }
    }

  public:
    Thread_Pool() : _suspend_(false), _done_(false) {
        try {
            _workersize_ = thread::hardware_concurrency();
            _workers_ = new thread[_workersize_]();
            _workerqueues_ = new Blocking_Queue<Task_Wrapper>[_workersize_]();     // #4
            for (unsigned i = 0; i < _workersize_; ++i)
                _workers_[i] = thread(&Thread_Pool::work, this, i);
            _scheduler_ = thread(&Thread_Pool::schedule, this);           // #5
        } catch (...) { ...
        
        }
    }

    ...

};

執行緒池中定義了阻塞式的共享任務隊列(#1)、阻塞式的工作任務隊列(#2)和調度器執行緒(#3)。執行緒池對象被創建時,它們一併被初始化(#4、#5)。執行緒池的調度器執行緒將共享任務隊列中的任務分派給工作任務隊列(#6),工作執行緒從各自的工作任務隊列的頭部獲取任務並執行(#7)。在統計剩餘的工作任務時,合計共享任務隊列和工作任務隊列中剩餘的任務(#8)。為了避免發生死鎖問題,向共享任務隊列和每個工作任務隊列中各放入一個假任務(#9),確保調度器執行緒和各個工作執行緒都能退出循環等待。

以下程式碼給出了方案二的實現,(blocking_shared_lockwise_mutual_pool.h)

class Thread_Pool {

  private:

    struct Task_Wrapper { ...

    };

    atomic<bool> _suspend_;
    atomic<bool> _done_;
    Blocking_Queue<Task_Wrapper> _poolqueue_;          // #1
    thread _scheduler_;                           // #3
    unsigned _workersize_;
    thread* _workers_;
    Lockwise_Queue<Task_Wrapper>* _workerqueues_;     // #2

    void work(unsigned index) {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            if (_workerqueues_[index].pop(task))
                task();
            else
                for (unsigned i = 0; i < _workersize_; ++i)
                    if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) {        // #7
                        task();
                        break;
                    }
            while (_suspend_.load(memory_order_acquire))
                std::this_thread::yield();
        }
    }

    void stop() {
        size_t remaining = 0;
        _suspend_.store(true, memory_order_release);
        remaining = _poolqueue_.size();                  // #8
        for (unsigned i = 0; i < _workersize_; ++i)
            remaining += _workerqueues_[i].size();
        _suspend_.store(false, memory_order_release);
        while (!_poolqueue_.empty())
            std::this_thread::yield();
        for (unsigned i = 0; i < _workersize_; ++i)
            while (!_workerqueues_[i].empty())
                std::this_thread::yield();
        std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
        _done_.store(true, memory_order_release);
        _poolqueue_.push([] {});                            // #9
        for (unsigned i = 0; i < _workersize_; ++i)
            if (_workers_[i].joinable())
                _workers_[i].join();
        if (_scheduler_.joinable())
            _scheduler_.join();
        delete[] _workers_;
        delete[] _workerqueues_;
    }

    void schedule() {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            _poolqueue_.pop(task);                                        // #6
            _workerqueues_[rand() % _workersize_].push(std::move(task));
        }
    }

  public:
    Thread_Pool() : _suspend_(false), _done_(false) {
        try {
            _workersize_ = thread::hardware_concurrency();
            _workers_ = new thread[_workersize_]();
            _workerqueues_ = new Lockwise_Queue<Task_Wrapper>[_workersize_]();     // #4
            for (unsigned i = 0; i < _workersize_; ++i)
                _workers_[i] = thread(&Thread_Pool::work, this, i);
            _scheduler_ = thread(&Thread_Pool::schedule, this);           // #5
        } catch (...) { ...
        }
    }

    ...

};

執行緒池中定義了阻塞式的共享任務隊列(#1)、非阻塞互助式的工作任務隊列(#2)和調度器執行緒(#3)。執行緒池對象被創建時,它們一併被初始化(#4、#5)。執行緒池的調度器執行緒將共享任務隊列中的任務分派給工作任務隊列(#6),工作執行緒從各自的工作任務隊列的頭部獲取任務並執行。當自己的工作任務隊列中無任務時,此工作執行緒會從其他工作執行緒的工作任務隊列頭部獲取任務(#7)。在統計剩餘的工作任務時,合計共享任務隊列和工作任務隊列中剩餘的任務(#8)。為了避免發生死鎖問題,向共享任務隊列中放入一個假任務(#9),確保調度器執行緒能退出循環等待。

以下程式碼給出了方案三的實現,(blocking_shared_lockwise_mutual_2b_pool.h)

class Thread_Pool {

  private:

    struct Task_Wrapper { ...

    };

    atomic<bool> _suspend_;
    atomic<bool> _done_;
    Blocking_Queue<Task_Wrapper> _poolqueue_;          // #1
    thread _scheduler_;                           // #3
    unsigned _workersize_;
    thread* _workers_;
    Lockwise_Deque<Task_Wrapper>* _workerqueues_;     // #2

    void work(unsigned index) {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            if (_workerqueues_[index].pull(task))
                task();
            else
                for (unsigned i = 0; i < _workersize_; ++i)
                    if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) {        // #7
                        task();
                        break;
                    }
            while (_suspend_.load(memory_order_acquire))
                std::this_thread::yield();
        }
    }

    void stop() {
        size_t remaining = 0;
        _suspend_.store(true, memory_order_release);
        remaining = _poolqueue_.size();                  // #8
        for (unsigned i = 0; i < _workersize_; ++i)
            remaining += _workerqueues_[i].size();
        _suspend_.store(false, memory_order_release);
        while (!_poolqueue_.empty())
            std::this_thread::yield();
        for (unsigned i = 0; i < _workersize_; ++i)
            while (!_workerqueues_[i].empty())
                std::this_thread::yield();
        std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
        _done_.store(true, memory_order_release);
        _poolqueue_.push([] {});                            // #9
        for (unsigned i = 0; i < _workersize_; ++i)
            if (_workers_[i].joinable())
                _workers_[i].join();
        if (_scheduler_.joinable())
            _scheduler_.join();
        delete[] _workers_;
        delete[] _workerqueues_;
    }

    void schedule() {
        Task_Wrapper task;
        while (!_done_.load(memory_order_acquire)) {
            _poolqueue_.pop(task);                                        // #6
            _workerqueues_[rand() % _workersize_].push(std::move(task));
        }
    }

  public:
    Thread_Pool() : _suspend_(false), _done_(false) {
        try {
            _workersize_ = thread::hardware_concurrency();
            _workers_ = new thread[_workersize_]();
            _workerqueues_ = new Lockwise_Deque<Task_Wrapper>[_workersize_]();     // #4
            for (unsigned i = 0; i < _workersize_; ++i)
                _workers_[i] = thread(&Thread_Pool::work, this, i);
            _scheduler_ = thread(&Thread_Pool::schedule, this);           // #5
        } catch (...) { ...
        }
    }
    
    ...

};

執行緒池中定義了阻塞式的共享任務隊列(#1)、非阻塞互助2B式的工作任務隊列(#2)和調度器執行緒(#3)。執行緒池對象被創建時,它們一併被初始化(#4、#5)。執行緒池的調度器執行緒將共享任務隊列中的任務分派給工作任務隊列(#6),工作執行緒從各自的工作任務隊列的尾部獲取任務並執行。當自己的工作任務隊列中無任務時,此工作執行緒會從其他工作執行緒的工作任務隊列的頭部獲取任務(#7)。在統計剩餘的工作任務時,合計共享任務隊列和工作任務隊列中剩餘的任務(#8)。為了避免發生死鎖問題,向共享任務隊列中放入一個假任務(#9),確保調度器執行緒能退出循環等待。

◆ 邏輯

以下類圖和順序圖分別展現了方案一的主要邏輯結構以及執行緒池用戶提交任務與調度器執行緒、工作執行緒執行任務的並發過程,

[注] 圖中用構造型(stereotype)標識出調度器執行緒和工作執行緒的初始函數,並在註解中加以說明調用關係,下同。

以下為方案二的邏輯,

以下為方案三的邏輯,

◆ 驗證

驗證過程採用了 《簡單的執行緒池(三)》 中定義的的測試用例。筆者對比了測試結果與 《簡單的執行緒池(八)》 的數據,結果如下,

圖1 列舉了 吞吐量1的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

【注】三種組合方案分別略稱為 BSBU、BSLM、BSLM2B,下同。

圖1

可以看到,

  • 當思考時間為 0 時,BSBU、BSLM、BSLM2B 的吞吐量明顯劣於其它類型的吞吐量;延長提交周期後,這種差異沒有發生明顯變化;
  • 當思考時間不為 0 時,BSBU、BSLM、BSLM2B 的吞吐量相當於 BS、BU 的吞吐量,優於其它類型的吞吐量,但差異不會因提交周期的延長而變化;隨著思考時間的增加,BSBU、BSLM、BSLM2B 的吞吐量與其它類型的吞吐量差異逐漸消失。

圖2 列舉了 吞吐量2的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

圖2

可以看到,

  • 當思考時間為 0 時,BSBU、BSLM、BSLM2B 的吞吐量明顯劣於 LM 和 LM2系列 的吞吐量,略優於 BS、BU 的吞吐量;延長提交周期後,這種差異沒有發生大幅變化;
  • 當思考時間不為 0 時,沒有可供比較的基礎數據。

圖3 列舉了 吞吐量3的差異 在 0.5 分鐘、1 分鐘和 3 分鐘的提交周期內不同思考時間上的對比。

圖3

  • 當思考時間為 0 時,BSBU、BSLM、BSLM2B 的吞吐量明顯劣於其它類型的吞吐量;延長提交周期後,這種差異沒有發生明顯變化;
  • 當思考時間不為 0 時,BSBU、BSLM、BSLM2B 的吞吐量相當於 BS、BU 的吞吐量,優於其它類型的吞吐量,但差異不會因提交周期的延長而變化;隨著思考時間的增加,BSBU、BSLM、BSLM2B 的吞吐量與其它類型的吞吐量差異逐漸消失。

基於以上的對比分析,筆者認為,

  • 組合式的吞吐能力偏向於阻塞式的;
  • 組合式的方案沒有改進吞吐能力,在應對思考時間為 0 的場合明顯劣於其它類型的。

◆ 最後

完整的程式碼示例和測試數據請參考 [github] cnblogs/15754987