C#多執行緒之執行緒高級(下)

四、Monitor訊號構造

訊號構造本質:一個執行緒阻塞直到收到另一個執行緒發來的通知。

當多執行緒Wait同一對象時,就形成了一個「等待隊列(waiting queue)」,和用於等待獲得鎖的「就緒隊列(ready queue)」不同,每次調用Pulse時會釋放隊頭執行緒,它會進入就緒隊列,然後重新獲取鎖。可以把它想像成一個自動停車場,首先你在收費站(等待隊列)排隊驗票,然後在柵欄前(就緒隊列)排隊等待放行。

這個隊列結構天然有序,但是,對於Wait/Pulse應用通常不重要,在這種場景下把它想像成一個等待執行緒的「池(pool)」更好理解,每次調用Pulse都會從池中釋放一個等待執行緒。

PulseAll釋放整個等待隊列或者說等待池。收到Pulse的執行緒不會完全同時開始執行,而是有序的執行,因為每個Wait語句都要試圖重新獲取同一把鎖。他們的效果就是,PulseAll將執行緒從等待隊列移到就緒隊列中,讓它們可以繼續有序執行。

使用Wait/Pulse需要注意:

  • Wait / Pulse不能lock塊之外使用,否則會拋異常。
  • Pulse最多釋放一個執行緒,而PulseAll釋放所有執行緒。
  • Wait會立即釋放當前持有的鎖,然後進入阻塞,等待脈衝
  • 收到脈衝會立即嘗試重新獲取鎖,如果在指定時間內重新獲取,則返回true,如果在超過指定時間獲取,則返回false,如果沒有獲取鎖,則一直阻塞不會返回

Wait和Pulse

  1. 定義一個欄位,作為同步對象

    private readonly object _locker = new object();
    
  2. 定義一個或多個欄位,作為阻塞條件

    private bool _ok;
    
  3. 當你希望阻塞的時候

    Monitor.Wait在等待脈衝時,同步對象上的鎖會被釋放,並且進入阻塞狀態,直到收到 _locker上的脈衝,收到脈衝後重新獲取 _locker,如果此時 _locker 已經被別的執行緒佔有,則繼續阻塞,直至_獲取 _locker

    lock (_locker) 
    {
        while (!_ok)
        {
            Monitor.Wait (_locker);
        }
    }
    
  4. 當你希望改變阻塞條件時

    lock (_locker)
    {
        _ok = true;
        Monitor.Pulse(_locker);  // Monitor.PulseAll(_locker);
    }
    

WaitPulse幾乎是萬能的,通過一個bool標識我們就能實現AutoResetEvent/ManualResetEvent的功能,同理使用一個整形欄位,就可以實現CountdownEvent/Semaphore

性能方面,調用Pulse花費大概約是在等待句柄上調用Set三分之一的時間。但是,使用WaitPulse進行訊號同步,對比事件等待句柄有以下缺點:

  • Wait / Pulse不能跨越應用程式域和進程使用。

  • 必須通過鎖保護所有訊號同步邏輯涉及的變數。

等待超時

調用Wait方法時,你可以設定一個超時時間,可以是毫秒或TimeSpan的形式。如果因為超時而放棄了等待,那麼Wait方法就會返回false

public static bool Wait(object obj, TimeSpan timeout)

如果在超時到達時仍然沒有獲得一個脈衝,CLR會主動給它發送一個虛擬的脈衝(virtual pulse),使其能夠重新獲得鎖,然後繼續執行,就像收到一個真實脈衝一樣。

下面這個例子非常有用,它可以定期的檢查阻塞條件。即使其它執行緒無法按照預期發送脈衝,例如程式之後被其他人修改,但沒能正確使用Pulse,這樣也可以在一定程度上免疫 bug。因此在複雜的同步設計中可以給所有Wait指定超時時間。

lock (_locker)
  while (/* <blocking-condition> */)
    Monitor.Wait (_locker, /* <timeout> */);

Monitor.Wait的boolean類型返回值其實還可以這麼理解:其返回值意味著是否獲得了一個「真實的脈衝「。

如果」虛擬的脈衝「並不是期待的行為,可以記錄日誌或拋出異常。

Wait等待一個變數上的脈衝,Pulse對一個變數發送脈衝。脈衝也是一種訊號形式,相對於事件等待句柄那種鎖存(latching)訊號,脈衝顧名思義是一種非鎖存或者說易失的訊號

雙向訊號與競爭狀態

Monitor.Pulse是一種單向通訊機制:發送脈衝的執行緒不關心發出的脈衝被誰收到了,他沒有返回值,不會阻塞,內部也沒有確認機制。

當一個執行緒發起一次脈衝:

  • 如果等待隊列中沒有任何執行緒,那麼這次發起的脈衝不會有任何效果。
  • 如果等待隊列中有執行緒,執行緒發送完脈衝並釋放鎖後,並不能保證接到脈衝訊號的等待執行緒能立即開始工作。

然後我們有一些場景依賴等待執行緒能夠在收到脈衝後及時的響應,此時,雙向訊號出現了,這是一種自定義的確認機制。

在上文的訊號構造基礎上改造一個競爭狀態的案例:

public class 競爭狀態測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new object();
    private bool _ok;

    public 競爭狀態測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>  // Worker
        {
            for (int i = 0; i < 5; i++)
                lock (_locker)
                {
                    while (!_ok) Monitor.Wait(_locker);
                    _ok = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                _ok = true;
                Monitor.Pulse(_locker);
            }
        }
    }
}

我們期待的結果:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

實際上這個這個程式可能一次」Wassup?「都不會輸出:主執行緒可能在工作執行緒啟動之前完成,這五次Pulse啥事都沒幹

還記得我們講事件等待句柄時,使用AutoResetEvent來模擬的雙向訊號嗎?現在使用Monitor來實現一個擴展性更好的版本

public class 雙向訊號測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new();
    private bool _entry; // 我是否可以工作了
    private bool _ready; // 我是否可以繼續投遞了

    public 雙向訊號測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>
        {
            Thread.Sleep(100);
            for (int i = 0; i < 5; i++)
            {
                lock (_locker)
                {
                    _ready = true;
                    Monitor.PulseAll(_locker);
                    while (!_entry) Monitor.Wait(_locker);
                    _entry = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
            }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                while (!_ready) Monitor.Wait(_locker);
                _ready = false;
                _entry = true;
                Monitor.PulseAll(_locker);
            }
        }
    }
}

我們仍然使用_ready來作為上游脈衝執行緒的自旋條件,使用_entry作為下游等待執行緒的自旋條件。由於我們的邏輯都在lock語句中,即使之後引入了第三個執行緒,我們的邏輯仍然不會出問題,_ready_entry的讀寫總是原子的。

升級生產消費隊列

  1. 這次,我們將允許多個消費者,各自擁有獨立的消費執行緒。使用一個數組來存放這些執行緒,並且他們接收的不再是string,而是更加靈活的委託:

    private Thread[] _workers;
    private Queue<Action> _queue = new Queue<Action>();
    
  2. 和上次一樣,我們傳遞null來告知消費者執行緒退出:

    foreach (var worker in _workers)
    {
        AddTask(null);
    }
    
  3. 在告知消費執行緒退出後Join這些執行緒,等待未完成的任務被消費:

    foreach (var worker in _workers)
    { 
        worker.Join();
    }
    
  4. 每個工作執行緒會執行一個名為Consume的方法。我們在構造隊列時循環創建和啟動這些執行緒:

    _workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
        _workers[i] = new Thread(Consume);
        _workers[i].Start();
    }
    
  5. 消費Comsume方法,一個工作執行緒從隊列中取出並執行一個項目。我們希望工作執行緒沒什麼事情做的時候,或者說當隊列中沒有任何項目時,它們應該被阻塞。因此,我們的阻塞條件是_queue.Count == 0

    private void Consume()
    {
        while (true)
        {
            Action task;
            lock (_locker)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_locker);  // 隊列里沒任務,釋放鎖,進入等待
                }
                // 獲取新任務,重新持有鎖
                task = _queue.Dequeue();
            }
            
            if (task == null) return;  // 空任務代表退出
            task();  // 執行任務
        }
    }
    
  6. 添加一個任務。出於效率考慮,加入一個任務時,我們調用Pulse而不是PulseAll。這是因為每個項目只需要喚醒(至多)一個消費者。如果你只有一個冰激凌,你不會把一個班 30 個正在睡覺的孩子都叫起來排隊獲取它。

    public void AddTask(Action task)
    {
        lock (_locker)
        {
            _queue.Enqueue(task);
            Monitor.Pulse(_locker);
        }
    }
    

模擬等待句柄

在雙向訊號中,你可能注意到了一個模式:_flag在當前執行緒被作為自旋阻塞條件,在另一執行緒中被設置為true,跳出自旋

lock(_locker)
{
    while (!_flag) Monitor.Wait(_locker);
	_flag = false;
}

ManualResetEvent

事實上它的工作原理就是模仿AutoResetEvent。如果去掉_flag=false,就得到了ManualResetEvent的基礎版本。

private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
    lock (_locker)
    {
        while (!_signal) Monitor.Wait(_locker);
    }
}
void Set()
{
    lock (_locker)
    {
        _signal = true;
        Monitor.PulseAll(_locker);
    }
}
void Reset()
{
    lock (_locker) _signal = false;
}

使用PulseAll,是因為可能存在多個被阻塞的等待執行緒。而EventWaitHandle.WaitOne()的通行條件就是:是開著的,ManualResetEvent被放行通過後不會自己關門,只能通過Reset將門關上,再次期間其它所有阻塞執行緒都能通行。

AutoResetEvent

實現AutoResetEvent非常簡單,只需要將WaitOne方法改為:

lock (_locker)
{
    while (!_signal) Monitor.Wait(_locker);
    _signal = false;  // 添加一條,自己關門
}

然後將Set方法改為:

lock (_locker)
{
    _signal = true;
    Monitor.Pulse(_locker);  // PulseAll替換成Pulse:
}

Semaphore

_signal替換為一個整型欄位可以得到Semaphore的基礎版本

public class 模擬訊號量
{
    private readonly object _locker = new object();
    private int _count, _initialCount;
    public 模擬訊號量(int initialCount)
    {
        _initialCount = initialCount;
    }
    
    void WaitOne()  // +1
    {
        lock (_locker)
        {
            _count++;
            while (_count >= _initialCount)
            {
                Monitor.Wait(_locker);
            }
        }
    }

    void Release()  // -1
    {
        lock (_locker)
        {
            _count --;
            Monitor.Pulse(_locker);
        }
    }
}

模擬CountdownEvent

是不是非常類似訊號量?

public class 模擬CountdownEvent
{
    private object _locker = new object();
    private int _initialCount;

    public 模擬CountdownEvent(int initialCount)
    {
        _initialCount = initialCount;
    }

    public void Signal()  // +1
    {
        AddCount(-1);
    }

    public void AddCount(int amount)  // +amount
    {
        lock (_locker)
        {
            _initialCount -= amount;
            if (_initialCount <= 0) Monitor.PulseAll(_locker);
        }
    }

    public void Wait()
    {
        lock (_locker)
        {
            while (_initialCount > 0)
                Monitor.Wait(_locker);
        }
    }
}

執行緒會合

CountdownEvent

利用我們剛剛實現的模擬CountdownEvent,來實現兩個執行緒的會和,和同步基礎中提到的WaitHandle.SignalAndWait一樣。

並且我們也可以通過initialCount將會和的執行緒擴展到更多個,顯而易見的強大。

public class 執行緒會和測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private 模擬CountdownEvent _countdown = new 模擬CountdownEvent(2);

    public 執行緒會和測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Show()
    {
        // 每個執行緒都睡眠一段隨機時間
        Random r = new Random();
        new Thread(Mate).Start(r.Next(10000));
        Thread.Sleep(r.Next(10000));

        _countdown.Signal();
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }

    void Mate(object delay)
    {
        Thread.Sleep((int)delay);

        _countdown.Signal(); //+1
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }
}

上面例子,每個執行緒隨機休眠一段時間,然後等待對方,他們幾乎在同時列印」Mate!「,這被稱為執行緒執行屏障(thread execution barrier)

當你想讓多個執行緒執行一個系列任務,希望它們步調一致時,可以用到執行緒執行屏障。然而,我們現在的解決方案有一定限制:我們不能重用同一個Countdown對象來第二次會合執行緒,至少在沒有額外訊號構造的情況下不能。為解決這個問題,Framework 4.0 提供了一個新的類Barrier

Barrier

Framework 4.0 加入的一個訊號構造。它實現了執行緒執行屏障(thread execution barrier),允許多個執行緒在一個時間點會合。這個類非常快速和高效,它是建立在Wait / Pulse和自旋鎖基礎上的。

  1. 實例化它,指定有多少個執行緒參與會合(可以調用AddParticipants / RemoveParticipants來進行更改)。

    public Barrier(int participantCount)
    
  2. 當希望會合時,調用SignalAndWait。表示參與者已到達障礙,並等待所有其他參與者到達障礙

    public void SignalAndWait()
    

    他還實現了協作取消模式

    public void SignalAndWait(CancellationToken cancellationToken)
    

    並提供了超時時間的重載,返回一個bool類型,true標識在規定的時間,其他參與者到達障礙,false標識沒有全部到達

    public bool SignalAndWait(TimeSpan timeout)
    

實例化Barrier,參數為 3 ,意思是調用SignalAndWait會被阻塞直到該方法被調用 3 次。但與CountdownEvent不同,它會自動複位:再調用SignalAndWait仍會阻塞直到被調用 3 次。這允許你保持多個執行緒「步調一致」,讓它們執行一個系列任務。

下邊的例子中,三個執行緒步調一致地列印數字 0 到 4:

private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier測試(ITestOutputHelper testOutputHelper)
{
    _testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
    new Thread(Speak).Start();
    new Thread(Speak).Start();
    new Thread(Speak).Start();
}
void Speak()
{
    for (int i = 0; i < 5; i++)
    {
        _testOutputHelper.WriteLine(i.ToString());
        _barrier.SignalAndWait();
    }
}

Barrier還提供一個非常用有的構造參數,他是一個委託,會在每個會和處執行。不用擔心搶佔,因為當它被執行時,所有的參與者都是被阻塞的。

public Barrier(int participantCount, Action<Barrier>? postPhaseAction)

五、拓展

前景回顧:

還記得我們在講同步的時候提到的最小化共享數據無狀態設計嗎?經過前面的學習,稍加思考,其實引發執行緒安全的本質是多執行緒並發下的數據交互問題。如果我們的數據在執行緒之間沒有交互,或者說我們的數據都是只讀的,那不就天然的執行緒安全了嗎?

現在你能理解為什麼只讀欄位是天然執行緒安全的了嗎?

然而有的場景下又需要對公共數據進行讀寫,同步篇中我們通過很簡單的排它鎖來保證執行緒安全,在這裡,我們不在滿足這種粗暴的粒度(事實上多數時候讀總是多於寫),這時,讀寫鎖出現了。

ReaderWriterLockSlim

ReaderWriterLockSlim在 Framework 3.5 加入的,被加入了standard 1.0,此類型是執行緒安全的,用於保護由多個執行緒讀取的資源。

ReaderWriterLockSlim出現的目的是為了取締ReaderWriterLock,他簡化了遞歸規則以及鎖狀態的升級和降級規則。避免了許多潛在的死鎖情況。 另外,他的性能顯著優於ReaderWriterLock。 建議對所有新開發的項目使用ReaderWriterLockSlim

然而如果與普通的lockMonitor.Enter / Exit)對比,他還是要慢一倍。

ReaderWriterLockSlim有三種模式:

  • 讀取模式:允許任意多的執行緒處於讀取模式

  • 可升級模式:只允許一個執行緒處於可升級模式,與讀鎖兼容

  • 寫入模式:完全互斥,不允許任何模式下的執行緒獲取任何鎖

ReaderWriterLockSlim定義了如下的方法來獲取和釋放讀 / 寫鎖:

public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();

另外,對應所有EnterXXX的方法,都有相應的TryXXX版本,可以接受一個超時參數,與Monitor.TryEnter類似。

讓我們來看一個案例:

模擬三個讀執行緒,兩個寫執行緒,並行執行

new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();

讀方法是這樣的

while (true)
{
    _rw.EnterReadLock();
    foreach (int number in _items)
    {
        Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
        Thread.Sleep(100);
    }
    _rw.ExitReadLock();
}

寫方法是這樣的

while (true)
{
    int number = _rand.Value.Next(100);
    _rw.EnterWriteLock();
    _items.Add(number);
    _rw.ExitWriteLock();
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
    Thread.Sleep(100);
}

隨機數生成方法就是用的TLS講過的

new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));

需要注意ReaderWriterLockSlim實現了IDisposable,用完了請記得釋放

public class ReaderWriterLockSlim : IDisposable

運行結果:

Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...

顯而易見的,並發度變高了

鎖遞歸

ReaderWriterLockSlim提供一個構造參數LockRecursionPolicy用於配置鎖遞歸策略

public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
public enum LockRecursionPolicy
{
  /// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>
  NoRecursion,
  /// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>
  SupportsRecursion,
}

默認情況下是使用NoRecursion策略:不允許遞歸或重入,這與GO的讀寫鎖設計不謀而合,建議使用此默認策略,因為遞歸引入了不必要的複雜性,並使程式碼更易於死鎖。

public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)

開啟支援遞歸策略後,以下程式碼不會拋出LockRecursionException異常

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

遞歸鎖定級別只能越來越小,級別順序如下:讀鎖,可升級鎖,寫鎖。下面程式碼會拋出LockRecursionException異常

void F()
{
    var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
    rw.EnterReadLock();
    rw.EnterWriteLock();
    rw.EnterWriteLock();
    rw.ExitReadLock();
}
Assert.Throws<LockRecursionException>(F);

可升級鎖例外,把可升級鎖升級為寫鎖是合法的。

var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();

思考一個問題:為什麼只允許一個執行緒處於可升級模式?

SQL Server ReaderWriterLockSlim
共享鎖(Share lock) 讀鎖(Read lock)
排它鎖(Exclusive lock) 寫鎖(Write lock)
更新鎖(Update lock) 可升級鎖(Upgradeable lock)

Timer

如果你需要使用規律的時間間隔重複執行一些方法,這個例子會使得一個執行緒永遠被佔用

while (true)
{
    // do something
    Thread.Sleep(1000);
}

這時候你會需要Timer

創建計時器時,可以指定在方法首次執行之前等待的時間 dueTime ,以及後續執行之間等待的時間period。 類 Timer 的解析度與系統時鐘相同。 這意味著,如果period小於系統時鐘的解析度,委託將以系統時鐘解析度定義的時間間隔執行,在Windows 7 和Windows 8系統上大約為 15 毫秒。

public Timer(TimerCallback callback, object? state, int dueTime, int period)

下面這個例子首次間隔1s,之後間隔500ms列印tick…

Timer timer = new Timer ((data) =>
{
    _testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();

計時器委託是在構造計時器時指定的,不能更改。 該方法不會在創建計時器的執行緒上執行;而是在執行緒池(thread pool)執行。

如果計時器間隔period小於執行回調所需的時間,或者如果所有執行緒池執行緒都在使用,並且回調被多次排隊,則可以在兩個執行緒池執行緒上同時執行回調。

只要使用 Timer,就必須保留對它的引用。 與任何託管對象一樣,當沒有對其引用時,會受到垃圾回收的約束。 即使 Timer 仍然處於活動狀態也不會阻止它被收集。

不再需要計時器時,請調用 Dispose 釋放計時器持有的資源。請注意,調用 Dispose() 後仍然可能會發生回調,因為計時器將回調排隊供執行緒池執行緒執行。可以使用public bool Dispose(WaitHandle notifyObject)重載等待所有回調完成。

System.Threading.Timer是一個普通計時器。 它會回調一個執行緒池執行緒(來自工作池)。

System.Timers.Timer是一個System.ComponentModel.Component ,它包裝System.Threading.Timer ,並提供一些用於在特定執行緒上調度的附加功能。