現代 C++ 對多執行緒/並發的支援(上) — 節選自 C++ 之父的 《A Tour of C++》

本文翻譯自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅(A Tour of C++)一書的第 13 章 Concurrency。用短短數十頁,帶你一窺現代 C++ 對並發/多執行緒的支援。原文地址:現代 C++ 對多執行緒/並發的支援(上) — 節選自 C++ 之父的 《A Tour of C++》 水平有限,難以用另一種語言原汁原味地還原,有條件的建議直接閱讀原版書籍。

13 並發

13.1 介紹

並發,即同時執行多個任務,常用來提高吞吐量(通過利用多處理器進行同一個計算)或者改善響應性(等待回復的時候,允許程式的其他部分繼續執行)。所有現代語言都支援並發。C++ 標準庫提供了可移植、類型安全的並發支援,經過 20 多年的發展,已經幾乎被所有現代硬體所支援。標準庫提供的主要是系統級的並發支援,而非複雜的、更高層次的並發模型;其他庫可以基於標準庫,提供更高級別的並發支援。

C++ 提供了適當的記憶體模型(memory model)和一組原子操作(atomic operation),以支援在同一地址空間內並發執行多個執行緒。原子操作使得無鎖編程成為可能。記憶體模型保證了在避免數據競爭(data races,不受控地同時訪問可變數據)的前提下,一切按照預期工作。

本章將給出標準庫對並發的主要支援示例:threadmutexlock()packaged_task 以及 future。這些特徵直接基於作業系統構建,相較於作業系統原生支援,不會帶來性能損失,也不保證會有顯著的性能提升。

那為什麼要用標準庫而非作業系統的並發?可移植性。

不要把並發當作靈丹妙藥:如果順序執行可以搞定,通常順序會比並發更簡單、更快速!

13.2 任務和執行緒

如果一個計算有可能(potentially)和另一個計算並發執行,我們稱之為任務(task)。執行緒是任務的系統級表示。任務可以通過構造一個 std::thread 來啟動,任務作為參數。任務是一個函數或者函數對象

void f();              // 函數

struct F {             // 函數對象
    void operator()()  // F 的調用操作符
};

void user()
{
    thread t1 {f};     // f() 在另一個執行緒中執行
    thread t2 {F()};   // F()() 在另一個執行緒中執行

    t1.join();  // 等待 t1
    t2.join();  // 等待 t2
}

join() 確保執行緒完成後才退出 user(),「join 執行緒」的意思是「等待執行緒結束」。

一個程式的執行緒共享同一地址空間。執行緒不同於進程,進程通常不直接共享數據。執行緒間可以通過共享對象(shared object)通訊,這類通訊一般用鎖或其他機制控制,以避免數據競爭。

編寫並發任務可能會非常棘手:

void f() {cout << "Hello ";}
struct F {
    void operator()() {cout << "Parallel World!\n";}
};

上述例子有個錯誤:fF() 都用到了 cout 對象,卻沒有任何形式的同步。這會導致輸出的結果不可預測,多次執行的結果可能會得到不同的結果:因為兩個任務的執行順序是未定義的。程式可能有詭異的輸出,比如:

PaHerallllel o World!

定義一個並發程式中的任務時,我們的目標是保持任務之間的完全獨立,除非他們之間僅僅是簡單、明顯的通訊。最簡單的方法就是把並發任務看作是一個恰巧可以和調用者同時運行的函數:我們只要傳遞參數、取回結果,保證該過程中沒有使用共享數據(沒有數據競爭)即可。

13.3 傳遞參數

一般來說,任務需要基於數據執行。我們可以通過參數傳遞數據(或者數據的指針或引用)。

void f(vector<double>& v); // 處理 v 的函數

struct F {                 // 處理 v 的函數對象
    vector<double>& v;
    F(vector<double>& vv) : v(vv) {}
    void operator()();
};

int main()
{
    vector<double> some_vec{1,2,3,4,5,6,7,8,9};
    vector<double> vec2{10,11,12,13,14};

    thread t1{f,ref(some_vec)}; // f(some_vec) 在另一個執行緒中執行
    thread t2{F{vec2}};         // F{vec2}() 在另一個執行緒中執行

    t1.join();
    t2.join();
}

F{vec2}F 中保存了參數 vector 的引用。F 現在可以使用這個 vector。但願在 F 執行時,沒有其他任務訪問 vec2。如果通過值傳遞 vec2 就可以消除這個隱患。

t1 通過 {f,ref(some_vec)} 初始化,用到了 thread可變參數模板構造,可以接受任意序列的參數。ref() 是來自 <functional> 的類型函數。為了讓可變參數模板把 some_vec 當作一個引用而非對象,ref() 不能省略。編譯器檢查第一個參數可以通過其後面的參數調用,並構建必要的函數對象,傳遞給執行緒。如果 F::operator()()f() 執行了相同的演算法,兩個任務的處理幾乎是等同的:兩種情況下,都各自構建了一個函數對象,讓 thread 去執行。

可變參數模板需要用 ref()cref() 傳遞引用

13.4 返回結果

13.3 的例子中,我傳了一個非 const 的引用。我只有在期待任務修改引用數據的值時才這麼做。儘管這是一種很常見的獲取返回結果的方式,但這麼做並不能向他人傳達清晰、明確的意圖,有些偷偷摸摸暗中操作的感覺。一種不那麼晦澀的方式是通過 const 引用傳遞輸入數據,通過另外單獨的參數傳遞儲存結果的指針。

void f(vector<double>& v, double *res); // 從 v 獲取輸入; 結果存入 *res

class F {
public:
    F(vector<double>& vv, double *p) : v(vv), res(p) {}
    void operator()();  // 結果保存到 *res

private:
    vector<double>& v;  // 輸入源
    double *p;          // 輸出地址
};

int main()
{
    vector<double> some_vec;
    vector<double> vec2;

    double res1;
    double res2;

    thread t1{f,ref(some_vec),&res1}; // f(some_vec,&res1) 在另一個執行緒中執行
    thread t2{F{vec2,&res2}};         // F{vec2,&res2}() 在另一個執行緒中執行

    t1.join();
    t2.join();
}

這樣行得通,也很常用。但我不覺得通過參數傳遞返回結果有多優雅,我會在 13.7.1 節再次討論這個話題。

通過參數(出參)傳遞結果並不優雅

13.5 共享數據

有時任務需要共享數據,這種情況下,對共享數據的訪問需要進行同步,同一時刻只能有一個任務訪問數據(但是多任務同時讀取不變的數據是沒有問題的)。我們要考慮如何保證在同一時刻最多只有一個任務能夠訪問一組對象。

解決這個問題的基本元素是 mutex(mutual exclusion object,互斥對象)。thread 通過 lock() 獲取 mutex

int shared_data;
mutex m;          // 用於控制 shared_data 的 mutex

void f()
{
    unique_lock<mutex> lck{m};  // 獲取 mutex
    shared_data += 7;           // 操作共享數據
}   // 離開 f() 作用域,隱式自動釋放 mutex

unique_lock 的構造函數通過 m.lock() 獲取 mutex。如果另一個執行緒已經獲取這個 mutex,當前執行緒等待(阻塞)直到另一個執行緒(通過 m.unlock())釋放該 mutex。當 mutex 釋放,等待該 mutex 的執行緒恢復執行(喚醒)。互斥、鎖聲明在 <mutex> 頭文件中。

共享數據和 mutex 之間的關聯需要達成一致:程式設計師需要知道哪個 mutex 對應哪個數據。這樣很容易出錯,但是我們可以通過一些方式使得他們之間的關係更清晰:

class Record {
public:
    mutex rm;
};

不難猜到,對於一個 Record 對象 rec,在訪問 rec 其他數據之前,你應該先獲取 rec.rm。最好通過注釋或者良好的命名讓讀者清楚地知道 mutex 和數據的關聯。

有時執行某些操作需要同時訪問多個資源,有可能導致死鎖。例如,thread1 已經獲取了 mutex1,然後嘗試獲取 mutex2;與此同時,thread2 已經獲取 mutex2,嘗試獲取 mutex1。在這種情況下,兩個任務都無法進行下去。標準庫支援同時獲取多個鎖:

void f()
{
    unique_lock<mutex> lck1{m1,defer_lock};  // defer_lock:不立即獲取 mutex
    unique_lock<mutex> lck2{m2,defer_lock};
    unique_lock<mutex> lck3{m3,defer_lock};

    lock(lck1,lck2,lck3);
    // 操作共享數據
}   // 離開 f() 作用域,隱式自動釋放所有 mutexes

lock() 只有在獲取所有參數里的的 mutex 之後繼續執行,並且在其持有 mutex 期間,不會阻塞(go to sleep)。每個 unique_lock 的析構會確保離開作用域時,自動釋放所有的 mutex。

通過共享數據通訊是相對底層的操作。編程人員要設計一套機制,弄清楚哪些任務完成了哪些工作,還有哪些未完成。從這個角度看, 使用共享數據不如直接調用函數、返回結果。另一方面,有些人認為共享數據比拷貝參數和返回值效率更高。這個觀點可能在涉及大量數據的時候成立,但是 locking 和 unlocking 也是相對耗時的操作。不僅如此,現代電腦很擅長拷貝數據,尤其是像 vector 這種連續存儲的元素。所以,不要僅僅因為「效率」而選用共享數據進行通訊,除非你真正實際測量過。

13.6 等待事件

有時執行緒需要等待外部事件,比如另一個執行緒完成了任務或者經過了一段時間。最簡單的事件是時間。藉助 <chrono>,可以寫出:

using namespace std::chrono;
auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();

cout << duration_cast<nanoseconds>(t1-t0).count() << " nanoseconds passed\b";

注意,我甚至沒有啟動一個執行緒;默認情況下,this_thread 指當前唯一的執行緒。

我用 duration_cast 把時間單位轉成了我想要的 nanoseconds。

condition_variable(定義在 <condition_variable>)提供了對通過外部事件通訊的支援。condition_variable 允許一個執行緒等待另一個執行緒,尤其是允許一個執行緒等待某個(由於其他執行緒工作結束)條件/事件發生。

condition_variable 支援很多優雅、高效的共享形式,但也可能會很棘手。考慮一個經典的生產者-消費者例子:兩個執行緒通過一個隊列傳遞消息:

class Message { /**/ }; // 通訊的對象

queue<Message> q;       // 消息隊列
condition_variable cv;  // 傳遞事件的變數
mutex m;                // locking 機制

queuecondition_variable 以及 mutex 由標準庫提供。

消費者讀取並處理 Message

void consumer()
{
    while(true){
        unique_lock<mutex> lck{m}; // 獲取 mutex m
        cv.wait(lck);              // 釋放 lck,等待
                                   // 喚醒時重新獲得 lck
        auto m = q.front();        // 取出 Message m
        q.pop();
        lck.unlock();              // 後續處理消息不再操作隊列 q,提前釋放 lck
        // 處理 m
    }
}

這裡我顯式地用 unique_lock<mutex> 保護 queuecondition_variable 上的操作。condition_variable 上的 cv.wait(lck) 會釋放參數中的鎖 lck,直到等待結束(隊列非空),然後再次獲取 lck

相應的生產者程式碼:

void producer()
{
    while(true) {
        Message m;
        // 填充 m
        unique_lock<mutex> lck{m}; // 保護操作
        q.push(m);
        cv.notify_one();           // 通知
    } // 作用域結束自動釋放鎖
}

到目前為止,不論是 thread、mutex、lock 還是 condition_variable,都還是低層次的抽象。接下來我們馬上就能看到 C++ 對並發的高級抽象支援。

13.7 通訊任務

標準庫還提供了一些機制,能夠讓程式設計師在更高的任務的概念層次上工作,而不是直接使用低層的執行緒、鎖:

  1. futurepromise:用於從另一個執行緒生成的任務中返回值
  2. packaged_task:幫助啟動任務,封裝了 futurepromise,並且建立兩者之間的關聯
  3. async():像調用一個函數那樣啟動一個任務。形式最簡單,但也最強大!

上述機制在頭文件 <future> 中。

篇幅有點長,先到這裡,餘下的內容單獨寫一篇:現代 C++ 對多執行緒/並發的支援(下) — 節選自 C++ 之父的 《A Tour of C++》

13.7.1 future 和 promise

13.7.2 packaged_task

13.7.3 async()

13.8 建議