Python | Python初學者的自我修養,找到自己的方向

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是Python專題的第21篇文章,我們繼續多執行緒的話題。

上周的文章當中我們簡單介紹了執行緒和進程的概念,以及在Python當中如何在主執行緒之外創建其他執行緒,並且還了解了用戶級執行緒和後台執行緒的區別以及使用方法。今天我們來看看執行緒的其他使用,比如如何停止一個執行緒,執行緒之間的Event用法等等。

停止執行緒

利用Threading庫我們可以很方便地創建執行緒,讓它按照我們的想法執行我們想讓它執行的事情,從而加快程式運行的效率。然而有一點坑爹的是,執行緒創建之後,就交給了作業系統執行,我們無法直接結束一個執行緒,也無法給它發送訊號,無法調整它的調度,也沒有其他高級操作。如果想要相關的功能,只能自己開發。

怎麼開發呢?

我們創建執行緒的時候指定了target等於一個我們想讓它執行的函數,這個函數並不一定是全局函數,實際上也可以是一個對象中的函數。如果是對象中的函數,那麼我們就可以在這個函數當中獲取到對象中的其他資訊,我們可以利用這一點來實現手動控制執行緒的停止。

說起來好像不太好理解,但是看下程式碼真的非常簡單:

import time
from threading import Thread

class TaskWithSwitch:
    def __init__(self):
        self._running = True
        
    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('Running {}'.format(n))
            n -= 1
            time.sleep(1)

c = TaskWithSwitch()
t = Thread(target=c.run, args=(10, ))
t.start()
c.terminate()
t.join()

如果你運行這段程式碼,會發現螢幕上只輸出了10,因為我們將_running這個欄位置為False之後,下次循環的時候不再滿足循環條件,它就會自己退出了。

如果我們想要用多執行緒來讀取IO,由於IO可能存在堵塞,所以可能會出現執行緒一直無法返回的情況。也就是說我們在循環內部卡死了,這個時候單純用_running來判斷還是不夠的,我們需要在執行緒內部設置計時器,防止循環內部的卡死。

class IOTask:
    def __init__(self):
        self._running = True
        
    def terminate(self):
        self._running = False

    def run(self, sock):
        # 在socket中設置計時器
        sock.settimeout(10)
        while self._running:
            try:
                # 由於設置了計時器,所以這裡不會永久等待
                data = sock.recv(1024)
                break
            except socket.timeout:
                continue
        return

執行緒訊號的傳遞

我們之所以如此費勁才能控制執行緒的運行,主要原因是執行緒的狀態是不可知的,並且我們無法直接操作它,因為它是被作業系統管理的。我們運行的主執行緒和創建出來的執行緒是獨立的,兩者之間並沒有從屬關係,所以想要實現對執行緒的狀態進行控制,往往需要我們通過其他手段來實現。

我們來思考一個場景,假設我們有一個任務,需要在另外一個執行緒運行結束之後才能開始執行。要想要實現這一點,就必須對執行緒的狀態有所感知,需要其他執行緒傳遞出訊號來才行。我們可以使用threading中的Event工具來實現這一點。Event工具就是可以用來傳遞訊號的,就好像是一個開關,當一個執行緒執行完成之後,會去啟動這個開關。而這個開關控制著另外一段邏輯的運行。

我們來看下樣例程式碼:

import time
from threading import Thread, Event

def run_in_thread():
    time.sleep(1)
    print('Thread is running')

t = Thread(target=run_in_thread)
t.start()

print('Main thread print')

我們在執行緒裡面就只做了輸出一行提示符,沒有其他任何邏輯。由於我們在run_in_thread函數當中沉睡了1s,所以一定是先輸出Main thread print再輸出的Thread is running。假設這個執行緒是一個很重要的任務,我們希望主執行緒能夠等待它運行到一個階段再往下執行,我們應該怎麼辦呢?

注意,這裡說的是運行到一個階段,並不是運行結束。運行結束我們很好處理,可以通過join來完成。但如果不是運行結束,而是運行完成了某一個階段,當然通過join也可以,但是會損害整體的效率。這個時候我們就必須要用上Event了。加上Event之後,我們再來看下程式碼:

import time
from threading import Thread, Event

def run_in_thread(event):
    time.sleep(1)
    print('Thread is running')
    # set一下event,這樣外面wait的部分就會被啟動
    event.set()

# 初始化Event
event = Event()
t = Thread(target=run_in_thread, args=(event, ))
t.start()

# event等待set
event.wait()
print('Main thread print')

整體的邏輯沒有太多的修改,主要的是增加了幾行關於Event的使用程式碼。

我們如果要用到Event,最好在程式碼當中只使用一次。當然通過Event中的clear方法我們可以重置Event的值,但問題是我們沒辦法保證重置的這個邏輯會在wait之前執行。如果是在之後執行的,那麼就會問題,並且在debug的時候會異常痛苦,因為bug不是必現的,而是有時候會出現有時候不會出現。這種情況往往都是因為多執行緒的使用問題。

所以如果要多次使用開關和訊號的話,不要使用Event,可以使用訊號量。

訊號量

Event的問題在於如果多個執行緒在等待Event的發生,當它一旦被set的時候,那麼這些執行緒都會同時執行。但有時候我們並不希望這樣,我們希望可以控制這些執行緒一個一個地運行。如果想要做到這一點,Event就無法滿足了,而需要使用訊號量。

訊號量和Event的使用方法類似,不同的是,訊號量可以保證每次只會啟動一個執行緒。因為這兩者的底層邏輯不太一致,對於Event來說,它更像是一個開關。一旦開關啟動,所有和這個開關關聯的邏輯都會同時執行。而訊號量則像是許可證,只有拿到許可證的執行緒才能執行工作,並且許可證一次只發一張。

想要使用訊號量並不需要自己開發,thread庫當中為我們提供了現成的工具——Semaphore,我們來看它的使用程式碼:

# 工作執行緒
def worker(n, sema):
    # 等待訊號量
    sema.acquire()
    print('Working', n)

# 初始化
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

在上面的程式碼當中我們創建了10個執行緒,雖然這些執行緒都被啟動了,但是都不會執行邏輯,因為sema.acquire是一個阻塞方法,沒有監聽到訊號量是會一直掛起等待。

當我們釋放訊號量之後,執行緒被啟動,才開始了執行。我們每釋放一個訊號,則會多啟動一個執行緒。這裡面的邏輯應該不難理解。

總結

在並發場景當中,多執行緒的使用絕不是多啟動幾個執行緒做不同的任務而已,我們需要執行緒間協作,需要同步、獲取它們的狀態,這是非常不容易的。一不小心就會出現幽靈bug,時顯時隱,這也是並發問題讓人頭疼的主要原因。

這篇文章當中我們只是簡單介紹了執行緒間通訊的基本方法,針對這個問題,還有更好的解決方案。我們將在後續的文章當中繼續討論這個問題,敬請期待。

今天的文章到這裡就結束了,如果喜歡本文的話,請來一波素質三連,給我一點支援吧(關注、轉發、點贊)。

本文使用 mdnice 排版