優先順序反轉那些事兒

作者:崔曉兵

從一個線上問題說起

最近在線上遇到了一些[HMDConfigManager remoteConfigWithAppID:]卡死

初步分析

觀察了下主執行緒堆棧,用到的鎖是讀寫鎖

圖片

隨後又去翻了下持有著鎖的子執行緒,有各種各樣的情況,且基本都處於正常的執行狀態,例如有的處於打開文件狀態,有的處於read狀態,有的正在執行NSUserDefaults的方法…圖片圖片圖片通過觀察發現,出問題的執行緒都有QOS:BACKGROUND標記。整體看起來持有鎖的子執行緒仍然在執行,只是留給主執行緒的時間不夠了。為什麼這些子執行緒在持有鎖的情況下,需要執行這麼久,直到主執行緒的8s卡死?一種情況就是真的如此耗時,另一種則是出現了優先順序反轉。

解決辦法

在這個案例裡面,持有讀寫鎖且優先順序低的執行緒遲遲得不到調度(又或者得到調度的時候又被搶佔了,或者得到調度的時候時間已然不夠了),而具有高優先順序的執行緒由於拿不到讀寫鎖,一直被阻塞,所以互相死鎖。iOS8之後引入了QualityOfService的概念,類似於執行緒的優先順序,設置不同的QualityOfService的值後系統會分配不同的CPU時間、網路資源和硬碟資源等,因此我們可以通過這個設置隊列的優先順序 。

方案一:去除對NSOperationQueue的優先順序設置

在 Threading Programming Guide 文檔中,蘋果給出了提示:

Important: It is generally a good idea to leave the priorities of your threads at their default values. Increasing the priorities of some threads also increases the likelihood of starvation among lower-priority threads. If your application contains high-priority and low-priority threads that must interact with each other, the starvation of lower-priority threads may block other threads and create performance bottlenecks.

蘋果的建議是不要隨意修改執行緒的優先順序,尤其是這些高低優先順序執行緒之間存在臨界資源競爭的情況。所以刪除相關優先順序設置程式碼即可解決問題。

方案二:臨時修改執行緒優先順序

在 pthread_rwlock_rdlock(3pthread) 發現了如下提示:

Realtime applications may encounter priority inversion when using read-write locks. The problem occurs when a high priority thread 「locks」 a read-write lock that is about to be 「unlocked」 by a low priority thread, but the low priority thread is preempted by a medium priority thread. This scenario leads to priority inversion; a high priority thread is blocked by lower priority threads for an unlimited period of time. During system design, realtime programmers must take into account the possibility of this kind of priority inversion. They can deal with it in a number of ways, such as by having critical sections that are guarded by read-write locks execute at a high priority, so that a thread cannot be preempted while executing in its critical section.

儘管針對的是實時系統,但是還是有一些啟示和幫助。按照提示,對有問題的程式碼進行了修改:在執行緒通過pthread_rwlock_wrlock拿到_rwlock的時候,臨時提升其優先順序,在釋放_rwlock之後,恢復其原先的優先順序

- (id)remoteConfigWithAppID:(NSString *)appID
{
    .......
    pthread_rwlock_rdlock(&_rwlock);
    HMDHeimdallrConfig *result = ....... // get existing config
    pthread_rwlock_unlock(&_rwlock);
    
    if(result == nil) {
        result = [[HMDHeimdallrConfig alloc] init]; // make a new config
        pthread_rwlock_wrlock(&_rwlock);
        
        qos_class_t oldQos = qos_class_self();
        BOOL needRecover = NO;
        
        // 臨時提升執行緒優先順序
        if (_enablePriorityInversionProtection && oldQos < QOS_CLASS_USER_INTERACTIVE) {
            int ret = pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0);
            needRecover = (ret == 0);
        }
            
        ......

        pthread_rwlock_unlock(&_rwlock);
        
        // 恢復執行緒優先順序
        if (_enablePriorityInversionProtection && needRecover) {
            pthread_set_qos_class_self_np(oldQos, 0);
        }
    }
    
    return result;
}

值得注意的是,這裡只能使用pthreadapiNSThread提供的API是不可行的

Demo 驗證

為了驗證上述的手動調整執行緒優先順序是否有一定的效果,這裡通過demo進行本地實驗:定義了2000operation(目的是為了CPU繁忙),優先順序設置NSQualityOfServiceUserInitiated,且對其中可以被100整除的operation的優先順序調整為NSQualityOfServiceBackground,在每個operation執行相同的耗時任務,然後對這被選中的10operation進行耗時統計。

for (int j = 0; j < 2000; ++j) {
    NSOperationQueue *operation = [[NSOperationQueue alloc] init];
    operation.maxConcurrentOperationCount = 1;
    operation.qualityOfService = NSQualityOfServiceUserInitiated;
    
    // 模組1
    // if (j % 100 == 0) {
    //    operation.qualityOfService = NSQualityOfServiceBackground;
    // }
    // 模組1
    
    [operation addOperationWithBlock:^{
        // 模組2
        // qos_class_t oldQos = qos_class_self();
        // pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0);
        // 模組2
        
        NSTimeInterval start = CFAbsoluteTimeGetCurrent();
        double sum = 0;
        for (int i = 0; i < 100000; ++i) {
            sum += sin(i) + cos(i) + sin(i*2) + cos(i*2);
        }
        start = CFAbsoluteTimeGetCurrent() - start;
        if (j % 100 == 0) {
            printf("%.8f\n", start * 1000);
        }
        
        // 模組2
        // pthread_set_qos_class_self_np(oldQos, 0);
        // 模組2
    }];
}

統計資訊如下圖所示

A B C
(注釋模組1和模組2程式碼) (只打開模組1程式碼) (同時打開模組1和模組2程式碼)
11.8190561 94.70210189 15.04005137

可以看到

  1. 正常情況下,每個任務的平均耗時為:11.8190561;
  2. operation被設置為低優先順序時,其耗時大幅度提升為:94.70210189;
  3. operation被設置為低優先順序時,又在Block中手動恢復其原有的優先順序,其耗時已經大幅度降低:15.04005137( 耗時比正常情況高,大家可以思考下為什麼)

通過Demo可以發現,通過手動調整其優先順序,低優先順序任務的整體耗時得到大幅度的降低,這樣在持有鎖的情況下,可以減少對主執行緒的阻塞時間。

上線效果

圖片

該問題的驗證過程分為2個階段:

  1. 第一個階段如第1個紅框所示,從36號開始在版本19.7上有較大幅度的下降,主要原因:堆棧中被等待的隊列資訊由QOS:BACKGROUND變為了com.apple.root.default-qos,隊列的優先順序從QOS_CLASS_BACKGROUND提升為QOS_CLASS_DEFAULT,相當於實施了方案一,使用了默認優先順序。
  2. 第二個階段如第2個紅框所示,從424號在版本20.3上開始驗證。目前看起來效果暫時不明顯,推測一個主要原因是:demo中是把優先順序從QOS_CLASS_BACKGROUND提升為QOS_CLASS_USER_INITIATED,而線上相當於把隊列的優先順序從默認的優先順序QOS_CLASS_DEFAULT提升為QOS_CLASS_USER_INITIATED所以相對來說,線上的提升相對有限。
    1. QOS_CLASS_BACKGROUNDMach層級優先順序數是4;
    2. QOS_CLASS_DEFAULTMach層級優先順序數是31;
    3. QOS_CLASS_USER_INITIATEDMach層級優先順序數是37;

深刻理解優先順序反轉

那麼是否所有鎖都需要像上文一樣,手動提升持有鎖的執行緒優先順序?系統是否會自動調整執行緒的優先順序?如果有這樣的機制,是否可以覆蓋所有的鎖?要理解這些問題,需要深刻認識優先順序反轉。

什麼是優先順序反轉?

優先順序反轉,是指某同步資源被較低優先順序的進程/執行緒所擁有,較高優先順序的進程/執行緒競爭該同步資源未獲得該資源,而使得較高優先順序進程/執行緒反而推遲被調度執行的現象。根據阻塞類型的不同,優先順序反轉又被分為Bounded priority inversionUnbounded priority inversion。這裡藉助 Introduction to RTOS – Solution to Part 11 (Priority Inversion) 的圖進行示意。

Bounded priority inversion

如圖所示,高優先順序任務(Task H)被持有鎖的低優先順序任務(Task L)阻塞,由於阻塞的時間取決於低優先順序任務在臨界區的時間(持有鎖的時間),所以被稱為bounded priority inversion。只要Task L一直持有鎖,Task H就會一直被阻塞,低優先順序的任務運行在高優先順序任務的前面,優先順序被反轉。

這裡的任務也可以理解為執行緒

圖片

Unbounded priority inversion

Task L持有鎖的情況下,如果有一個中間優先順序的任務(Task M)打斷了Task L,前面的bounded就會變為unbounded,因為Task M只要搶佔了Task LCPU,就可能會阻塞Task H任意多的時間(Task M可能不止1個)

圖片

優先順序反轉常規解決思路

目前解決Unbounded priority inversion2種方法:一種被稱作優先權極限(priority ceiling protocol),另一種被稱作優先順序繼承(priority inheritance)。

Priority ceiling protocol

在優先權極限方案中,系統把每一個臨界資源與1個極限優先權相關聯。當1個任務進入臨界區時,系統便把這個極限優先權傳遞給這個任務,使得這個任務的優先權最高;當這個任務退出臨界區後,系統立即把它的優先權恢復正常,從而保證系統不會出現優先權反轉的情況。該極限優先權的值是由所有需要該臨界資源的任務的最大優先順序來決定的。

如圖所示,鎖的極限優先權是3。當Task L持有鎖的時候,它的優先順序將會被提升到3,和Task H一樣的優先順序。這樣就可以阻止Task M(優先順序是2)的運行,直到Task LTask H不再需要該鎖。

圖片

Priority inheritance

在優先順序繼承方案中,大致原理是:高優先順序任務在嘗試獲取鎖的時候,如果該鎖正好被低優先順序任務持有,此時會臨時把高優先順序執行緒的優先順序轉移給擁有鎖的低優先順序執行緒,使低優先順序執行緒能更快的執行並釋放同步資源,釋放同步資源後再恢復其原來的優先順序。

圖片

priority ceiling protocolpriority inheritance都會在釋放鎖的時候,恢復低優先順序任務的優先順序。同時要注意,以上2種方法只能阻止Unbounded priority inversion,而無法阻止Bounded priority inversionTask H必須等待Task L執行完畢才能執行,這個反轉是無法避免的)。

可以通過以下幾種發生來避免或者轉移Bounded priority inversion

  1. 減少臨界區的執行時間,減少Bounded priority inversion的反轉耗時;
  2. 避免使用會阻塞高優先順序任務的臨界區資源;
  3. 專門使用一個隊列來管理資源,避免使用鎖。

優先順序繼承必須是可傳遞的。舉個栗子:當T1阻塞在被T2持有的資源上,而T2又阻塞在T3持有的一個資源上。如果T1的優先順序高於T2T3的優先順序,T3必須通過T2繼承T1的優先順序。否則,如果另外一個優先順序高於T2T3,小於T1的執行緒T4,將搶佔T3,引發相對於T1的優先順序反轉。因此,執行緒所繼承的優先順序必須是直接或者間接阻塞的執行緒的最高優先順序。

如何避免優先順序反轉?

QoS 傳遞

iOS 系統主要使用以下兩種機制來在不同執行緒(或 queue)間傳遞 QoS

  • 機制1:dispatch_async
    • dispatch_async() automatically propagates the QoS from the calling thread, though it will translate User Interactive to User Initiated to avoid assigning that priority to non-main threads.
    • Captured at time of block submission, translate user interactive to user initiated. Used if destination queue does not have a QoS and does not lower the QoS (ex dispatch_async back to the main thread)
  • 機制2:基於 XPC 的進程間通訊(IPC

系統的 QoS 傳遞規則比較複雜,主要參考以下資訊:

  • 當前執行緒的 QoS
  • 如果是使用 dispatch_block_create() 方法生成的 dispatch_block,則考慮生成 block 時所調用的參數
  • dispatch_async 或 IPC 的目標 queue 或執行緒的 QoS

調度程式會根據這些資訊決定 block 以什麼優先順序運行。

  1. 如果沒有其他執行緒同步地等待此 block,則 block 就按上面所說的優先順序來運行。
  2. 如果出現了執行緒間同步等待的情況,則調度程式會根據情況調整執行緒的運行優先順序。

如何觸發優先順序反轉避免機制?

如果當前執行緒因等待某執行緒(執行緒1)上正在進行的操作(如 block1)而受阻,而系統知道 block1 所在的目標執行緒(owner),系統會通過提高相關執行緒的優先順序來解決優先順序反轉的問題。反之如果系統不知道 block1 所在目標執行緒,則無法知道應該提高誰的優先順序,也就無法解決反轉問題;

記錄了持有者資訊(owner)的系統 API 如下:

  1. pthread mutexos_unfair_lock、以及基於這二者實現的上層 API
    1. dispatch_once 的實現是基於 os_unfair_lock 的
    2. NSLockNSRecursiveLock@synchronized 等的實現是基於 pthread mutex
  2. dispatch_syncdispatch_wait
  3. xpc_connection_send_with_message_sync

使用以上這些 API 能夠在發生優先順序反轉時使系統啟用優先順序反轉避免機制

基礎API驗證

接下來對前文提到的各種「基礎系統API」進行驗證

測試驗證環境:模擬器 iOS15.2

pthread mutex

pthread mutex的數據結構pthread_mutex_s其中有一個m_tid欄位,專門來記錄持有該鎖的執行緒Id

// types_internal.h
struct pthread_mutex_s {
        long sig;
        _pthread_lock lock;
        union {
                uint32_t value;
                struct pthread_mutex_options_s options;
        } mtxopts;
        int16_t prioceiling;
        int16_t priority;
#if defined(__LP64__)
        uint32_t _pad;
#endif
        union {
                struct {
                        uint32_t m_tid[2]; // thread id of thread that has mutex locked
                        uint32_t m_seq[2]; // mutex sequence id
                        uint32_t m_mis[2]; // for misaligned locks m_tid/m_seq will span into here
                } psynch;
                struct _pthread_mutex_ulock_s ulock;
        };
#if defined(__LP64__)
        uint32_t _reserved[4];
#else
        uint32_t _reserved[1];
#endif
};

程式碼來驗證一下:執行緒優先順序是否會被提升?

// printThreadPriority用來列印執行緒的優先順序資訊
void printThreadPriority() {
  thread_t cur_thread = mach_thread_self();
  mach_port_deallocate(mach_task_self(), cur_thread);
  mach_msg_type_number_t thread_info_count = THREAD_INFO_MAX;
  thread_info_data_t thinfo;
  kern_return_t kr = thread_info(cur_thread, THREAD_EXTENDED_INFO, (thread_info_t)thinfo, &thread_info_count);
  if (kr != KERN_SUCCESS) {
    return;
  }
  thread_extended_info_t extend_info = (thread_extended_info_t)thinfo;
  printf("pth_priority: %d, pth_curpri: %d, pth_maxpriority: %d\n", extend_info->pth_priority, extend_info->pth_curpri, extend_info->pth_maxpriority);
}

先在子執行緒上鎖並休眠,然後主執行緒請求該鎖

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
  printf("begin : \n");
  printThreadPriority();
  printf("queue before lock \n");
  pthread_mutex_lock(&_lock); //確保 backgroundQueue 先得到鎖
  printf("queue lock \n");
  printThreadPriority();
  dispatch_async(dispatch_get_main_queue(), ^{
    printf("before main lock\n");
    pthread_mutex_lock(&_lock);
    printf("in main lock\n");
    pthread_mutex_unlock(&_lock);
    printf("after main unlock\n");
  });
  sleep(10);
  printThreadPriority();
  printf("queue unlock\n");
  pthread_mutex_unlock(&_lock);
  printf("queue after unlock\n");
});
begin : 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock 
queue lock 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock

可以看到,低優先順序子執行緒先持有鎖,當時的優先順序為4,而該鎖被主執行緒請求的時候,子執行緒的優先順序被提升為47

os_unfair_lock

os_unfair_lock用來替換OSSpinLock,解決優先順序反轉問題。等待os_unfair_lock鎖的執行緒會處於休眠狀態,從用戶態切換到內核態,而並非忙等。os_unfair_lock將執行緒ID保存到了鎖的內部,鎖的等待者會把自己的優先順序讓出來,從而避免優先順序反轉。驗證一下:

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
    printf("begin : \n");
    printThreadPriority();
    printf("queue before lock \n");
    os_unfair_lock_lock(&_unfair_lock); //確保 backgroundQueue 先得到鎖
    printf("queue lock \n");
    printThreadPriority();
    dispatch_async(dispatch_get_main_queue(), ^{
      printf("before main lock\n");
      os_unfair_lock_lock(&_unfair_lock);
      printf("in main lock\n");
      os_unfair_lock_unlock(&_unfair_lock);
      printf("after main unlock\n");
    });
    sleep(10);
    printThreadPriority();
    printf("queue unlock\n");
    os_unfair_lock_unlock(&_unfair_lock);
    printf("queue after unlock\n");
  });
begin : 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock 
queue lock 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock

結果和pthread mutex一致

pthread_rwlock_t

在 pthread_rwlock_init 有如下提示:

Caveats: Beware of priority inversion when using read-write locks. A high-priority thread may be blocked waiting on a read-write lock locked by a low-priority thread. The microkernel has no knowledge of read-write locks, and therefore can』t boost the low-priority thread to prevent the priority inversion.

大意是內核不感知讀寫鎖,無法提升低優先順序執行緒的優先順序,從而無法避免優先順序反轉。通過查詢定義發現:pthread_rwlock_s包含了欄位rw_tid,專門來記錄持有寫鎖的執行緒,這不由令人好奇:為什麼pthread_rwlock_sowner資訊卻仍然無法避免優先順序反轉?

struct pthread_rwlock_s {
        long sig;
        _pthread_lock lock;
        uint32_t
                unused:29,
                misalign:1,
                pshared:2;
        uint32_t rw_flags;
#if defined(__LP64__)
        uint32_t _pad;
#endif
        uint32_t rw_tid[2]; // thread id of thread that has exclusive (write) lock
        uint32_t rw_seq[4]; // rw sequence id (at 128-bit aligned boundary)
        uint32_t rw_mis[4]; // for misaligned locks rw_seq will span into here
#if defined(__LP64__)
        uint32_t _reserved[34];
#else
        uint32_t _reserved[18];
#endif
};

//news.ycombinator.com/item?id=21751269 鏈接中提到:

xnu supports priority inheritance through 「turnstiles」, a kernel-internal mechani** which is used by default by a number of locking primitives (list at [1]), including normal pthread mutexes (though not read-write locks [2]), as well as the os_unfair_lock API (via the ulock syscalls). With pthread mutexes, you can actually explicitly request priority inheritance by calling pthread_mutexattr_setprotocol [3] with PTHREAD_PRIO_INHERIT; the Apple implementation supports it, but currently ignores the protocol setting and just gives all mutexes priority inheritance.

大意是:XNU使用turnstiles內核機制進行優先順序繼承,這種機制被應用在pthread mutexos_unfair_lock上。

順藤摸瓜,在ksyn_wait方法中找到了_kwq_use_turnstile的調用,其中的注釋對讀寫鎖解釋的比較委婉,添加了at least sometimes

pthread mutexes and rwlocks both (at least sometimes)  know their owner and can use turnstiles. Otherwise, we pass NULL as the tstore to the shims so they wait on the global waitq.

// libpthread/kern/kern_synch.c
int
ksyn_wait(ksyn_wait_queue_t kwq, kwq_queue_type_t kqi, uint32_t lockseq,
                int fit, uint64_t abstime, uint16_t kwe_flags,
                thread_continue_t continuation, block_hint_t block_hint)
{
        thread_t th = current_thread();
        uthread_t uth = pthread_kern->get_bsdthread_info(th);
        struct turnstile **tstore = NULL;
        int res;

        assert(continuation != THREAD_CONTINUE_NULL);

        ksyn_waitq_element_t kwe = pthread_kern->uthread_get_uukwe(uth);
        bzero(kwe, sizeof(*kwe));
        kwe->kwe_count = 1;
        kwe->kwe_lockseq = lockseq & PTHRW_COUNT_MASK;
        kwe->kwe_state = KWE_THREAD_INWAIT;
        kwe->kwe_uth = uth;
        kwe->kwe_thread = th;
        kwe->kwe_flags = kwe_flags;

        res = ksyn_queue_insert(kwq, kqi, kwe, lockseq, fit);
        if (res != 0) {
                //panic("psynch_rw_wrlock: failed to enqueue\n"); // XXX                ksyn_wqunlock(kwq);
                return res;
        }

        PTHREAD_TRACE(psynch_mutex_kwqwait, kwq->kw_addr, kwq->kw_inqueue,
                        kwq->kw_prepost.count, kwq->kw_intr.count);

        if (_kwq_use_turnstile(kwq)) {
                // pthread mutexes and rwlocks both (at least sometimes) know their                
                // owner and can use turnstiles. Otherwise, we pass NULL as the                
                // tstore to the shims so they wait on the global waitq.                
                tstore = &kwq->kw_turnstile;
        }
        ......
}

再去查看_kwq_use_turnstile的定義,程式碼還是很誠實的,只有在KSYN_WQTYPE_MTX才會啟用turnstile進行優先順序反轉保護,而讀寫鎖的類型為KSYN_WQTYPE_RWLOCK,這說明讀寫鎖不會使用_kwq_use_turnstile,所以無法避免優先順序反轉。

#define KSYN_WQTYPE_MTX         0x01
#define KSYN_WQTYPE_CVAR        0x02
#define KSYN_WQTYPE_RWLOCK      0x04
#define KSYN_WQTYPE_SEMA        0x08

static inline bool
_kwq_use_turnstile(ksyn_wait_queue_t kwq)
{
        // If we had writer-owner information from the
        // rwlock then we could use the turnstile to push on it. For now, only
        // plain mutexes use it.
        return (_kwq_type(kwq) == KSYN_WQTYPE_MTX);
}

另外在_pthread_find_owner也可以看到,讀寫鎖的owner0

void
_pthread_find_owner(thread_t thread,
                struct stackshot_thread_waitinfo * waitinfo)
{
        ksyn_wait_queue_t kwq = _pthread_get_thread_kwq(thread);
        switch (waitinfo->wait_type) {
                case kThreadWaitPThreadMutex:
                        assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_MTX);
                        waitinfo->owner  = thread_tid(kwq->kw_owner);
                        waitinfo->context = kwq->kw_addr;
                        break;
                /* Owner of rwlock not stored in kernel space due to races. Punt
                 * and hope that the userspace address is helpful enough. */
                case kThreadWaitPThreadRWLockRead:
                case kThreadWaitPThreadRWLockWrite:
                        assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_RWLOCK);
                        waitinfo->owner  = 0;
                        waitinfo->context = kwq->kw_addr;
                        break;
                /* Condvars don't have owners, so just give the userspace address. */
                case kThreadWaitPThreadCondVar:
                        assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_CVAR);
                        waitinfo->owner  = 0;
                        waitinfo->context = kwq->kw_addr;
                        break;
                case kThreadWaitNone:
                default:
                        waitinfo->owner = 0;
                        waitinfo->context = 0;
                        break;
        }
}

把鎖更換為讀寫鎖,驗證一下前面的理論是否正確:

pthread_rwlock_init(&_rwlock, NULL);
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
  printf("begin : \n");
  printThreadPriority();
  printf("queue before lock \n");
  pthread_rwlock_rdlock(&_rwlock); //確保 backgroundQueue 先得到鎖
  printf("queue lock \n");
  printThreadPriority();
  dispatch_async(dispatch_get_main_queue(), ^{
    printf("before main lock\n");
    pthread_rwlock_wrlock(&_rwlock);
    printf("in main lock\n");
    pthread_rwlock_unlock(&_rwlock);
    printf("after main unlock\n");
  });
  sleep(10);
  printThreadPriority();
  printf("queue unlock\n");
  pthread_rwlock_unlock(&_rwlock);
  printf("queue after unlock\n");
});
begin : 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock 
queue lock 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue unlock
queue after unlock
in main lock
after main unlock

可以看到讀寫鎖不會發生優先順序提升

dispatch_sync

這個API都比較熟悉了,這裡直接驗證:

// 當前執行緒為主執行緒
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printThreadPriority();
dispatch_async(_queue, ^{
    printf("dispatch_async before dispatch_sync : \n");
    printThreadPriority();
});
dispatch_sync(_queue, ^{
    printf("dispatch_sync: \n");
    printThreadPriority();
});
dispatch_async(_queue, ^{
    printf("dispatch_async after dispatch_sync: \n");
    printThreadPriority();
});
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63 
dispatch_async before dispatch_sync : 
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_sync: 
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_async after dispatch_sync: 
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63

_queue是一個低優先順序隊列(QOS_CLASS_BACKGROUND),可以看到dispatch_sync調用壓入隊列的任務,以及在這之前dispatch_async壓入的任務,都被提升到較高的優先順序47(和主執行緒一致),而最後一個dispatch_async的任務則以優先順序4來執行。

dispatch_wait

// 當前執行緒為主執行緒
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printf("main thread\n");
printThreadPriority();
dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{
    printf("sub thread\n");
    sleep(2);
    printThreadPriority();
});
dispatch_async(_queue, block);
dispatch_wait(block, DISPATCH_TIME_FOREVER);

_queue是一個低優先順序隊列(QOS_CLASS_BACKGROUND),當在當前主執行緒使用dispatch_wait進行等待時,輸出如下,低優先順序的任務被提升到優先順序47

main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63

而如果將dispatch_wait(block, DISPATCH_TIME_FOREVER)注釋掉之後,輸出如下:

main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63

值得注意的是,dispatch_wait是一個宏(C11的泛型),或者是一個入口函數,它可以接受dispatch_block_tdispatch_group_tdispatch_semaphore_t 3種類型的參數,但是這裡的具體含義應該是指dispatch_block_wait,只有dispatch_block_wait會調整優先順序,避免優先順序反轉。

intptr_t
dispatch_wait(void *object, dispatch_time_t timeout);
#if __has_extension(c_generic_selections)
#define dispatch_wait(object, timeout) \
                _Generic((object), \
                        dispatch_block_t:dispatch_block_wait, \
                        dispatch_group_t:dispatch_group_wait, \
                        dispatch_semaphore_t:dispatch_semaphore_wait \
                )((object),(timeout))
#endif

神秘的訊號量

dispatch_semaphore

之前對dispatch_semaphore的認知非常淺薄,經常把二值訊號量和互斥鎖劃等號。但是通過調研後發現:dispatch_semaphore 沒有 QoS 的概念,沒有記錄當前持有訊號量的執行緒(owner),所以有高優先順序的執行緒在等待鎖時,內核無法知道該提高哪個執行緒的調試優先順序(QoS)。如果鎖持有者優先順序比其他執行緒低,高優先順序的等待執行緒將一直等待。Mutex vs Semaphore: What』s the Difference? 一文詳細比對了MutexSemaphore之間的區別。

Semaphores are for signaling (sames a condition variables, events) while mutexes are for mutual exclusion. Technically, you can also use semaphores for mutual exclusion (a mutex can be thought as a binary semaphore) but you really shouldn』t.Right, but libdispatch doesn』t have a mutex. It has semaphores and queues. So if you』re trying to use libdispatch and you don』t want the closure-based aspect of queues, you might be tempted to use a semaphore instead. Don』t do that, use os_unfair_lock or pthread_mutex (or a higher-level construct like NSLock) instead.

這些是一些警示,可以看到dispatch_semaphore十分危險,使用需要特別小心。

這裡通過蘋果官方提供的demo進行解釋:

__block NSString *taskName = nil;
dispatch_semaphore_t sema = dispatch_semaphore_create(0); 
[self.connection.remoteObjectProxy requestCurrentTaskName:^(NSString *task) { 
     taskName = task; 
     dispatch_semaphore_signal(sema); 
}]; 
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); 
return taskName;
  1. 假設在主執行緒執行這段程式碼,那麼當前執行緒的優先順序是QOS_CLASS_USER_INTERACTIVE
  2. 由於從主執行緒進行了非同步,非同步任務隊列的QoS將會被提升為QOS_CLASS_USER_INITIATED
  3. 主執行緒被訊號量sema阻塞,而負責釋放該訊號量的非同步任務的優先順序QOS_CLASS_USER_INITIATED低於主執行緒的優先順序QOS_CLASS_USER_INTERACTIVE,因此可能會發生優先順序反轉。

值得一提的是,Clang專門針對這種情況進行了靜態檢測:

//github.com/llvm-mirror/clang/blob/master/lib/StaticAnalyzer/Checkers/GCDAntipatternChecker.cpp

static auto findGCDAntiPatternWithSemaphore() -> decltype(compoundStmt()) {

  const char *SemaphoreBinding = "semaphore_name";
  auto SemaphoreCreateM = callExpr(allOf(
      callsName("dispatch_semaphore_create"),
      hasArgument(0, ignoringParenCasts(integerLiteral(equals(0))))));

  auto SemaphoreBindingM = anyOf(
      forEachDescendant(
          varDecl(hasDescendant(SemaphoreCreateM)).bind(SemaphoreBinding)),
      forEachDescendant(binaryOperator(bindAssignmentToDecl(SemaphoreBinding),
                     hasRHS(SemaphoreCreateM))));

  auto HasBlockArgumentM = hasAnyArgument(hasType(
            hasCanonicalType(blockPointerType())
            ));

  auto ArgCallsSignalM = hasAnyArgument(stmt(hasDescendant(callExpr(
          allOf(
              callsName("dispatch_semaphore_signal"),
              equalsBoundArgDecl(0, SemaphoreBinding)
              )))));

  auto HasBlockAndCallsSignalM = allOf(HasBlockArgumentM, ArgCallsSignalM);

  auto HasBlockCallingSignalM =
    forEachDescendant(
      stmt(anyOf(
        callExpr(HasBlockAndCallsSignalM),
        objcMessageExpr(HasBlockAndCallsSignalM)
           )));

  auto SemaphoreWaitM = forEachDescendant(
    callExpr(
      allOf(
        callsName("dispatch_semaphore_wait"),
        equalsBoundArgDecl(0, SemaphoreBinding)
      )
    ).bind(WarnAtNode));

  return compoundStmt(
      SemaphoreBindingM, HasBlockCallingSignalM, SemaphoreWaitM);
}

如果想使用該功能,只需要打開xcode設置即可:

圖片

另外,dispatch_group 跟 semaphore 類似,在調用 enter() 方法時,無法預知誰會調用 leave(),所以系統也無法知道其 owner是誰,所以同樣不會有優先順序提升的問題。

訊號量卡死現身說法

dispatch_semaphore給筆者的印象非常深刻,之前寫過一段這樣的程式碼:使用訊號量在主執行緒同步等待相機授權結果。

__block BOOL auth = NO;
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) {
  auth = allow;
  dispatch_semaphore_signal(semaphore);
}];
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);

上線後長期佔據卡死top1,當時百思不得其解,在深入了解到訊號量無法避免優先順序反轉後,終於豁然開朗,一掃之前心中的陰霾。這類問題一般通過2種方式來解決:

  1. 使用同步API
BOOL auth = [KTAuthorizeService authorizationWithType:KTPermissionsTypeCamera];
// do something next
  1. 非同步回調,不要在當前執行緒等待
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) {
    BOOL auth = allow;
    // do something next via callback
}];

幾個概念

turnstile

前文提到XNU使用turnstile進行優先順序繼承,這裡對turnstile機制進行簡單的描述和理解。在XNU內核中,存在著大量的同步對象(例如lck_mtx_t),為了解決優先順序反轉的問題,每個同步對象都必須對應一個分離的數據結構來維護大量的資訊,例如阻塞在這個同步對象上的執行緒隊列。可以想像一下,如果每個同步對象都要分配一個這樣的數據結構,將造成極大的記憶體浪費。為了解決這個問題,XNU採用了turnstile機制,一種空間利用率很高的解決方案。該方案的提出依據是同一個執行緒在同一時刻不能同時阻塞於多個同步對象上。這一事實允許所有同步對象只需要保留一個指向turnstile的指針,且在需要的時候去分配一個turnstile即可,而turnstile則包含了操作一個同步對象需要的所有資訊,例如阻塞執行緒的隊列、擁有這個同步對象的執行緒指針。turnstile是從池中動態分配的,這個池的大小會隨著系統中已分配的執行緒數目增加而增加,所以turnstile總數將始終低於或等於執行緒數,這也決定了turnstile的數目是可控的。turnstile由阻塞在該同步對象上的第一個執行緒負責分配,當沒有更多執行緒阻塞在該同步對象上,turnstile會被釋放,回收到池中。turnstile的數據結構如下:

struct turnstile {
    struct waitq                  ts_waitq;              /* waitq embedded in turnstile */
    turnstile_inheritor_t         ts_inheritor;          /* thread/turnstile inheriting the priority (IL, WL) */
    union {
        struct turnstile_list ts_free_turnstiles;    /* turnstile free list (IL) */
        SLIST_ENTRY(turnstile) ts_free_elm;          /* turnstile free list element (IL) */
    };
    struct priority_queue_sched_max ts_inheritor_queue;    /* Queue of turnstile with us as an inheritor (WL) */
    union {
        struct priority_queue_entry_sched ts_inheritor_links;    /* Inheritor queue links */
        struct mpsc_queue_chain   ts_deallocate_link;    /* thread deallocate link */
    };
    SLIST_ENTRY(turnstile)        ts_htable_link;        /* linkage for turnstile in global hash table */
    uintptr_t                     ts_proprietor;         /* hash key lookup turnstile (IL) */
    os_refcnt_t                   ts_refcount;           /* reference count for turnstiles */
    _Atomic uint32_t              ts_type_gencount;      /* gen count used for priority chaining (IL), type of turnstile (IL) */
    uint32_t                      ts_port_ref;           /* number of explicit refs from ports on send turnstile */
    turnstile_update_flags_t      ts_inheritor_flags;    /* flags for turnstile inheritor (IL, WL) */
    uint8_t                       ts_priority;           /* priority of turnstile (WL) */

#if DEVELOPMENT || DEBUG
    uint8_t                       ts_state;              /* current state of turnstile (IL) */
    queue_chain_t                 ts_global_elm;         /* global turnstile chain */
    thread_t                      ts_thread;             /* thread the turnstile is attached to */
    thread_t                      ts_prev_thread;        /* thread the turnstile was attached before donation */
#endif
};

優先順序數值

在驗證環節有一些優先順序數值,這裡藉助「Mac OS® X and iOS Internals 」解釋一下:實驗中涉及到的優先順序數值都是相對於Mach層而言的,且都是用戶執行緒數值

  1. 用戶執行緒的優先順序是0~63;
    1. NSQualityOfServiceBackgroundMach層級優先順序數是4;
    2. NSQualityOfServiceUtilityMach層級優先順序數是20;
    3. NSQualityOfServiceDefaultMach層級優先順序數是31;
    4. NSQualityOfServiceUserInitiatedMach層級優先順序數是37;
    5. NSQualityOfServiceUserInteractiveMach層級優先順序是47;
  2. 內核執行緒的優先順序是80~95;
  3. 實時系統執行緒的優先順序是96~127;
  4. 64~79被保留給系統使用;

圖片

總結

本文主要闡述了優先順序反轉的一些概念和解決思路,並結合iOS平台的幾種鎖進行了詳細的調研。通過深入的理解,可以去規避一些不必要的優先順序反轉,從而進一步避免卡死異常。字節跳動 APM團隊也針對執行緒的優先順序做了監控處理,進而達到發現和預防優先順序反轉的目的。

加入我們

字節跳動 APM 中台致力於提升整個集團內全系產品的性能和穩定性表現,技術棧覆蓋iOS/Android/Server/Web/Hybrid/PC/遊戲/小程式等,工作內容包括但不限於性能穩定性監控,問題排查,深度優化,防劣化等。長期期望為業界輸出更多更有建設性的問題發現和深度優化手段。

歡迎對位元組APM團隊職位感興趣的同學投遞簡歷到郵箱 [email protected] 。

參考文檔