c++ 跨平台執行緒同步對象那些事兒——基於 ace

前言

ACE (Adaptive Communication Environment) 是早年間很火的一個 c++ 開源通訊框架,當時 c++ 的庫比較少,以至於談 c++ 網路通訊就繞不開 ACE,隨著後來 boost::asio / libevent / libev … 等專門解決通訊框架的庫像雨後春筍一樣冒出來,ACE 就漸漸式微了。特別是它雖然號稱是通訊框架,實則把各個平台的基礎設施都封裝了一個遍,導致想用其中一個部分,也牽一髮而動全身的引入了一堆其它的不相關的部分,雖然用起來很爽,但是耦合度太強,學習曲線過於陡峭,以至於坊間流傳一種說法:ACE 適合學習,不適合快速上手做項目。所以後來也就慢慢淡出了人們的視線,不過對於一個真的把它拿來學習的人來說,它的一些設計思想還是不錯的,今天就以執行緒同步對象為例,說一下「史上最全」的 ACE 是怎麼封裝的,感興趣的同學可以和標準庫、boost 或任意什麼跨平台庫做個對比,看看它是否當得起這個稱呼。

互斥量

互斥量主要就是指各種 mutex 了,依據 mutex 的各種特性,又細分為以下幾類:

ACE_Thread_Mutex

這個主要是做進程內多執行緒同步的,底層類型為 ACE_thread_mutex_t,這個類型在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ACE_thread_mutex_t CRITICAL_SECTION pthread_mutex_t mutex_t SEM_ID int
init InitializeCriticalSection pthread_mutex_init mutex_init semMCreate n/a
acquire EnterCriticalSection pthread_mutex_lock mutex_lock semTake (..WAIT_FOREVER..) n/a
acquire (..time..) n/a pthread_mutex_timedlock n/a semTake (..time..) n/a
tryacquire TryEnterCriticalSection pthread_mutex_trylock mutex_trylock semTake (..NOWAIT..) n/a
release LeaveCriticalSection pthread_mutex_unlock mutex_unlock semGive n/a
remove DeleteCriticalSection pthread_mutex_destroy mutex_destroy semDelete n/a

 

對於上面的表做個簡單說明:

  • windows 上就是使用臨界區來做執行緒級別的互斥量;
  • unix like 一般都支援 pthread,例如 AIX / HPUX / IRIX / LYNXOS / MACOSX / UNIXWARE / OPENBSD / FREEBSD ……,如果不支援 pthread,則不在此列;
  • Solaris 有自己的執行緒庫,不使用 pthread;
  • VxWorks 實時作業系統只有一個進程,可以有多個執行緒 (任務),所以這裡使用的是進程級別的同步對象來模擬,具體就是訊號燈 (SEM_ID);
  • 對於沒有 mutex 支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)。

另外由於執行緒同步對象沒有對讀寫做分離,所以 acquire_read / acquire_write / tryacquire_read / tryacquire_write 均使用默認的 acquire / tryacquire 來實現。帶超時參數的 acquire 重載,在有些平台並不被支援,例如 windows 和 Solaris。

ACE_Recursive_Thread_Mutex

與 ACE_Thread_Mutex 相比,增加了已鎖定執行緒再次加鎖的能力 (遞歸進入不死鎖)。底層類型為 ACE_recursive_thread_mutex_t,與它相關的一些設施列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ACE_recursive_thread_mutex_t CRITICAL_SECTION pthread_mutex_t 自定義類型模擬 自定義類型模擬 int
init InitializeCriticalSection

pthread_mutex_init

(..PTHREAD_MUTEX_RECURSIVE..)

參考自定義類型 參考自定義類型 n/a
acquire EnterCriticalSection pthread_mutex_lock 參考自定義類型 參考自定義類型 n/a
acquire (..time..) n/a pthread_mutex_timedlock 參考自定義類型 參考自定義類型 n/a
tryacquire TryEnterCriticalSection pthread_mutex_trylock 參考自定義類型 參考自定義類型 n/a
release LeaveCriticalSection pthread_mutex_unlock 參考自定義類型 參考自定義類型 n/a
remove DeleteCriticalSection pthread_mutex_destroy 參考自定義類型 參考自定義類型 n/a
get_thread_id n/a n/a .owner_id_ .owner_id_ n/a
get_nesting_level

.RecursionCount /

.LockCount + 1

n/a .nesting_level_ .nesting_level_ n/a

 

對於上面的表做個簡單說明:

  • windows 的臨界區默認就是遞歸的,所以直接拿來用沒有一點兒問題;
  • 支援 pthread 的 unix like 系統,可以為 pthread_mutex_init 設置 PTHREAD_MUTEX_RECURSIVE 參數來指定互斥量是遞歸的 (當然了,創建 pthread mutex 還有一些其它選項,例如 PTHREAD_MUTEX_ERRORCHECK 可以做一些錯誤檢測並返回錯誤,而不是直接死鎖);
  • Solaris 系統的原生互斥量不支援遞歸加鎖,這裡使用自定義類型來模擬,其實只要是不支援遞歸互斥量的系統,都由這個自定義類型搞定,例如 VxWorks 等;
  • 相對於 ACE_Thread_Mutex,遞歸版本的增加了兩個介面,分別是 get_thread_id 和 get_nesting_level,分別用來獲取當前鎖的擁有者執行緒 ID 和嵌套層次,不過貌似只有自定義類型全部支援。windows 的 CRITICAL_SECTION 可以支援後者,不過對於 32 位系統與 64 位系統有略微差別,前者使用 CRITICAL_SECTION 的 RecursionCount 欄位,後者使用 LockCount 欄位;
  • 對於沒有 mutex 支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)。

 

帶超時參數的 acquire 重載,在有些平台並不被支援,例如 windows。自定義類型的通用定義如下:

 1 class ACE_recursive_thread_mutex_t
 2 {
 3 public:
 4   /// Guards the state of the nesting level and thread id.
 5   ACE_thread_mutex_t nesting_mutex_;
 6 
 7   /// This condition variable suspends other waiting threads until the
 8   /// mutex is available.
 9   ACE_cond_t lock_available_;
10 
11   /// Current nesting level of the recursion.
12   int nesting_level_;
13 
14   /// Current owner of the lock.
15   ACE_thread_t owner_id_;
16 };

 

而其中具體使用的平台設施,又隨 ACE_thread_mutex_t / ACE_cond_t / ACE_thread_t 的定義而不同。關於如何基於非遞歸 mutex 與 condition variable 來實現遞歸 mutex,這個留在後面詳細說明。

ACE_RW_Thread_Mutex

與  ACE_Thread_Mutex 相比,ACE_RW_Thread_Mutex 允許對讀和寫分別加鎖,以提高讀的並行程度 (讀-寫、寫-寫之間還是互斥的,讀-讀可以同時進入)。底層類型為 ACE_rwlock_t,與它相關的一些設施列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ACE_rwlock_t 自定義類型模擬 pthread_rwlock_t rwlock_t 自定義類型模擬 int
init 參考自定義類型 pthread_rwlock_init rwlock_init 參考自定義類型 n/a
acquire_read 參考自定義類型 pthread_rwlock_rdlock rw_rdlock 參考自定義類型 n/a
tryacquire_read 參考自定義類型 pthread_rwlock_tryrdlock rw_tryrdlock 參考自定義類型 n/a
acquire_write 參考自定義類型 pthread_rwlock_wrlock rw_wrlock 參考自定義類型 n/a
tryacquire_write 參考自定義類型 pthread_rwlock_trywrlock rw_trywrlock 參考自定義類型 n/a
tryacquire_write_upgrade 參考自定義類型 n/a n/a 參考自定義類型 n/a
release 參考自定義類型 pthread_rwlock_unlock rw_unlock 參考自定義類型 n/a
remove 參考自定義類型 pthread_rwlock_destroy rwlock_destroy 參考自定義類型 n/a

 

對於上面的表做個簡單說明:

  • 支援 pthread 的 unix like 系統,可以直接基於 pthread_rwlock_t 原生類型進行封裝;
  • Solaris 系統的原生讀寫鎖 rwlock_t 本身就可以支援上述介面;
  • windows 沒有讀寫鎖原生支援,這裡使用自定義類型來模擬,其實只要是不支援讀寫鎖的系統,都由這個自定義類型搞定,例如 VxWorks ;
  • 讀寫鎖的 acquire 分為 acquire_read / acquire_write 分別表示獲取讀鎖與寫鎖;同理,tryacquire 也分為 tryacquire_read / tryacquire_write;而通用的 acquire 其實就是 acquire_write,tryacquire 就是 tryacquire_write;沒有列出帶超時參數的 acquire 重載,因為底層都不支援;另外讀寫鎖還增加了一個 tryacquire_write_upgrade 介面,用來給已經獲取讀鎖的執行緒升級為寫鎖,不過目前僅有模擬的自定義類型支援該介面;
  • 對於沒有 mutex 支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)

 

該自定義類型的通用定義如下:

 1 struct ACE_Export ACE_rwlock_t
 2 {
 3 public:
 4 //protected:
 5 
 6   ACE_mutex_t lock_;
 7   // Serialize access to internal state.
 8 
 9   ACE_cond_t waiting_readers_;
10   // Reader threads waiting to acquire the lock.
11 
12   int num_waiting_readers_;
13   // Number of waiting readers.
14 
15   ACE_cond_t waiting_writers_;
16   // Writer threads waiting to acquire the lock.
17 
18   int num_waiting_writers_;
19   // Number of waiting writers.
20 
21   int ref_count_;
22   // Value is -1 if writer has the lock, else this keeps track of the
23   // number of readers holding the lock.
24 
25   int important_writer_;
26   // indicate that a reader is trying to upgrade
27 
28   ACE_cond_t waiting_important_writer_;
29   // condition for the upgrading reader
30 };

 

而其中具體使用的平台設施,又隨 ACE_mutex_t / ACE_cond_t 的定義而不同。關於如何使用 mutex 與 condition variable 來實現讀寫鎖,這個留在後面詳細說明。

ACE_Process_Mutex

這個主要是做進程間執行緒同步的,底層類型為 ACE_Mutex 或 ACE_SV_Semaphore_Complex,前者是通用的進程間互斥量,後者依賴 System V IPC 機制,默認使用前者,如果所在平台支援,可以通過定義宏 ACE_USES_MUTEX_FOR_PROCESS_MUTEX 來切換到後者,但是我看系統預定義的各平台頭文件,都沒有定義這個宏,所以還是重點看一下前者的實現。ACE_Mutex 底層類型為 ACE_mutex_t,這個類型在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ACE_mutex_t HANDLE pthread_mutex_t mutex_t SEM_ID int
init CreateMutex pthread_mutex_init (..PTHREAD_PROCESS_SHARED..) mutex_init semMCreate n/a
acquire WaitForSingleObject (..INFINITE..) pthread_mutex_lock mutex_lock semTake (..WAIT_FOREVER..) n/a
acquire (..time..) WaitForSingleObject (..time..) pthread_mutex_timedlock n/a semTake (..time..) n/a
tryacquire WaitForSingleObject (..0..) pthread_mutex_trylock mutex_trylock semTake(..NOWAIT..) n/a
release ReleaseMutex pthread_mutex_unlock mutex_unlock semGive n/a
remove CloseHandle pthread_mutex_destroy mutex_destroy semDelete n/a

 

 對於上面的表做個簡單說明:

  • windows 上就是使用互斥量來做進程級別的互斥;
  • 支援 pthread 的 unix like 系統,可以直接基於 pthread_mutex_t 原生類型進行封裝,不過相比進程內互斥量,需要多做兩個工作:
    • 創建或打開一塊共享記憶體,在該記憶體上創建互斥量,需要使用該互斥量的進程,都打開這塊共享記憶體進行操作;
    • 創建互斥量時指定 PTHREAD_PROCESS_SHARED 屬性。
  • Solaris 自己的 mutex_t 就可以支援進程間的互斥,在 type 中指定 USYNC_PROCESS 標誌位即可 (進程內的指定 USYNC_THREAD);
  • VxWorks 實時作業系統只有一個進程,所以無所謂進程間互斥量了,因此還是使用訊號燈 SEM_ID 來模擬;
  • 對於沒有 mutex 支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)。

另外由於執行緒同步對象沒有對讀寫做分離,所以 acquire_read / acquire_write / tryacquire_read / tryacquire_write 均使用默認的 acquire / tryacquire 來實現。ACE_mutex_t 和 ACE_thread_mutex_t 的一個最大不同是,前者可以根據傳入的 type 自動決定是使用進程內還是進程間的互斥量,例如在 windows 上,它的類型其實是一個 union:

 1 typedef struct
 2 {
 3   /// Either USYNC_THREAD or USYNC_PROCESS
 4   int type_;
 5   union
 6   {
 7     HANDLE proc_mutex_;
 8     CRITICAL_SECTION thr_mutex_;
 9   };
10 } ACE_mutex_t;

使用 HANDLE 還是 CRITICAL_SECTION,完全由 type 決定,當然,在 ACE_Process_Mutex 中,是明確指定了 type 為 USYNC_PROCESS 的。

ACE_RW_Process_Mutex

與  ACE_RW_Thread_Mutex 相比,它提供了進程間多執行緒讀寫鎖的能力。基於 ACE_File_Lock,而它的底層類型是 ace_flock_t,其實也是一個自定義類型,主要封裝了不同平台上的文件鎖,因此 ACE_RW_Thread_Mutex 其實只能做進程間的讀寫鎖,而不能做進程內執行緒間的讀寫鎖,這是因為一般的文件鎖的粒度是到進程而不是執行緒的 (進程內多個執行緒去獲取鎖,都會得到鎖已獲取的結果,完全沒有鎖的效果)。與 ace_flock_t 相關的一些設施列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ace_flock_t HADNLE/OVERLAPPED int/struct flock int/struct flock int/struct flock int
init CreateFile open open n/a n/a
acquire_read LockFile[Ex] fcntl (..F_RDLCK..F_SETLKW..) fcntl (..F_RDLCK..F_SETLKW..) n/a n/a
tryacquire_read LockFileEx (..LOCKFILE_FAIL_IMMEDIATELY..) fcntl (..F_RDLCK..F_SETLK..) fcntl (..F_RDLCK..F_SETLK..) n/a n/a
acquire_write LockFileEx (..LOCKFILE_EXCLUSIVE_LOCK..) fcntl (..F_WRLCK..F_SETLKW..) fcntl (..F_WRLCK..F_SETLKW..) n/a n/a
tryacquire_write

LockFileEx (..LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK..)

fcntl (..F_WRLCK..F_SETLK..) fcntl (..F_WRLCK..F_SETLK..) n/a n/a
tryacquire_write_upgrade LockFileEx (..LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK..) fcntl (..F_WRLCK..F_SETLK..) fcntl (..F_WRLCK..F_SETLK..) n/a n/a
release UnlockFile fcntl (..F_UNLCK..F_SETLK..) fcntl (..F_UNLCK..F_SETLK..) n/a n/a
remove CloseHandle/DeleteFile close/unlink close/unlink n/a n/a

 

對於上面的表做個簡單說明:

  • windows 系統基於原生的文件鎖進行封裝;
  • unix like 系統 (包含 Solaris) 可以直接基於 struct flock 原生類型進行封裝;
  • 除了上面列出的介面,還有通用的 acquire 和 tryacquire,它們其實就是通過 acquire_write 和 tryacquire_write 來實現的;帶超時參數的 acquire 重載沒有列出,因為底層都不支援;另外 tryacquire_write_upgrade 介面底層是通過 tryacquire_write 來實現的,底層文件鎖具備直接將讀鎖轉化為寫鎖的介面;
  • 所有介面基本上都使用 start / whence / len 來指定鎖定或解鎖的文件範圍,這個與其它鎖參數還是有很大不同的,好在如果只是鎖定文件第一個位元組,ace 提供的默認值就夠了,所以還能有一定通用性的 (可以在某些模板中通過不帶參數的方式來調用);
  • 對於沒有文件鎖支援的系統,使用 int 來定義類別 (VxWorks 雖然定義了 flock 但是沒有相應的機制來實現文件鎖功能),函數體留空來避免編譯報錯 (相當於不起作用)。

 

下面是 ace_flock_t 的具體定義:

 1   /**
 2    * @class ace_flock_t
 3    *
 4    * @brief OS file locking structure.
 5    */
 6   class ACE_Export ace_flock_t
 7   {
 8   public:
 9   /// Dump state of the object.
10     void dump (void) const;
11 
12 # if defined (ACE_WIN32)
13     ACE_OVERLAPPED overlapped_;
14 # else
15     struct flock lock_;
16 # endif /* ACE_WIN32 */
17 
18     /// Name of this filelock.
19     const ACE_TCHAR *lockname_;
20 
21     /// Handle to the underlying file.
22     ACE_HANDLE handle_;
23   };

 

可以看到就是主要支援兩類:windows 的重疊 IO 和支援文件鎖的 unix like 系統。

ACE_RW_Mutex

通用的讀寫鎖類型,ACE_RW_Thread_Mutex 基類,與後者不同的是,它提供了 type 類型來指定共享的範圍是進程內 (USYNC_THREAD) 還是進程間 (USYNC_PROCESS),ACE_RW_Thread_Mutex 就是通過傳遞 USYNC_THREAD 來實現的。底層類型同為 ACE_rwlock_t,這裡重點考察一下它在 posix 與 solaris 上底層設施的差別:

平台/介面/設施 unix like (pthread) Solaris
ACE_rwlock_t pthread_rwlock_t rwlock_t
init pthread_rwlock_init rwlock_init
acquire_read pthread_rwlock_rdlock rw_rdlock
tryacquire_read pthread_rwlock_tryrdlock rw_tryrdlock
acquire_write pthread_rwlock_wrlock rw_wrlock
tryacquire_write pthread_rwlock_trywrlock rw_trywrlock
tryacquire_write_upgrade n/a n/a
release pthread_rwlock_unlock rw_unlock
remove pthread_rwlock_destroy rwlock_destroy

 

其中 rwlock_init 接收一個 type 參數用於表示進程內、進程間共享 (USYNC_THREAD | USYNC_PROCESS);pthread_rwlock_init 也是如此,不過具體類型定義與 Solaris 上有所不同 (THREAD_PROCESS_SHARED | THREAD_PROCESS_PRIVATE),ACE 內部會做適當轉換。這兩組介面都不支援原生 name (雖然介面出於一致性提供了,但是內部都沒有使用),是通過將讀寫鎖放在共享記憶體中實現跨進程訪問的,這一點需要特別注意。

條件變數

條件變數主要源自於 pthread 中的 condition variable,依據條件變數配合使用的 mutex 的不同,又細分為以下幾類:

ACE_Condition_Thread_Mutex

這個主要是做進程內多執行緒等待與通知的,底層類型為 ACE_cond_t 與 ACE_Thread_Mutex,後者上面已經說明過了,下面重點說一下前者,它在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows unix like (pthread) Solaris VxWorks unsupport
ACE_cond_t 自定義類型模擬 pthread_cond_t cond_t 自定義類型模擬 int
init 參考自定義類型 pthread_cond_init cond_init 參考自定義類型 n/a
wait 參考自定義類型 pthread_cond_wait cond_wait 參考自定義類型 n/a
wait(..timeout..) 參考自定義類型 pthread_cond_timedwait cond_timedwait 參考自定義類型 n/a
signal 參考自定義類型 pthread_cond_signal cond_signal 參考自定義類型 n/a
broadcast 參考自定義類型 pthread_cond_broadcast cond_broadcast 參考自定義類型 n/a
remove 參考自定義類型 pthread_cond_destroy cond_destroy 參考自定義類型 n/a

 

對於上面的表做個簡單說明:

  • 支援 pthread 的 unix like 系統,可以直接基於 pthread_cond_t 原生類型進行封裝;
  • Solaris 系統的原生條件變數 cond_t 本身就可以支援上述介面;
  • windows 沒有原生條件變數支援,這裡使用自定義類型來模擬,其實只要是不支援條件變數的系統,都由這個自定義類型搞定,例如 VxWorks  等;
  • 條件變數的 wait 有兩個重載,第二個可以帶超時參數,此時對應的底層設施和第一個介面是不一樣的;signal 用於喚醒一個執行緒;broadcast 用於喚醒所有等待在這個條件變數上的執行緒,不過最終仍只有一個執行緒可獲取鎖從而進入條件變數中;
  • 對於沒有 thread mutex 和訊號燈或事件支援的系統 (模擬類型所依賴的基礎設施),使用 int 來定義 ACE_cond_t 類型、函數體留空,來避免編譯報錯 (相當於不起作用)。

 

該自定義類型的通用定義如下:

 1 class ACE_Export ACE_cond_t
 2 {
 3 public:
 4 
 5   /// Returns the number of waiters.
 6   long waiters (void) const;
 7 
 8 //protected:
 9   /// Number of waiting threads.
10   long waiters_;
11 
12   /// Serialize access to the waiters count.
13   ACE_thread_mutex_t waiters_lock_;
14 
15   /// Queue up threads waiting for the condition to become signaled.
16   ACE_sema_t sema_;
17 
18 #     if defined (VXWORKS)19   /**
20    * A semaphore used by the broadcast/signal thread to wait for all
21    * the waiting thread(s) to wake up and be released from the
22    * semaphore.
23    */
24   ACE_sema_t waiters_done_;
25 #     elif defined (ACE_WIN32)
26   /**
27    * An auto reset event used by the broadcast/signal thread to wait
28    * for the waiting thread(s) to wake up and get a chance at the
29    * semaphore.
30    */
31   HANDLE waiters_done_;
32 #     else
33 #       error "Please implement this feature or check your config.h file!"
34 #     endif /* VXWORKS || ACE_PSOS */
35 
36   /// Keeps track of whether we were broadcasting or just signaling.
37   size_t was_broadcast_;
38 };

 

而其中具體使用的平台設施,又隨 ACE_thread_mutex_t / ACE_sema_t / event 的定義而不同 (waiters_done_ 成員還特別區分了 VxWorks 與 Win32 平台,前者基於訊號燈,後者基於事件)。關於如何使用 mutex 與 semaphore 或 event 來實現條件變數,這個留在後面詳細說明。

ACE_Condition <MUTEX>

通用類型的條件變數,底層的互斥量可通過模板參數傳遞。與 ACE_Thread_Mutex_Condition 唯一的不同之處是提供了 type 類型來指定共享的範圍是進程內 (USYNC_THREAD) 還是進程間 (USYNC_PROCESS),底層類型同為 ACE_cond_t,這裡重點考察一下它在 posix 與 solaris 上的底層設施:

平台/介面/設施 unix like (pthread) Solaris
ACE_cond_t pthread_cond_t cond_t
init pthread_cond_init cond_init
wait pthread_cond_wait cond_wait
wait(..timeout..) pthread_cond_timedwait cond_timedwait
signal pthread_cond_signal cond_signal
broadcast pthread_cond_broadcast cond_broadcast
remove pthread_cond_destroy cond_destroy

 

其中 cond_init 接收一個 type 參數用於表示進程內、進程間共享 (USYNC_THREAD | USYNC_PROCESS);pthread_cond_init 也是如此,不過具體類型定義與 Solaris 上有所不同 (THREAD_PROCESS_SHARED | THREAD_PROCESS_PRIVATE),ACE 內部會做適當轉換。這兩組介面都不支援原生 name (雖然介面出於一致性提供了,但是內部都沒有使用),是通過將條件變數放在共享記憶體中實現跨進程訪問的,這一點需要注意。

用於 ACE_Condition 的 MUTEX 模板參數,只能是下面幾類:

  • ACE_Thread_Mutex
  • ACE_Recursive_Thread_Mutex
  • ACE_Null_Mutex

前兩個已經在前面介紹過了,ACE_Null_Mutex 請參考後面 NULL 那一章。

ACE_Thread_Condition <MUTEX>

進程內多執行緒等待與喚醒的通用的條件變數,派生自 ACE_Condition <MUTEX>,並指定了使用 USYNC_THREAD 類型。它與 ACE_Condition_Thread_Mutex 作用完全一致,其實 ACE 作者的本意是定義它的實例化來作為 ACE_Condition_Thread_Mutex:

typedef ACE_Condition_Thread_Mutex ACE_Thread_Condition <ACE_Thread_Mutex>; 

 

不過由於某些古老編譯器的限制,這一實例化受限,於是不得不重新定義了一個 ACE_Condition_Thread_Mutex。不過這個類型也有好處,就是可以指定 MUTEX 類型,目前支援的類型與其父類 ACE_Condition <MUTEX> 相同。

ACE_Condition_Recursive_Thread_Mutex

與  ACE_Condition_Thread_Mutex 相比,增加了同 ACE_Recursive_Thread_Mutex 配合使用的能力。底層類型為 ACE_cond_t 與 ACE_Recursive_Thread_Mutex,涉及 ACE_cond_t 類型的底層設施上面已經說明過了,這裡沒有改變。其實 ACE_Condition_Recursive_Thread_Mutex 是 ACE_Condition <MUTEX> 模板使用 ACE_Recursive_Thread_Mutex 作為 MUTEX 模板參數的一個特化,後者與 ACE_Condition_Thread_Mutex 的關係前面已經介紹過了,可以認為是等價的。

而新的特化專門為等待遞歸鎖 (wait 的兩個重載) 提供了一份新的實現,用於在等待條件時釋放 nesting_level 級別個鎖、並在條件滿足被喚醒後重新獲取 nesting_level 個鎖,從而保證在等待期間其它執行緒可以進入鎖,避免死鎖的發生。其實條件變數一般為了避免這種多層加鎖導致的死鎖問題,很少和遞歸鎖配合使用,一般是和非遞歸鎖一起用,所以非不得已,一般不使用這個類型。

上面的類型可能有點讓人眼暈,畫個圖說明一下它們之間的關係:

 

 

ACE 因為兼容大量老舊平台與編譯器,不得不在某些場景捨棄他們最愛的模板,不然的話程式碼還可以更為精簡。

訊號燈

訊號燈就是 semaphore 了,它提供經典的 PV 操作,是作業系統同步的基石之一,所以很多平台都會支援。依據 semaphore 的各種特性,又細分為以下幾類:

ACE_Thread_Semaphore

這個主要是做進程內同步的,底層類型為 ACE_sema_t,這個類型在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows wince unix like (posix) unix like (sysv) Solaris VxWorks unsupport
ACE_sema_t HANDLE 自定義類型 I sem_t 自定義類型 II sema_t SEM_ID int
init CreateSemaphore 參考自定義類型 I sem_init / sem_open 參考自定義類型 II sema_init semCCreate n/a
acquire WaitForSingleObject (..INFINITE..) 參考自定義類型 I sem_wait 參考自定義類型 II sema_wait semTake (..WAIT_FOREVER..) n/a
acquire (..time..) WaitForSingleObject (..time..) 參考自定義類型 I n/a 參考自定義類型 II n/a semTake (..time..) n/a
tryacquire WaitForSingleObject (..0..) 參考自定義類型 I sem_trywait 參考自定義類型 II sema_trywait semTake (..NOWAIT..) n/a
release ReleaseSemaphore (..1..) 參考自定義類型 I sem_post 參考自定義類型 II sema_post semGive n/a
release (..N..) ReleaseSemaphore (..N..) 循環調用 release N 次 循環調用 release N 次 循環調用 release N 次 循環調用 release N 次 循環調用 release N 次 n/a
remove CloseHandle 參考自定義類型 I sem_unlink / sem_close / sem_destroy 參考自定義類型 II sema_destroy semDelete n/a

 

對於上面的表做個簡單說明:

  • windows 上就是使用原生 Semaphore 來做訊號燈;
  • wince  (Windows CE) 某些版本之前不支援原生 Semaphore,這裡使用事件 (event) 和臨界區 (CRITICAL_SECTION) 來模擬,定義為類型 I;
  • unix like 一般都支援 posix 標準,可以直接使用 posix 定義的 sem_t 類型來實現訊號燈,它既支援匿名訊號燈 (sem_init / sem_destroy)、也支援命名訊號燈 (sem_open / sem_unlink / sem_close),根據用戶需求 (是否傳遞有效的 name 參數) 來決定使用的底層介面。奇怪的是 posix semaphore 有 sem_timedwait 介面,而 ACE 卻沒有封裝,不知道是不是我使用的版本太老的緣故;
  • 一些早期的平台對 posix 標準支援不全,它們沒有 posix semaphore 可用, 這裡基於互斥量 (pthread_mutex_t) 和條件變數 (pthread_cond_t) 來模擬,定義為類型 II;
  • Solaris 有自己原生的 sema_t,不使用 posix 訊號燈,注意它和 posix 上的 sem_t 不是一個類型,sem 與  sema 一字之差,但是完全是兩套介面,Solaris 上不支援命名訊號燈。不過 Solaris 後續版本也支援 posix 訊號燈,所以具體使用哪個,要看系統版本而定;
  • VxWorks 有自己原生的 SEM_ID 來做訊號燈;
  • 對於沒有 semaphore 支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)。

另外由於執行緒同步對象沒有對讀寫做分離,所以 acquire_read / acquire_write / tryacquire_read / tryacquire_write 均使用默認的 acquire / tryacquire 來實現;對於 release 介面,提供一個一次釋放 N 次訊號燈的重載,對於該重載,除 wince 平台是調整介面 (ReleaseSemaphore) 參數外,其它的都是通過循環調用釋放介面來模擬的。

 

自定義類型 I 定義如下:

 1 /**
 2  * @class ACE_sema_t
 3  *
 4  * @brief Semaphore simulation for Windows CE.
 5  */
 6 class ACE_Export ACE_sema_t
 7 {
 8 public:
 9   /// Serializes access to <count_>.
10   ACE_thread_mutex_t lock_;
11 
12   /// This event is signaled whenever the count becomes non-zero.
13   ACE_event_t count_nonzero_;
14 
15   /// Current count of the semaphore.
16   u_int count_;
17 };

 

因為限定了 wince 平台,所以 ACE_thread_mutex_t 就是 CRITICAL_SECTION,ACE_event_t 就是 HANDLE (Event) 了。自定義類型 II 定義如下:

 1 /**
 2  * @class ACE_sema_t
 3  *
 4  * @brief This is used to implement semaphores for platforms that support
 5  * POSIX pthreads, but do *not* support POSIX semaphores, i.e.,
 6  * it's a different type than the POSIX <sem_t>.
 7  */
 8 class ACE_Export ACE_sema_t
 9 {
10 public:
11   /// Serialize access to internal state.
12   ACE_mutex_t lock_;
13 
14   /// Block until there are no waiters.
15   ACE_cond_t count_nonzero_;
16 
17   /// Count of the semaphore.
18   u_long count_;
19 
20   /// Number of threads that have called <ACE_OS::sema_wait>.
21   u_long waiters_;
22 };

因為限定了 unix 平台,所以 ACE_mutex_t 就是 pthread_mutex_t,ACE_cond_t 就是 pthread_cond_t 了。關於如何基於它們來實現訊號燈,這個留在後面再說。

此外,為了保存命名訊號燈的名稱,支援 posix semaphore 的平台和 VxWorks 並不是直接使用 sem_t 和 SEM_ID 的,而是將它們和 name 組合成一個結構體一起來使用:

 1 typedef struct
 2 {
 3   /// Pointer to semaphore handle.  This is allocated by ACE if we are
 4   /// working with an unnamed POSIX semaphore or by the OS if we are
 5   /// working with a named POSIX semaphore.
 6   sem_t *sema_;
 7 
 8   /// Name of the semaphore (if this is non-NULL then this is a named
 9   /// POSIX semaphore, else its an unnamed POSIX semaphore).
10   char *name_;
11 
12 #if defined (ACE_LACKS_NAMED_POSIX_SEM)
13   /// this->sema_ doesn't always get created dynamically if a platform
14   /// doesn't support named posix semaphores.  We use this flag to
15   /// remember if we need to delete <sema_> or not.
16   int new_sema_;
17 #endif /* ACE_LACKS_NAMED_POSIX_SEM */
18 } ACE_sema_t;

 

不過好像因為 VxWorks 本身不支援命名訊號燈,所以它這個成員一直保持為 NULL:

1 // Use VxWorks semaphores, wrapped ...
2 typedef struct
3 {
4   /// Semaphore handle.  This is allocated by VxWorks.
5   SEM_ID sema_;
6 
7   /// Name of the semaphore:  always NULL with VxWorks.
8   char *name_;
9 } ACE_sema_t;

 

ACE_Process_Semaphore

這個主要是做進程間執行緒同步的,底層類型視不同平台分別為 ACE_Semaphore 或 ACE_SV_Semaphore_Complex,在 Windows 與支援 posix semaphore 的平台上使用前者,因為它們原生的訊號燈本身就支援跨進程使用;對於支援 SystemV semaphore 的平台則使用後者,它封裝了 sysv 相關的訊號燈。ACE_Semaphore 其實就是 ACE_Thread_Semaphore 的基類,因而它的一些封裝和上一節完全一樣,不同的地方主要在於 posix semaphore 的跨進程處理上:

  • 如果該訊號燈是有名的,則使用 sem_open / sem_close / sem_unlink 介面來操作命名訊號燈,不同進程可以通過名稱來指定一個唯一的全局訊號燈;
  • 如果該訊號燈是匿名的,則使用 sem_init / sem_destroy  在共享記憶體上創建對應的訊號燈,不同的進程都映射這個共享記憶體來操作匿名的訊號燈。

對於 sysv 訊號燈,則使用另外一套完全不同的介面,該介面早於 posix 訊號燈存在,因而也被許多系統廣泛的支援,具體依賴的介面列表如下:

平台/介面/設施 windows unix like (posix) unix like (sysv) Solaris VxWorks unsupport
ACE_sema_t HANDLE sem_t int sema_t SEM_ID int
init CreateSemaphore sem_init / sem_open semget / semctl (..SETVAL..) sema_init semCCreate n/a
acquire WaitForSingleObject (..INFINITE..) sem_wait semop (..-1..) sema_wait semTake (..WAIT_FOREVER..) n/a
tryacquire WaitForSingleObject (..0..) sem_trywait semop (..-1..IPC_NOWAIT..) sema_trywait semTake (..NOWAIT..) n/a
release ReleaseSemaphore (..1..) sem_post semop (..1..) sema_post semGive n/a
remove CloseHandle sem_unlink / sem_close / sem_destroy semctl (..IPC_RMID..) sema_destroy semDelete n/a
op n/a n/a semop n/a n/a n/a
control n/a n/a semctl n/a n/a n/a

 

對於上面的表做個簡單說明:

  • 除 unix like (sysv) 外,其它列無變化,底層使用 ACE_Semaphore 來實現;
  • 對於使用 sysv 訊號燈的 unix like 系統,進程間執行緒同步不再使用自定義類型模擬,而是直接使用 sysv 原生訊號燈。

sysv 原生訊號燈類型定義就是一個整數,與 posix 訊號燈最大的不同就是可以同時操作一組訊號燈,因此在 ACE_SV_Semaphore_Complex 類型中介面都有一個下標參數,用來表示操作哪個訊號燈。初始化時一般需要兩步,即 semget 創建訊號燈數組,semctl 設置其初始值。

同 ACE_Thread_Semaphore 一樣,這裡 acquire_read / acquire_write / tryacquire_read / tryacquire_write 均使用默認的 acquire / tryacquire 來實現,此外,雖然 sysv 訊號燈支援一次 acquire 或 release 訊號燈的值大於一,但是這裡沒有封裝,對應的 release N 重載版本也從介面中移除了。同樣,由於 sysv 訊號燈不支援超時,對應的帶超時時間的 acquire 重載版本也從介面中移除了。這裡新加入的 op 與 control 介面只是針對 ACE_SV_Semaphore_Complex 類型。

其實針對 sysv 訊號燈的封裝,在 ACE 中有兩個層面,一個是較直接的 ACE_SV_Semaphore_Simple 類型,就是直接的底層介面封裝;另外一個才是 ACE_SV_Semaphore_Complex 類型,它主要是考慮到多進程並發的情況下,如何規避 sysv 訊號燈本身的一些設計缺陷帶來的競爭問題,這種競爭問題主要表現在幾個方面:

  • 並不是當前進程退出時就要刪除訊號燈組,需要判斷沒有其它進程再使用該訊號燈組時才發生刪除動作;
  • 多個進程工作在同一個訊號燈組時,創建與退出可能存在競爭關係,需要加以保護;
  • 創建訊號燈與設置訊號燈初始值是兩個操作不是原子的,需要加以保護。

它空出兩個訊號燈專門用於整個訊號燈組的創建、刪除操作過程中的同步,其中一個就是簡單的當作鎖來用,另一個則記錄了整個工作在訊號燈組上的進程數量,當數量減為 0 時表示無進程工作在此實例上因而可以安全的釋放整個訊號燈組。至於如何做到這一點,留在以後再說。

事件

事件是 bool 狀態的訊號燈,適合一些簡單的同步場景。事件可以有兩種狀態,有訊號或無訊號,無訊號狀態下,在上面等待的執行緒將被阻塞,直到事件被激發 (signal) 為有訊號狀態。

ACE_Event

這個主要用來做進程間執行緒間同步的。底層類型為 ACE_event_t ,這個類型在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows unix like unsupport
ACE_event_t HANDLE 自定義類型模擬 int
init CreateEvent 參考自定義類型 n/a
wait WaitForSingleObject (..INFINITE..) 參考自定義類型 n/a
wait(..timeout..) WaitForSingleObject (..time..) 參考自定義類型 n/a
signal SetEvent 參考自定義類型 n/a
pulse PulseEvent 參考自定義類型 n/a
reset ResetEvent 參考自定義類型 n/a
remove CloseHandle 參考自定義類型 n/a

 

對於上面的表做個簡單說明:

  • windows 上就是使用原生 Event 來做事件,基於 windows 的原生能力,可實現跨進程執行緒間同步、全局名稱查找等能力;
  • unix like 系統,沒有原生的事件對象,這裡使用自定義類型來模擬,要求平台支援 mutex 與 condition variable,模擬實現的事件沒有跨進程、全局名稱查找的能力;
  • 對於沒有 mutex 和 condition variable 支援的系統,使用 int 來定義 ACE_event_t 類型、函數體留空,來避免編譯報錯 (相當於不起作用)。

事件的介面和一般的執行緒同步對象差別較大,與條件變數介面有些相似,它使用 wait / signal / pulse / reset 來代替 acquire / release。

自定義類型的通用定義如下:

 1 class ACE_Export ACE_event_t
 2 {
 3   /// Protect critical section.
 4   ACE_mutex_t lock_;
 5 
 6   /// Keeps track of waiters.
 7   ACE_cond_t condition_;
 8 
 9   /// Specifies if this is an auto- or manual-reset event.
10   int manual_reset_;
11 
12   /// "True" if signaled.
13   int is_signaled_;
14 
15   /// Special bool for auto_events alone
16   /**
17    * The semantics of auto events forces us to introduce this extra
18    * variable to ensure that the thread is not woken up
19    * spuriously. Please see event_wait and event_timedwait () to see
20    * how this is used for auto_events. Theoretically this is a hack
21    * that needs revisiting after x.4
22    */
23   bool auto_event_signaled_;
24 
25   /// Number of waiting threads.
26   unsigned long waiting_threads_;
27 };

關於如何使用 mutex 與 condition variable 來實現事件,這個留在後面詳細說明。

ACE_Auto_Event

自動事件,派生自 ACE_Event。自動事件在激活時一次只喚醒一個執行緒,且執行緒喚醒後,事件自動重置為無訊號狀態。

  • windows 平台上原生的事件就支援該類型,介面只是傳遞了一個標誌位給底層介面;
  • unix like 平台上模擬的自定義類型通過內部的一個變數記錄了事件的類型,在處理時會參考它進行不同的操作。

由於自動重置的特性,很少使用 reset / pulse 介面,也沒有類似條件變數 broadcast 那樣的介面可以一次喚醒所有等待執行緒。

ACE_Manual_Event

手動事件,派生自 ACE_Event。手動事件在激活時喚醒全部執行緒,且執行緒喚醒後,事件仍保持有訊號狀態,直接用戶手動重置事件。

  • windows 平台上原生的事件就支援該類型,介面只是傳遞了一個標誌位給底層介面;
  • unix like 平台上模擬的自定義類型通過內部的一個變數記錄了事件的類型,在處理時會參考它進行不同的操作。

由於沒有自動重置的特性,非常依賴 reset / pulse 介面,signal 介面將喚醒所有在事件上等待的執行緒 (類似條件變數的 broadcast 介面),但沒有類似條件變數喚醒單個執行緒的介面。

 

總之,由於需要事先指定事件類型、且創建後不能再修改類型,事件在使用過程中不如條件變數靈活。

執行緒局部存儲

執行緒專有/局部存儲 ,Thread Special/Local Storage,簡寫為 TSS (linux) 或 TLS (windows),它提供了一套訪問變數的介面,通過這組介面,多個執行緒看上去訪問的是一個全局變數,實際上是該變數對應到此執行緒的專有實例,從而從根本上避免了執行緒競爭的問題。

ACE_TSS <TYPE>

通用的 TSS 模板類,底層類型為 ACE_thread_key_t,這個類型在不同平台上依賴的設施也不盡相同,可以列表如下:

平台/介面/設施 windows wince unix like (posix) unix like (non-posix) Solaris VxWorks unsupport
ACE_thread_key_t DWORD u_int pthread_key_t u_long thread_key_t u_int int
init TlsAlloc 參考自定義類型 pthread_key_create 參考自定義類型 thr_keycreate 參考自定義類型 n/a
ts_get TlsGetValue 參考自定義類型 pthread_getspecific 參考自定義類型 thr_getspecific 參考自定義類型 n/a
ts_set TlsSetValue 參考自定義類型 pthread_setspecific 參考自定義類型 thr_setspecific 參考自定義類型 n/a
ts_object ts_get n/a
ts_object (ts) init / ts_get / ts_set n/a

operator ->

operator TYPE*

init / ts_get / ts_set n/a
remove TlsFree 參考自定義類型 pthread_key_delete 參考自定義類型 thr_keydelete 參考自定義類型 n/a

 

對於上面的表做個簡單說明:

  • windows 上就是使用原生 Tls 介面來實現執行緒局部存儲;
  • wince  (Windows CE) 上的原生 Tls 介面有一些限制(?),這裡使用自定義類型來模擬,需要定義宏 ACE_HAS_TSS_EMULATION;
  • unix like 一般都支援 posix 標準,可以直接使用 posix 定義的 pthread_key_xxx 介面來實現執行緒專有存儲;
  • 早期對 posix 標準支援不全的 unix like 系統 ,使用與 wince 相同的自定義類型來模擬;
  • Solaris 有自己原生的 thr_keyxxx 介面,不使用 posix 設施;
  • VxWorks 也缺失執行緒局部存儲能力,這裡也使用與上面相同的自定義類型來模擬;
  • 對於其它沒有執行緒局部存儲支援的系統,使用 int 來定義類別,函數體留空來避免編譯報錯 (相當於不起作用)。

 

執行緒局部存儲的介面和一般的執行緒同步對象差別較大,在進程創建或第一次使用時初始化根鍵、用來代表一個全局的變數,之後每個執行緒可以基於這個根鍵存取自己執行緒實例的值,ts_get / ts_set 代表了獲取和設置兩種操作,實際並不存在這樣一個公開介面,只是為了描述下面介面實現方便而抽象出來的。例如:

  • ts_object 不帶參數的版本表示獲取實例值,底層基於 ts_get 實現,如果未初始化根鍵或沒有對應的值,返回空指針;
  • ts_object 帶 TYPE* 參數的版本表示設置實例值,因為要返回之前的舊值,所以內部同時調用了 ts_get 與 ts_set,如果根鍵未初始化,可能還需要調用到 init 來做第一次使用時的初始化動作;;
  • 類的 operator-> 與 operator TYPE* 操作符重載,底層實際是調用 ts_get 獲取實例值,如果該執行緒還沒有設置任何實例值,則返回一個新的值並通過 ts_set 將其綁定到根鍵所在的執行緒中,同理,如果根鍵未初始化,也需要調用一次 init 來初始化之。

最複雜的部分在於銷毀,在創建執行緒局部存儲根鍵時:

  • unix like (posix) 和 Solaris 平台會記錄一個清理函數;
  • windows 和其它通過自定義類型模擬的平台由於不支援清理函數,只能通過外部的 ACE_TSS_Cleanup 類單例完成對象與清理函數的登記工作。

這樣在執行緒退出時:

  • unix like (posix) 和 Solaris 平台會自動調用這個清理函數銷毀創建的執行緒實例;
  • windows 和其它通過自定義類型模擬的平台則有兩個時機來銷毀根鍵:
    • 針對每個執行緒引用的 ACE_TSS <TYPE> 對象做處理,當該對象析構時,會嘗試在  ACE_TSS_Cleanup 中找到對應的登記資訊,對對應根鍵的引用計數減一,如果引用計數減為零,才嘗試銷毀這個根鍵;
    • 每個由 ace 介面啟動的執行緒都會運行 ace 自己的執行緒託管函數,該函數在執行緒退出前,會嘗試在 ACE_TSS_Cleanup 中針對每一個根鍵做遍歷,找到該根鍵在此執行緒的實例並銷毀之,但不會對根鍵做任何處理 (以防止銷毀將要被其它執行緒訪問的根鍵)。

ACE_TSS <TYPE> 本身是一個 c++ 模板類,模板參數就是執行緒使用的實例類型,可以為簡單類型如 char / int / float / double,也可以為其它自定義的類或結構體。用戶只需要定義自己的類型並傳遞給模板就可以實現執行緒隔離能力,這就是 c++ 的強悍之處。

自定義類型的通用定義如下:

  1 // forward declaration
  2 class ACE_TSS_Keys;
  3 
  4 /**
  5  * @class ACE_TSS_Emulation
  6  *
  7  * @brief Thread-specific storage emulation.
  8  *
  9  * This provides a thread-specific storage implementation.
 10  * It is intended for use on platforms that don't have a
 11  * native TSS, or have a TSS with limitations such as the
 12  * number of keys or lack of support for removing keys.
 13  */
 14 class ACE_Export ACE_TSS_Emulation
 15 {
 16 public:
 17   typedef void (*ACE_TSS_DESTRUCTOR)(void *value) /* throw () */;
 18 
 19   /// Maximum number of TSS keys allowed over the life of the program.
 20   enum { ACE_TSS_THREAD_KEYS_MAX = ACE_DEFAULT_THREAD_KEYS };
 21 
 22   /// Returns the total number of keys allocated so far.
 23   static u_int total_keys ();
 24 
 25   /// Sets the argument to the next available key.  Returns 0 on success,
 26   /// -1 if no keys are available.
 27   static int next_key (ACE_thread_key_t &key);
 28 
 29   /// Release a key that was used. This way the key can be given out in a
 30   /// new request. Returns 0 on success, 1 if the key was not reserved.
 31   static int release_key (ACE_thread_key_t key);
 32 
 33   /// Returns the exit hook associated with the key.  Does _not_ check
 34   /// for a valid key.
 35   static ACE_TSS_DESTRUCTOR tss_destructor (const ACE_thread_key_t key);
 36 
 37   /// Associates the TSS destructor with the key.  Does _not_ check
 38   /// for a valid key.
 39   static void tss_destructor (const ACE_thread_key_t key,
 40                               ACE_TSS_DESTRUCTOR destructor);
 41 
 42   /// Accesses the object referenced by key in the current thread's TSS array.
 43   /// Does _not_ check for a valid key.
 44   static void *&ts_object (const ACE_thread_key_t key);
 45 
 46   /**
 47    * Setup an array to be used for local TSS.  Returns the array
 48    * address on success.  Returns 0 if local TSS had already been
 49    * setup for this thread.  There is no corresponding tss_close ()
 50    * because it is not needed.
 51    * NOTE: tss_open () is called by ACE for threads that it spawns.
 52    * If your application spawns threads without using ACE, and it uses
 53    * ACE's TSS emulation, each of those threads should call tss_open
 54    * ().  See the ace_thread_adapter () implementation for an example.
 55    */
 56   static void *tss_open (void *ts_storage[ACE_TSS_THREAD_KEYS_MAX]);
 57 
 58   /// Shutdown TSS emulation.  For use only by ACE_OS::cleanup_tss ().
 59   static void tss_close ();
 60 
 61 private:
 62   // Global TSS structures.
 63   /// Contains the possible value of the next key to be allocated. Which key
 64   /// is actually allocated is based on the tss_keys_used
 65   static u_int total_keys_;
 66 
 67   /// Array of thread exit hooks (TSS destructors) that are called for each
 68   /// key (that has one) when the thread exits.
 69   static ACE_TSS_DESTRUCTOR tss_destructor_ [ACE_TSS_THREAD_KEYS_MAX];
 70 
 71   /// TSS_Keys instance to administrate whether a specific key is in used
 72   /// or not.
 73   /// or not.
 74   // Static construction in VxWorks 5.4 and later is slightly broken.
 75   // If the static object is more complex than an integral type, static
 76   // construction will occur twice.  The tss_keys_used_ object is
 77   // statically constructed and then modified by ACE_Log_Msg::instance()
 78   // when two keys are created and TSS data is stored.  However, at
 79   // the end of static construction the tss_keys_used_ object is again
 80   // initialized and therefore it will appear to next_key() that no
 81   // TSS keys have been handed out.  That is all true unless the
 82   // tss_keys_used object is a static pointer instead of a static object.
 83   static ACE_TSS_Keys* tss_keys_used_;
 84 
 85 #   if defined (ACE_HAS_THREAD_SPECIFIC_STORAGE)
 86   /// Location of current thread's TSS array.
 87   static void **tss_base (void* ts_storage[] = 0, u_int *ts_created = 0);
 88 #   else  /* ! ACE_HAS_THREAD_SPECIFIC_STORAGE */
 89   /// Location of current thread's TSS array.
 90   static void **&tss_base ();
 91 #   endif /* ! ACE_HAS_THREAD_SPECIFIC_STORAGE */
 92 
 93 #   if defined (ACE_HAS_THREAD_SPECIFIC_STORAGE)
 94   // Rely on native thread specific storage for the implementation,
 95   // but just use one key.
 96   static ACE_OS_thread_key_t native_tss_key_;
 97 
 98   // Used to indicate if native tss key has been allocated
 99   static int key_created_;
100 #   endif /* ACE_HAS_THREAD_SPECIFIC_STORAGE */
101 };

 該類型有兩種實現方式:

  • 如果平台有執行緒局部存儲的能力,只是支援的不夠完整,那麼 ACE_TSS_Emulation 嘗試使用這些現成的機制、並在此基礎上做一些彌補工作;
  • 如果平台壓根沒有這方面的能力,那麼 ACE_TSS_Emulation 將從無到有模擬這種能力。

ACE_TSS_Emulation 需要用到 ACE_TSS_Keys 輔助類,它的定義如下:

 1 /**
 2  * @class ACE_TSS_Keys
 3  *
 4  * @brief Collection of in-use flags for a thread's TSS keys.
 5  * For internal use only by ACE_TSS_Cleanup; it is public because
 6  * some compilers can't use nested classes for template instantiation
 7  * parameters.
 8  *
 9  * Wrapper around array of whether each key is in use.  A simple
10  * typedef doesn't work with Sun C++ 4.2.
11  */
12 class ACE_TSS_Keys
13 {
14 public:
15   /// Default constructor, to initialize all bits to zero (unused).
16   ACE_TSS_Keys (void);
17 
18   /// Mark the specified key as being in use, if it was not already so marked.
19   /// Returns 1 if the had already been marked, 0 if not.
20   int test_and_set (const ACE_thread_key_t key);
21 
22   /// Mark the specified key as not being in use, if it was not already so
23   /// cleared.  Returns 1 if the key had already been cleared, 0 if not.
24   int test_and_clear (const ACE_thread_key_t key);
25 
26   /// Return whether the specific key is marked as in use.
27   /// Returns 1 if the key is been marked, 0 if not.
28   int is_set (const ACE_thread_key_t key) const;
29 
30 private:
31   /// For a given key, find the word and bit number that represent it.
32   static void find (const u_int key, u_int &word, u_int &bit);
33 
34   enum
35     {
36 #   if ACE_SIZEOF_LONG == 8
37       ACE_BITS_PER_WORD = 64,
38 #   elif ACE_SIZEOF_LONG == 4
39       ACE_BITS_PER_WORD = 32,
40 #   else
41 #     error ACE_TSS_Keys only supports 32 or 64 bit longs.
42 #   endif /* ACE_SIZEOF_LONG == 8 */
43       ACE_WORDS = (ACE_DEFAULT_THREAD_KEYS - 1) / ACE_BITS_PER_WORD + 1
44     };
45 
46   /// Bit flag collection.  A bit value of 1 indicates that the key is in
47   /// use by this thread.
48   u_long key_bit_words_[ACE_WORDS];
49 };

關於具體如何實現模擬,這個留在後面詳細說明。

ACE_TSS_Connection

每執行緒一個連接的實用類型,它是 ACE_TSS <TYPE> 的派生類,提供的 TYPE 是 ACE_SOCK_Stream,用來代表一個 tcp 連接。

這個類型可以理解成是 ACE_TSS <TYPE> 的現成應用,主要用於 ACE_Token_Connections 中,後者又用於 ACE_Remote_Token_Proxy 來實現遠程令牌同步對象系統中的鎖伺服器,關於這方面的內容,可以參考後面 TOKEN 這一章。

原子操作

上面說的一些同步對象都比較重,面向的也是一些複雜的同步場景,而一些簡單的算術運算,由於底層可能被解釋成多條彙編指令,從而帶來執行緒競爭的場景,完全可以由平台提供的 CPU 指令級別的原子操作來實現,可以大大提高並發性能。

ACE_Atomic_Op <LOCK, TYPE>

這個模板類封裝了通用的原子操作類型,說它通用,是因為底層它是通過 LOCK 類型來獲取執行緒互斥的能力,然後在鎖的保護下執行一些類似 ++/–/+=/-+ … 的算術操作以及 >/>=/</<= … 的比較操作,這些操作都是直接委託給傳入的 TYPE 類型。

對於鎖的類型沒有要求,只要提供以下四個通用介面即可:

  • acquire
  • tryacquire
  • release
  • remove

ACE 中符合這個約定的類型有不少,其實這裡使用了 GUARD 輔助類,關於各種 GUARD 類型及其適配的鎖類型,請參考下面 GUARD 一章。

有的人可能會說,這個沒有實現性能的提高呀,事實確實是這樣,但是不要急,它還有針對簡單類型的特化,請耐心看下文。

ACE_Atomic_Op <ACE_Thread_Mutex, long>

對於一些簡單類型,例如整型,大多數平台都會提供特有的 CPU 指令來提高多執行緒同步的性能,這個模板特化就封裝了這方面的能力,它在各個平台上依賴的 cpu 指令列表如下:

平台/介面/設施 windows gnuc (pentium) unsupport
single increment InterlockedIncrement “xadd %0, (%1)” : “+r”(1) : “r”(addr) int
single decrement InterlockedDecrement “xadd %0, (%1)” : “+r”(-1) : “r”(addr) n/a
single exchange InterlockedExchange “xchg %0, (%1)” : “+r”(rhs) : “r”(addr) n/a
single exchange add InterlockedExchangeAdd “xadd %0, (%1)” : “+r”(rhs) : “r”(addr) n/a
multiple increment InterlockedIncrement “lock ; xadd %0, (%1)” : “+r”(1) : “r”(addr) n/a
multiple decrement InterlockedDecrement “lock ; xadd %0, (%1)” : “+r”(-1) : “r”(addr) n/a
multiple exchange InterlockedExchange “xchg %0, (%1)” : “+r”(rhs) : “r”(addr) n/a
multiple exchange add InterlockedExchangeAdd “lock ; xadd %0, (%1)” : “+r”(rhs) : “r”(addr) n/a

 

上面 8 個介面可分為兩組,一組為單 CPU 場景下調用的 single xxx;另一組為多 CPU 場景下調用的 multiple xxx。ACE 會在初始化時根據系統 CPU 核心數來確定調用哪組介面,對於 windows 系統而言,兩組介面底層是一致的,因為 Interlockedxxx 介面本身可以應對單核與多核兩個場景。其它平台如果支援 gnuc 和奔騰處理器,則使用 x86 特有的 xadd 與 xchg 指令,該指令中多核與單核唯一不同就是有沒有 lock 指令鎖定相應記憶體,對於 xchg 指令,隱含 lock 指令,所以不用單獨指定。關於介面與操作符的對應關係,可以參考下表:

操作符 介面
operator ++ single/multiple increment
operator — single/multiple decrement
operator += / -= single/multiple exchange add
operator = single/multiple exchange

對於對比運算符,沒有使用原子操作。這裡模板參數中的 ACE_Thread_Mutex 只是一個佔位符,並沒有真正使用 (其實完全可以使用 ACE_Null_Mutex,請參考後面 NULL 這一章)。另外 windows 上非常有用的 InterlockedCompareExchange 沒有封裝進來 (對比並交換,只有在對比結果一致的情況下才交換,很多”無鎖隊列”都是基於這個實現的)。

ACE_Atomic_Op_Ex <LOCK, TYPE>

這個類型與 ACE_Atomic_Op <LOCK, TYPE> 非常類似,不同的是該類型使用外部傳入的 LOCK 實例,從而可以在多個對象之間共享同一個鎖,而 ACE_Atomic_Op 使用的是自己內部的鎖。它們之間雖然介面完全一樣,但是不存在派生關係。其實 ACE_Atomic_Op 內部聚合了一個 ACE_Atomic_Op_Ex 實例,將內部鎖傳遞給後者的構造函數,並將所有操作都委託給後者實現。不過使用這個類型的一個缺點是,無法再針對整型進行特化,從而享受 ACE_Atomic_Op <ACE_Thread_Mutex, long> 帶來性能提升的好處。

GUARD

上面講了很多可以充當鎖的同步對象,可以直接拿來使用,不過在 c++ 中,基於 RAII  的思想,一般將鎖對象包裝在守衛 (GUARD) 對象中,利用 c++ 構造、析構函數被編譯器自動調用的特性,實現鎖的自動釋放,避免因 return / continue / break 甚至拋出異常等離開當前控制流、外加一些人為因素導致的鎖未及時釋放問題。

ACE_Guard <LOCK>

封裝了通用的鎖守衛類,凡是支援以下介面的鎖類型都可適用:

介面 時機
acquire 構造函數 / 明確獲取
tryacquire 非 block 類型的構造函數 / 明確嘗試獲取
release 析構函數 / 明確釋放
remove 明確銷毀

 

ACE_Guard 模板類一般是借用外部的鎖實例,模板參數 LOCK 為借用的鎖類型。在構造函數時加鎖、析構函數解鎖,但也提供了一些介面來提供一定靈活性,例如可以在 guard 實例生命周期內提前調用 release 介面來釋放鎖、在之後某個時機再調用 acqure / tryacquire 再次獲取鎖,甚至直接調用 remove 銷毀底層的鎖。guard 實例內部有一個 acquire / tryacquire 介面的返回值,通過檢查該值 (locked) 來判斷是否加鎖成功,也可以重置該值 (disown) 來避免析構時自動調用 release (用於保持加鎖狀態)。該類型適配的鎖範圍較廣,列表如下:

  • ACE_Thread_Mutex
  • ACE_Recursive_Thread_Mutex
  • ACE_RW_Thread_Mutex
  • ACE_Process_Mutex
  • ACE_RW_Process_Mutex
  • ACE_Thread_Semaphore
  • ACE_Process_Semaphore
  • 任何滿足上面調用約定的自定義類型

具體的性質和使用的鎖類型相關,例如對於 linux 上的 ACE_Thread_Mutex,是不支援遞歸加鎖的,在使用時並不是隨意搭配,而是要考慮使用的場景再選取合適的鎖類型,這一點可以參考前面的說明。

ACE_Read_Guard <LOCK>

派生自 ACE_Guard <LOCK>,封裝了讀寫鎖中讀的一方,凡是支援以下介面的鎖類型即可適用:

介面 時機
acquire_read 構造函數 / 明確獲取
tryacquire_read 非 block 類型的構造函數 / 明確嘗試獲取
release 析構函數 / 明確釋放
remove 明確銷毀

 

它也定義 acquire / tryacquire 介面,不過都重定向到了 acquire_read / tryacquire_read 介面,強制使用讀鎖。該類型適配的讀寫鎖類型列表如下:

  • ACE_RW_Thread_Mutex
  • ACE_RW_Process_Mutex
  • 任何滿足上面調用約定的自定義類型

 

ACE_Write_Guard <LOCK>

派生自 ACE_Guard <TYPE>,封裝了讀寫鎖中讀的一方,凡是支援以下介面的鎖類型即可適用:

介面 時機
acquire_write 構造函數 / 明確獲取
tryacquire_write 非 block 類型的構造函數 / 明確嘗試獲取
release 析構函數 / 明確釋放
remove 明確銷毀

 

它也定義 acquire / tryacquire 介面,不過都重定向到了 acquire_write / tryacquire_write 介面,強制使用寫鎖。該類型適配的讀寫鎖類型列表如下:

  • ACE_RW_Thread_Mutex
  • ACE_RW_Process_Mutex
  • 任何滿足上面調用約定的自定義類型

 

上面羅列了三種類型守衛類型,如果直接使用的話,要這樣寫:

ACE_Guard <ACE_Thread_Mutex> guard (mutex_); 

其中 mutex_ 可以理解成是類的鎖成員,用於類內部所有並發的控制。ACE 提供了一些宏來簡化守衛的定義:

ACE_GUARD (LockType, GuardName, LockObject)
ACE_WRITE_GUARD (LockType, GuardName, LockObject)
ACE_READ_GUARD (LockType, GuardName, LockObject)

上面的程式碼就可以被簡化成:

ACE_GUARD (ACE_Thread_Mutex, guard, mutex_)

是不是清爽了很多?

ACE_TSS_Guard <LOCK>

將鎖和執行緒局部存儲結合起來,就得到每執行緒 (per-thread) 的同步鎖。不過既然執行緒局部存儲已經可以保證只有一個執行緒訪問,那麼加鎖又有什麼用呢?找到源碼中的說明貼出來大家自己理解吧:

/**
 * @class ACE_TSS_Guard
 *
 * @brief This data structure is meant to be used within a method or
 * function...  It performs automatic aquisition and release of
 * a synchronization object.  Moreover, it ensures that the lock
 * is released even if a thread exits via <thr_exit>!
 */

搜遍了整個源程式碼,沒有找到這個類的調用點。所以個人理解這應該是單純為了體現 c++ 模板各種組合帶來的強大能力(?),有點「炫技」的感覺,所以下面只從開拓眼界的角度看一下這個類型的介面。

在構造函數里,通過 init_key 創建了一個 TSS 的根鍵,並將傳入的外部鎖實例作為鍵值設置進去。當調用相關介面時,再通過根鍵獲取鎖實例,並將調用委託給此實例實現。注意這裡的實例並不是 LOCK 本身,而是 ACE_Guard <LOCK>,其實就是通過聚合重用了後者,因此介面及適配的鎖類型與 ACE_Guard 完全一致,這裡不再贅述。

從這裡的實現也可以看出,如果執行緒不是當初創建這個對象的執行緒,那麼當去調用它的一些介面時,對應的底層鎖對象其實是 NULL,將會導致進程直接崩潰。好在 GUARD 類本身就是作為棧上的局部對象使用,一般不涉及超過函數級別共享的問題,如果是一個函數被多個執行緒並發訪問,那麼這種情況下每個執行緒使用自己的 ACE_Guard 對象其實更為合理。

而 TSS  本身是多個執行緒訪問全局或共享變數時,每個執行緒訪問其基於本執行緒的實例,如果這樣來用這個類型的話,則會遇到我之前說的,只有創建該類型實例的執行緒能訪問底層鎖,其它執行緒將得到 NULL 從而崩潰。所以我實在想不出 ACE_TSS_Guard 的任何實用場景,有看出門道的看官可以指點一二則個~~

ACE_TSS_Read_Guard <LOCK>

派生自 ACE_TSS_Guard <LOCK>,封裝了讀寫鎖中讀的一方,適配的鎖類型與 ACE_Read_Guard <LOCK> 相同,與其父類相似,它底層其實是聚合了 ACE_Read_Guard 來實現介面的。

ACE_TSS_Write_Guard <LOCK>

派生自 ACE_TSS_Guard <LOCK>,封裝了讀寫鎖中寫的一方,適配的鎖類型與 ACE_Write_Guard <LOCK> 相同,與其父類相似,它底層其實是聚合了 ACE_Write_Guard 來實現介面的。

 

可能因為使用場景少,沒有定義與 ACE_TSS_xxx_Guard 相關聯的宏來簡化守衛對象的聲明。

BARRIER

從這節開始討論一些基於基本同步對象構建的高級同步對象。BARRIER:柵欄同步,顧名思義,就是當執行緒沒有達到指定的數量時,會堵塞在對應的 BARRIER 上,直到所期待的執行緒都到達後才一次性全部喚醒,從而保證不會有一些執行緒仍滯留在某些程式碼從而導致執行緒競爭的問題 (?)。

ACE_Barrier

通用的柵欄同步類,通過構造函數可以指定要同步的執行緒數量,相同數量的執行緒在需要同步的位置調用該實例的 wait 介面,當到達的執行緒數量不足時,wait 會讓執行緒阻塞,直到到達同步點的執行緒達到指定的數量,才一次性喚醒所有執行緒繼續執行。柵欄同步體可多次使用,linux 上有原生的柵欄同步 api ,不過 ace 考慮可移植性,並沒有直接基於它進行封裝,而是基於條件變數自己實現了一版:

 1 struct ACE_Export ACE_Sub_Barrier
 2 {
 3   // = Initialization.
 4   ACE_Sub_Barrier (unsigned int count,
 5                    ACE_Thread_Mutex &lock,
 6                    const ACE_TCHAR *name = 0,
 7                    void *arg = 0);
 8 
 9   ~ACE_Sub_Barrier (void);
10 
11   /// True if this generation of the barrier is done.
12   ACE_Condition_Thread_Mutex barrier_finished_;
13 
14   /// Number of threads that are still running.
15   int running_threads_;
16 
17   /// Dump the state of an object.
18   void dump (void) const;
19 
20   /// Declare the dynamic allocation hooks.
21   ACE_ALLOC_HOOK_DECLARE;
22 };
23 
24 /**
25  * @class ACE_Barrier
26  *
27  * @brief Implements "barrier synchronization".
28  *
29  * This class allows <count> number of threads to synchronize
30  * their completion of (one round of) a task, which is known as
31  * "barrier synchronization".   After all the threads call <wait()>
32  * on the barrier they are all atomically released and can begin a new
33  * round.
34  *
35  * This implementation uses a "sub-barrier generation numbering"
36  * scheme to avoid overhead and to ensure that all threads wait to
37  * leave the barrier correct.  This code is based on an article from
38  * SunOpsis Vol. 4, No. 1 by Richard Marejka
39  * ([email protected]).
40  */
41 class ACE_Export ACE_Barrier
42 {
43 public:
44   /// Initialize the barrier to synchronize <count> threads.
45   ACE_Barrier (unsigned int count,
46                const ACE_TCHAR *name = 0,
47                void *arg = 0);
48 
49   /// Default dtor.
50   ~ACE_Barrier (void);
51 
52   /// Block the caller until all <count> threads have called <wait> and
53   /// then allow all the caller threads to continue in parallel.
54   int wait (void);
55 
56   /// Dump the state of an object.
57   void dump (void) const;
58 
59   /// Declare the dynamic allocation hooks.
60   ACE_ALLOC_HOOK_DECLARE;
61 
62 protected:
63   /// Serialize access to the barrier state.
64   ACE_Thread_Mutex lock_;
65 
66   /// Either 0 or 1, depending on whether we are the first generation
67   /// of waiters or the next generation of waiters.
68   int current_generation_;
69 
70   /// Total number of threads that can be waiting at any one time.
71   int count_;
72 
73   /**
74    * We keep two <sub_barriers>, one for the first "generation" of
75    * waiters, and one for the next "generation" of waiters.  This
76    * efficiently solves the problem of what to do if all the first
77    * generation waiters don't leave the barrier before one of the
78    * threads calls wait() again (i.e., starts up the next generation
79    * barrier).
80    */
81   ACE_Sub_Barrier sub_barrier_1_;
82   ACE_Sub_Barrier sub_barrier_2_;
83   ACE_Sub_Barrier *sub_barrier_[2];
84 
85 private:
86   // = Prevent assignment and initialization.
87   void operator= (const ACE_Barrier &);
88   ACE_Barrier (const ACE_Barrier &);
89 };

具體的實現是委託給 ACE_Sub_Barrier 來實現的,它內部基於 ACE_Condition_Thread_Mutex 來實現,所以只有支援這個類型的平台才有柵欄同步體的支援,如果沒有的話該類型聲明為空以避免編譯錯誤。ACE_Barrier 內部聚合了兩個該對象進行循環切換以便支援後續的 wait 調用,具體實現細節留在後面詳細說明。

ACE_Thread_Barrier

派生自 ACE_Barrier,可能想構建類似前面同步對象的體系——Thread 表示進程內多執行緒使用,Process 表示進程間多執行緒使用——但是目前底層的依賴的 ACE_Condition_Thread_Mutex 只支援進程內多執行緒,導致 ACE_Barrier 本身就不能跨進程使用,所以目前 ACE_Thread_Barrier 與 ACE_Barrier 完全等價,且沒有對應的 ACE_Process_Barrier 可用 (被 ifdef 注釋掉了)。除了 ACE_Condition_Thread_Mutex 的局限外,即使我們找到了可以跨進程使用的條件變數,ACE_Barrier 內部還有一些變數 (running_threads_ / count_ …) 也需要放在共享記憶體中,這個工作量也是蠻大的,所以目前沒有跨進程的柵欄同步體提供。

TOKEN

TOKEN:令牌同步,是 ACE 抽象的高級同步對象,它實現了可遞歸鎖定、讀寫分離、死鎖檢測、等待通知等高級特性,甚至還支援分散式鎖。

ACE_Token_Proxy

是各種進程內 token 對象的基類,不能直接拿來用,如果想要擴展 token 對象的類型,可以從它派生。通過它我們先來了解一下 TOKEN 體系各個類之間的關係:

 

右側三個類是對外介面,左側三個類是實現,不能直接拿來用。它們存在一一對應的關係,例如 ACE_Local_Mutex 基於 ACE_Mutex_Token 實現;ACE_Local_RLock 和 ACE_Local_WLock  基於 ACE_RW_Token 實現。ACE_Token_Proxy 是所有對外介面類的基類,它是調用者 (執行緒) 的抽象;ACE_Tokens 是所有內部實現類的基類,它是鎖的抽象。一個鎖上可能有多個執行緒,但最多只有一個擁有者,其它執行緒則將自己加入鎖的隊列中,並在自己內部的一個條件變數 (ACE_Condition_Thread_Mutex) 上等待。當擁有者釋放鎖 (release) 時,會自動將等待隊列頭部的執行緒喚醒 (通知內部的條件變數),從而使其獲取鎖。反之,一個執行緒只能在一個鎖上等待,不過可能同時擁有多個鎖 (申請了多份資源)。

ACE_Local_Mutex

本地簡單鎖的抽象,通過派生 ACE_Token_Proxy 並提供 ACE_Mutex_Token 作為底層的實現來製作一個行為類似普通互斥量的同步對象。有的人可能會問了,ACE 自己封裝了一大堆類最後卻做了一個和 ACE_Thread_Mutex 一樣功能的同步對象,有什麼用處呢? 答案是令牌同步對象具有更多高級的功能:

  • 支援遞歸鎖定;
  • 支援死鎖檢測,這是通過依賴另外一個類 (ACE_Token_Manager) 的單例來實現的,所有令牌對象都會在該單例中註冊,在鎖定前,會通過它進行查找,看有無導致死鎖的可能,如果發生了導致死鎖的鎖定,則會直接返回 EDEADLK 錯誤,從而避免死鎖。更進一步,如果打開了調試模式,還會在日誌中列印豐富的資訊,幫助開發者定位互鎖的執行緒及它們競爭的鎖,關於這方面的內容,可以參考我在這篇文章里的回復 《有什麼辦法檢測死鎖阻塞在哪裡么? 》(附錄 18);
  • 在 acquire 中還可以傳遞一個自定義的通知函數,當沒有成功獲取鎖從而進入等待之前,可以調用該函數用來做一些通知工作,通過合理的設計,這個通知函數可以向持有鎖的執行緒發送消息,告訴它釋放鎖,這樣就可以讓當前執行緒很快得到鎖了,而不用「傻乎乎」的進入漫長的等待,關於這一 點,還會在後面 ACE_Token 一節中提到;
  • 最後就是在鎖上等待的執行緒,嚴格的遵循 FIFO 順序,不會出現在一些平台上鎖的一些「不良」實現導致的飢餓問題——喚醒的執行緒是無序的從而有一定概率導致一些執行緒一直陷入等待。

 本類型可用於 ACE_Guard <TYPE> 守衛類型。

ACE_Local_RLock

本地讀鎖的抽象,通過派生 ACE_Token_Proxy 並提供 ACE_RW_Token 作為底層的實現來製作一個行為類似讀寫鎖中讀端的同步對象。後者內部其實也是使用 ACE_Thread_Mutex 外加一個等待者數量來實現的 (而沒有採用平台原生的讀寫鎖 ACE_RW_Thread_Mutex 之類),除了具備上一節中的高級功能外,它還具備以下讀寫鎖的特有性質:

  • 讀鎖在獲取鎖時如果已經有寫鎖,則進入等待隊列;
  • 讀鎖在獲取鎖時如果已經有讀鎖且不是本鎖遞歸加鎖,則進入等待隊列,注意這一點與平台提供的讀寫鎖概念有區別,後者在這種情況下是允許多個讀鎖共存的,而令牌系統的讀寫鎖僅僅是優先順序不同,相互之間也都是互斥的;
  • 當解鎖寫鎖時,且等待隊列的下一個執行緒要求讀鎖時,則會同時喚醒這批連續的讀執行緒,讓它們有同樣的機率爭搶這把鎖。如果解鎖的是讀鎖、或下一個等待執行緒要求寫鎖時,則只喚醒該執行緒,來避免語義錯誤或不必要的競爭;

 本類型可用於 ACE_Read_Guard <TYPE> 守衛類型。

ACE_Local_WLock

本地寫鎖的抽象,通過派生 ACE_Token_Proxy 並提供 ACE_RW_Token 作為底層的實現來製作一個行為類似讀寫鎖中寫端的同步對象。與 ACE_Local_RLock 幾乎完全相同,只是返回的類型為寫鎖、在 ACE_RW_Token 中進入不同的分支條件,從而進入與上面不同的邏輯處理。

本類型可用於 ACE_Write_Guard <TYPE> 守衛類型。

 

限於這篇文章的主題,只討論使用相關的問題,並不討論實現相關的部分,關於 ACE_Mutex_Token / ACE_RW_Token 這裡不展開說明。回顧一下之前講過的模擬讀寫鎖或互斥量 (因平台本身不支援而自定義的),它們內部的實現與這裡的 Token 一定有相通之處,關於這方面的對比,留待後面詳細說明。

ACE_Remote_Token_Proxy

從這節開始,介紹可以跨進程、跨機器協同的令牌系統。同 ACE_Token_Proxy 一樣,ACE_Remote_Token_Proxy 是各種進程間 token 對象的基類,不能直接拿來用,如果想要擴展 token 對象的類型,可以從它派生。通過它我們先來了解一下遠程 TOKEN 系統各個類之間的關係:

這個類圖是在進程內 token 基礎上添加的,其中紅色的三個類是對外介面,位於本地;藍色的三個類是實現,位於一個專門的鎖服務進程內 (可能位於另一台機器)。它們存在一一對應的關係,例如 ACE_Remote_Mutex 對應 ACE_TS_Mutex;ACE_Remote_RLock  對應 ACE_TS_RLock;ACE_Remote_WLock 對應 ACE_TS_WLock,TS 意即 Token Server。

如何將進程內的令牌系統拓展到進程間甚至是跨機器呢?我想你已經猜到答案了,就是通過 tcp 連接,將鎖定的請求發往一個集中的鎖服務,該服務在內部根據機器名+鎖名來唯一標識一把鎖,當多個遠程執行緒試圖鎖定同一把鎖時,只有在本地真正獲得鎖的那個實例會向 tcp 回傳確認數據,從而讓對應的遠程執行緒繼續執行;而其它陷入等待的實例因為沒有任何數據回傳,導致對應的遠程執行緒只能阻塞在同步的 tcp 讀過程中,這相當於另一種形式的鎖定。

當獲得鎖的執行緒解鎖時,同樣會向鎖服務發送一個解鎖請求,鎖服務得到這個請求後,會在本地解鎖對應的鎖,這個過程和之前進程內的解鎖過程並無二致。不同的是,釋放鎖後,鎖會喚醒在隊列上等待的本地執行緒,該執行緒獲取鎖後,將通過 tcp 向對應的遠程執行緒回傳一個應答,從而激活對應的遠程執行緒繼續執行。記得當年看到這裡的實現時,心中不由的稱讚一句——妙哇~ 電腦領域擅長將新問題歸化為已解決問題、從而依賴之前的解決方案的思路,在這裡又得到了一次充分體現。

ACE_Remote_Mutex

遠程簡單鎖的抽象,通過派生 ACE_Remote_Token_Proxy 並提供 ACE_Mutex_Token 作為底層的實現來製作一個行為類似普通互斥量的同步對象。注意,上邊說 ACE_Remote_Mutex 和 ACE_TS_Mutex 是一對一的關係,但這並不表示前者底層包含一個後者,因為他們分屬兩個進程,這裡說的對應關係是指 tcp 通訊層面的,ACE_Remote_Mutex 會將鎖定、解鎖的操作封裝成一個請求,發往鎖服務並分派給對應的 ACE_TS_Mutex 來實現對應的操作,而他們兩個底層其實都是依賴的 ACE_Mutex_Token,這個和本地 token 並無二致。

可能有的人會問了,ACE_TS_Mutex 底層依賴 ACE_Mutex_Token 可以理解,畢竟要做鎖定、解鎖的動作嘛,但是為什麼 ACE_Remote_Mutex 也要依賴這個 ACE_Mutex_Token?它不是把請求發往伺服器了嗎?確實是這樣的,不過只說對了一半,因為 ACE 為進程內場景做了優化,它先嘗試在本地獲取鎖,如果失敗,就說明進程內已經有執行緒獲取這個鎖了,不用”千里迢迢”跑到鎖伺服器再問一遍,這樣可以大大優化進程內場景的性能,只有當本地成功時,才去嘗試遠程鎖伺服器。 同理,在釋放時,也需要記得釋放本地這個「影子」鎖。

 

 本類型可用於 ACE_Guard <TYPE> 守衛類型。

ACE_Remote_RLock

遠程讀鎖的抽象,通過派生 ACE_Remote_Token_Proxy 並提供 ACE_RW_Token 作為底層的實現來製作一個行為類似讀寫鎖中讀端的同步對象。

ACE_Token_Handler 封裝了與請求、應答傳輸相關的邏輯,當鎖的第一個請求到達鎖服務時,後者首先按照請求中聲明的類型創建對應的鎖 (ACE_TS_xxx),然後在它上面應用請求中聲明的操作類型 (acquire / tryacquire / renew / release …),而根據上面的討論,我們知道對應的操作其實是委託給了底層的 ACE_Mutex_Token 或 ACE_RW_Token 去實現了,當操作順利返回或明顯出錯時,ACE_TS_XXX 將通過底層的 Handler 回傳應答,通知請求端結果;當操作被阻塞時,也不回送應答,從而阻塞請求端讀應答操作,造成一種等待鎖的「假象「。

 

本類型可用於 ACE_Read_Guard <TYPE> 守衛類型。

ACE_Remote_WLock

遠程寫鎖的抽象,通過派生 ACE_Remote_Token_Proxy 並提供 ACE_RW_Token 作為底層的實現來製作一個行為類似讀寫鎖中寫端的同步對象。與 ACE_Remote_RLock 幾乎完全相同,只是返回的類型為寫鎖、在 ACE_RW_Token 中進入不同的分支條件,從而進入與上面不同的邏輯處理。

與 ACE_Token_Handler 相關的還有 ACE_Token_Acceptor 和 ACE_TSS_Connection (未在上圖中標出),前者用於鎖伺服器建立埠監聽並創建 ACE_Token_Handler 實例來處理到達的連接上的數據;後者用於鎖請求端建立和鎖伺服器的主動連接,從而發出鎖上的各類請求,關於後者在前面介紹執行緒局部存儲時已經提到,這裡不再贅述,主要補充一點就是這裡使用 TSS 的目的是保證即使同一個進程內的同一類型的 ACE_Remote_XXX 的多個實例也使用獨立的連接,從而保證它們之間互不影響。另外他們都是從 ACE_Acceptor / ACE_Event_Handler / ACE_SOCK_Stream 派生而來,目的是為了重用 ACE 已有的 Acceptor-Connector 框架來簡化連接的建立過程。

本類型可用於 ACE_Write_Guard <TYPE> 守衛類型。

 

其實聰明的讀者已經發現一個問題,就是要想實現同樣數量的遠程執行緒鎖定,鎖服務必需使用同樣多數量的本地執行緒,這確實是一個遠程 TOKEN 的局限。限於這篇文章的主題,只討論使用相關的問題,並不討論實現相關的部分,關於 ACE_TS_Mutex / ACE_TS_RLock / ACE_TS_WLock 這裡不展開說明,關於遠程 token 的實現留在以後詳細說明,關於分散式鎖服務的一些內容可以參考我之前寫的一篇文章《ACE 分散式鎖服務介紹》(附錄 13)。

ACE_Token

前面介紹的令牌系統已經非常豐富了,這裡的 ACE_Token 卻和他們不是一個體系。雖然相互之間沒有什麼直接聯繫,但是它們的設計理念與實現卻非常相似,不同的地方比較少,下面羅列出來做個對比:

  • ACE_Token 一個類包含了 Tokens + ACE_Token_Proxy 及其派生類的所有功能:遞歸鎖定、等待通知 (sleep hook)、等待順序,除了死鎖檢測,基本上都支援;
  • ACE_Token 中等待鎖的執行緒可訂製使用 FIFO 或 LIFO 順序,前者用來保證執行緒分派的公平性,後者用來保證性能。而 Token 系統只支援 FIFO 順序;
  • ACE_Token 針對不同平台,使用不同的喚醒機制,支援 posix 的 unix like 系統使用條件變數 (ACE_Condition_Thread_Mutex),否則使用訊號燈 (ACE_Semaphore),這樣在沒有原生條件變數的平台上 (例如 windows) 上有更好的性能 (不用使用模擬的條件變數了)。而 Token 系統只使用條件變數;
  • ACE_Token 在 ACE 內部有重要應用,而 Tokens 系統雖然支援遠程鎖這種高大上的東東,最終在 ACE 內部使用的非常少。

ACE_Token 在 ACE 內部主要用於 Reactor 內部的通知,而且僅限基於 select 實現的反應器 (ACE_Select_Reactor)。這是由於 select 本身對多執行緒支援不足導致的,眾所周知,當一個執行緒對一組 IO 句柄進行 select 操作且阻塞時,其它執行緒是沒有辦法同時操作這些句柄的,例如註冊、移除或修改關心的事件類型 (讀、寫或OOB),也沒有辦法在上面同時 select。所以通常的做法是在等待執行緒時加一把鎖,來保護這一過程不受其它執行緒競爭的影響。但是這樣一來如何在運行過程中更新句柄集呢? 總不能碰運氣吧,萬一句柄集上一直沒有事件,那用戶的註冊句柄請求豈不等到天荒地老了:

1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler
3   (ACE_Event_Handler *handler,
4    ACE_Reactor_Mask mask)
5 {
6   ACE_TRACE ("ACE_Select_Reactor_T::register_handler");
7   ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
8   return this->register_handler_i (handler->get_handle (), handler, mask);
9 }

 

觀察這段註冊句柄的介面實現,貌似上來直接加把鎖就去幹活了,他是這麼普通,卻又如此自信,難道他就不擔心上面我們提到的問題嗎?在正式開始解答這一系列疑惑之前,先說明一下這裡的 ACE_Select_Reactor_T,它本身是一個模板類,模板參數是加鎖的類型:

 1 template <class ACE_SELECT_REACTOR_TOKEN>
 2 class ACE_Select_Reactor_T : public ACE_Select_Reactor_Impl
 3 {
 4 ……
 5 }; 
 6 
 7 typedef ACE_Token ACE_SELECT_TOKEN;
 8 
 9 typedef ACE_Select_Reactor_Token_T<ACE_SELECT_TOKEN> ACE_Select_Reactor_Token;
10 
11 typedef ACE_Select_Reactor_T<ACE_Select_Reactor_Token> ACE_Select_Reactor;

 

而 ACE_Select_Reactor 正是 ACE_Select_Reactor_T 使用 ACE_Select_Reactor_Token 作為模板參數的 typedef,說到這裡,有的人可能已經暈了,不過不要緊,這裡只是說明上面那個介面確實是 ACE_Select_Reactor 的一部分。現在回到正題,為什麼這段程式碼可以工作而不是死等呢?原因就在於他使用的 ACE_Select_Reactor_Token 其實就是 ACE_Select_Reactor_Token_T <ACE_Token>,這個形式表明這個類型其實就是一個從 ACE_Token 派生的類型,主要重寫了後者的 sleep_hook 方法:

1 template <class ACE_SELECT_REACTOR_MUTEX>
2 class ACE_Select_Reactor_Token_T : public ACE_SELECT_REACTOR_MUTEX
3 {
4 public:
5  ……
6   /// Called just before the ACE_Event_Handler goes to sleep.
7   virtual void sleep_hook (void);
8  ……
9 };

 

這樣,當另外執行緒試圖註冊句柄時,如果因主執行緒阻塞在 select 上導致 token 獲取失敗時,將有機會通過 sleep hook 向 reactor 發出一個通知,這個通知將導致主執行緒的 select 被喚醒,從而讓出 token 為我們所持有,進而這裡可以進行句柄更新。當額外的執行緒更新完畢離開這個函數從而釋放 token 時,又會將所有權重新轉移給在鎖上等待的主執行緒,讓它繼續 select 更新後的句柄集。

有的人可能會問為什麼向 reactor 發一個通知就可以讓阻塞在 select 上的主執行緒退出,其實這裡涉及到了一個小技巧,即 self-pipe-trick,在初始化時創建一對自連接的 tcp / pipe 句柄,將他們默認加入到 select 的句柄集中,當需要解除阻塞時,在上面寫入一個位元組就可以了,巧妙吧~

ok,解釋了這麼多,或許還有人一頭霧水。沒關係,當初我看這段程式碼時,也沒想到 ACE 會在這麼不經意的一行鎖定程式碼時塞入這麼多邏輯,巧則巧矣,只是「偽裝」的太好了,以至於我一開始根本沒發現這裡的玄機。是後來看到 token 的實現,又搜索整個程式碼庫中的實現,才發現這裡別有天地。關於 ACE_Select_Reactor 的更多內容,請參考我之前寫的一篇文章《ACE_Select_Reactor 多執行緒通知機制分析》(附錄 12)。

對於 ACE_Select_Reactor 而言,這個機制可能還不明顯,畢竟有些同學是在單執行緒環境下使用這個反應器;但是對於 ACE_TP_Reactor 來說,這個 ACE_Token 就至為重要了,因為它本身就是要在只支援單執行緒的 select 上使用執行緒池來優化性能的 (TP 即 Thread Pool),一堆執行緒跑起來,如果只是「嘎嘣」一下上把鎖,那和 ACE_Select_Reactor 又有何異呢?所以 ACE 這裡別出新裁,只鎖定事件偵測段,不鎖定事件分發段,來保證不確定時間的事件分發回調不會影響處理其它連接上到達的請求。

這也是 ACE_TP_Reactor 與 ACE_Select_Reactor 最大的不同,雖然前者派生自後者,但是它為了保證一個連接的一個請求在處理過程中不會被另外的執行緒對同樣的請求繼續分派 (例如有讀事件時,當數據未讀取完成前,select 一直會報告該句柄有可讀事件),從而導致的多執行緒競爭問題,它在分派一個連接上的事件時,會自動將對應的句柄從當前偵測句柄集中移除,直到連接上的數據被處理完成後,才將該句柄加回來 (resume_handler)。

於是我們看到 ACE_Token 使用最多的場景就是當連接上數據處理完成後 resume 的一刻,此時一般已經有一個執行緒在句柄集上偵測事件了,且陷入了阻塞,如果想把這個處理完數據的句柄再加入進去,必需先通知正在 select 的執行緒退出阻塞並讓出所有權,所有這一切都是一行鎖定程式碼搞定:

1 template <class ACE_SELECT_REACTOR_TOKEN> int
2 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_HANDLE handle)
3 {
4   ACE_TRACE ("ACE_Select_Reactor_T::resume_handler");
5   ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1));
6   return this->resume_i (handle);
7 }

 

記得當時為了印證我的觀點,還特意增加了 sleep_hook 中的日誌並重新編譯 ACE 運行進程觀察日誌輸出。關於 ACE_TP_Reactor 的更多內容,請參考我之前寫過的一篇文章《ACE_TP_Reactor 實現 Leader-Follower 執行緒模型分析》(附錄 14)。

既然 ACE_Token 如此好用,為什麼不在所有的反應器中使用呢?答案是其它的多路事件分派 api 大多數是支援多執行緒的,例如 epoll 雖然不支援多個執行緒同時 epoll_wait,但是可以在一個執行緒 wait 時另外的執行緒修改句柄及事件集合,這種修改會實時的反映到當前 wait 的執行緒中,就大大減少了執行 epoll_wait 的執行緒無謂的頻繁喚醒,提高了性能;更不要說,基於 windows 完成埠 (iocp) 實現的前攝器 (proactor) ,可以直接通過 PostQueuedComplectionStatus 向完成埠發送任意通知,且 GetQueuedCompletionStatus 本身就是支援多執行緒從 iocp 獲取事件的。關於 epoll 和 iocp 的論述,請參考我另一篇文章 《[apue] epoll 的一些不為人所注意的特性》。這種 self-pipe-trick 廣泛用於基於 select 的事件驅動庫,例如 libevent,關於該技巧引發的一場血案,並由此衍生的 gevent 框架,請參考我寫的另一篇文章:《一個工業級、跨平台、輕量級的 tcp 網路服務框架:gevent 》。

說了許多與 ACE_Token 本身不相關的內容,主要是解釋這個類型存在的必要性,其實它在 ACE 中有特定的用途,不一定適合通用場景,反而是之前介紹的 Token 系統比較通用,如果你不在意擴展性和死鎖檢測功能,可以基於 ACE_Token 派生自己的類型去使用,特別是它的 sleep_hook 等待通知功能,一定要利用起來,不然和使用 Mutex 沒有什麼兩樣。

NULL

ACE 為了提供靈活性,對鎖類型採用模板參數的方式提供,便於用戶根據自己的實際場景選擇合適的鎖類型。但是這也帶來了一個問題,就是當用戶所在的場景明確是單執行緒環境不需要鎖的時候,也要提供一個鎖類型,從而造成性能下降。為了解決這個問題,ACE 使用空類型 (ACE_Null_XXX) 來適配單執行緒環境。

ACE_Null_Mutex

適配互斥量類型,包括但不限於:

  • ACE_Thread_Mutex
  • ACE_Recursive_Thread_Mutex
  • ACE_RW_Thread_Mutex
  • ACE_RW_Process_Mutex
  • ACE_RW_Mutex
  • ……

凡是可以使用以上類型的,都可以通過 ACE_Null_Mutex 來適配單執行緒版本。適用於以下守衛類型:

  • ACE_Guard
  • ACE_Read_Guard
  • ACE_Write_Guard

其實上述幾個守衛類型針對 ACE_Mull_Mutex 參數做了模板特化,它們壓根不會調用後者的介面,而是直接用返回 0 來獲得更好的性能。

ACE_Null_Semaphore

適配訊號燈類型,包括但不限於:

  • ACE_Thread_Semaphore
  • ACE_Process_Semaphore
  • ……

凡是可以使用以上類型的,都可以通過 ACE_Null_Semaphore 來適配單執行緒版本。適用於以下守衛類型:

  • ACE_Guard

對於帶 timeout 參數的 acquire 操作,ACE_Null_Semaphore 直接返回超時錯誤,因為它無法模擬被另一個執行緒喚醒的場景,否則就不是 NULL object 了。

ACE_Null_Condition

適配條件變數類型,包括但不限於:

  • ACE_Condition_Thread_Mutex
  • ACE_Recursive_Condition_Thread_Mutex
  • ACE_Condition <TYPE>
  • ……

凡是可以使用以上類型的,都可以通過 ACE_Null_Condition 來適配單執行緒版本。對於條件變數,無守衛類型可用。

同 ACE_Null_Semaphore 一樣,對於帶 timeout 參數的 wait 操作,ACE_Null_Condition 直接返回超時錯誤。

ACE_Null_Barrier

適配柵欄同步體,包括但不限於:

  • ACE_Barrier
  • ACE_Thread_Barrier
  • ……

凡是可以使用以上類型的,都可以通過 ACE_Null_Barrier 來適配單執行緒版本。無守衛類型可用。

對於 wait 操作,ACE_Null_Barrier 直接返回 0 表示等到了所有需要同步的執行緒。

ACE_Null_Token

適配令牌同步體,包括但不限於:

  • ACE_Local_Mutex
  • ACE_Local_RLock
  • ACE_Local_WLock
  • ACE_Remote_Mutex
  • ACE_Remote_RLock
  • ACE_Remote_WLock
  • ……

凡是可以使用以上類型的,都可以通過 ACE_Null_Token 來適配單執行緒版本。適用於以下守衛類型:

  • ACE_Guard
  • ACE_Read_Guard
  • ACE_Write_Guard

由於它的 create_token 虛函數直接返回空,所以對應的所有操作都直接返回 ENOENT 錯誤。

ACE_Noop_Token

專門適配 ACE_Token 類型,適用的守衛類型與 ACE_Token 相同。

由於 ACE_Token 應用於 ACE_Select_Reactor,所以它的單執行緒版本其實就是使用 ACE_Noop_Token 實現的。

另外 ACE_Guard 專門針對 Reactor 中使用的令牌 (ACE_Select_Reactor_Token_T <ACE_Noop_Token>) 參數作了模板特化,它壓根不會調用後者的介面,而是直接用返回 0 來獲得更好的性能。

結語

以上內容根據 ACE 5.4.1 版本整理,現在最新版本已經到了 7.0.0,看上去根據功能拆分了模組,想使用哪一部分就包含哪一部分,不存在一帶一大坨這種問題了,不過限於精力沒有進一步詳細研究,感興趣的同學可以自行前往官方網站查看文檔。

關於一些模擬類型的實現,限於篇幅就不在本文中展開詳述了,後面將開一個系列分別介紹這些模擬類型的實現,名字我都想好了,就叫 simlock 吧,打算支援 linux / mac / windows  三個平台,基於 c++ 構建,可能沒有 ACE 那樣面面俱到,但每個設施做的盡量獨立且輕量級,可以單獨拿來使用那種。做這個庫的目的,一是為了復用; 二就是為了學習,比如 ace 中執行緒局部存儲的模擬實現,大大降低了作業系統的神聖感,基本就是一個大數組,讓人有種不過如此的趕腳,有助於深入理解平台提供的各種同步設施的理解,感興趣的同學可以持續關注。又給自己挖了個大坑,希望能如約填上……

參考

[1]. 用pthread進行進程間同步

[2]. Solaris 執行緒和 POSIX 執行緒的 API

[3]. 關於作業系統中進程、執行緒、和任務之間的關係

[4]. acejoy

[5]. system V訊號量和Posix訊號量

[6]. SunOS與Solaris系統的對應關係

[7]. C/C++跨平台的的預編譯宏

[8]. Unix (Solaris) Threads and Semaphores

[9]. ACE網路編程 –ACE庫入門:中篇-ACE程式設計師教程

[10]. ACE TSS 自動清理機制分析與應用

[11]. ACE 柵欄同步體介紹

[12]. ACE_Select_Reactor 多執行緒通知機制分析

[13]. ACE 分散式鎖服務介紹

[14]. ACE_TP_Reactor 實現 Leader-Follower 執行緒模型分析

[15]. ACE Readers/Writer 鎖介紹

[16]. Linux 的多執行緒編程的高效開發經驗

[17]. ACE 示例中的一個多執行緒問題分析

[18]. 有什麼辦法檢測死鎖阻塞在哪裡么?