threading多執行緒模組
- 2020 年 7 月 1 日
- 筆記
- Python並發編程
基本使用
Python中提供了threading
模組用來實現執行緒並發編程,使用方法有兩種,一種是將threading
模組下的Therad
類進行實例化的方式實現,一種是通過繼承threading
模組下的Therad
類並覆寫run()
方法實現。
實例化Therad類創建子執行緒
這種方式是最常用的也是推薦使用的方式。先來介紹一個Therad
類中的方法,然後再看程式碼。
start()
:開始執行緒活動。它在一個執行緒里最多只能被調用一次。它安排對象的
PS:該方法不會立即執行,只是告訴CPU說你可以調度我了,我準備好了,一定要注意不是立即執行!
import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) # <-- 這裡睡眠了三秒,可以看見主執行緒繼續往下走了 print("子執行緒任務處理完畢") if __name__ == '__main__': # ==== 實例化出Thread類並添加子執行緒任務以及參數 ==== t1 = threading.Thread(target=task, args=("執行緒[1]",)) # <-- 參數必須添加逗號。因為是args所以會打散,如果不加逗號則不能進行打散會拋出異常 t1.start() # 等待CPU調度..請注意這裡不是立即執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 主執行緒任務處理完畢 子執行緒任務處理完畢 """
我們可以看見,在進行time.sleep()
的時候執行緒做了一次切換,這是因為該方法是屬於IO操作
,所以GIL鎖將執行許可權丟給了主執行緒。還有一點要注意的就是主執行緒任務處理完畢後不會立馬結束掉,而是等子執行緒任務處理完畢後才會真正將主執行緒連同子執行緒一起kill掉。
圖示:
自定義類繼承Therad並覆寫run方法
這種方法並不常見,但是還是要舉例說出來。我們可以看到第一種方法是實例化出了Therad
類,並且執行了其start()
方法,然後子執行緒就可以被調度了,其實在內部是通過start()
方法調用了Therad
類下的run()
方法的。
run()
:代表執行緒活動的方法。你可以在子類型里重載這個方法。 標準的
那麼我們就可以自定義一個類並繼承Therad
類,再覆寫run()
方法
import threading import time print("主執行緒任務開始處理") class Threading(threading.Thread): """自定義類""" def __init__(self, th_name): self.th_name = th_name super(Threading, self).__init__() def run(self): print("子執行緒任務開始處理,參數:{0}".format(self.th_name)) time.sleep(3) # <-- 這裡睡眠了三秒,可以看見主執行緒繼續往下走了 print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = Threading("執行緒[1]") t1.start() # 等待CPU調度..請注意這裡不是立即執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 主執行緒任務處理完畢 子執行緒任務處理完畢 """
注意現在依然是主執行緒任務處理完畢後現在是不會立馬結束掉的,而是等子執行緒任務處理完畢後才會真正將主執行緒kill掉。其實原則上這兩種創建執行緒的方式都一模一樣。
源碼淺析-選讀
這個源碼淺析非常淺,主要是來看一下基於實例化Therad
類創建子執行緒內部是如何做的。
那麼我們看一下其Thread
類的源碼,:
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)
調用這個構造函數時,必需帶有關鍵字參數。參數如下:
group
應該為None
;為了日後擴展ThreadGroup
類實現而保留。
target
是用於
name
是執行緒名稱。默認情況下,由 “Thread-N” 格式構成一個唯一的名稱,其中 N 是小的十進位數。
args
是用於調用目標函數的參數元組。默認是()
。
kwargs
是用於調用目標函數的關鍵字參數字典。默認是{}
。如果不是
None
,daemon 參數將顯式地設置該執行緒是否為守護模式。 如果是None
(默認值),執行緒將繼承當前執行緒的守護模式屬性。如果子類型重載了構造函數,它一定要確保在做任何事前,先發起調用基類構造器(
Thread.__init__()
)。


class Thread: """注釋被我刪掉了""" _initialized = False # 這是一個狀態位,來表示該執行緒是否被被初始化過 def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): """注釋被我刪掉了""" assert group is None, "group argument must be None for now" #如果不是 None,daemon參數將顯式地設置該執行緒是否為守護模式。 如果是 None (默認值),執行緒將繼承當前執行緒的守護模式屬性。 if kwargs is None: kwargs = {} # kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。 self._target = target # 對於第一種調用方式來說,它就是我們的task函數。 self._name = str(name or _newname()) # 執行緒名 self._args = args # _args是用於調用目標函數的參數元組。默認是 ()。 self._kwargs = kwargs if daemon is not None: # 判斷其是否為守護執行緒 self._daemonic = daemon else: self._daemonic = current_thread().daemon self._ident = None # 這個是執行緒的編號 if _HAVE_THREAD_NATIVE_ID: # 判斷是否具有本地ID self._native_id = None self._tstate_lock = None # 鎖定的狀態 self._started = Event() # 開始 self._is_stopped = False # 狀態位,是否停止 self._initialized = True # 將初始化狀態為改為True # Copy of sys.stderr used by self._invoke_excepthook() self._stderr = _sys.stderr self._invoke_excepthook = _make_invoke_excepthook() # For debugging and _after_fork() _dangling.add(self)
Thread類的__init__方法
我們可以看見其__init__
方法大多都是做了一些初始化的東西。下面我們來看run()
方法,它才是離我們最近的一個方法。


def run(self): """注釋被我刪掉了""" try: if self._target: # 簡單吧,這個方法,就是判斷你有沒有傳入一個函數。即我們定義的task self._target(*self._args, **self._kwargs) # 有的話就立即執行,我們傳入的name其實就放在了_args中。這裡將它打散出來了,所以我們的task函數中的第一個參數name能收到。 finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs # 不管處不出錯,都會清理他們。當然,如果有則是執行完成後清理
TCPServer中的run()方法
好了,其實看到這裡就行了。其實我們自定義類的傳參也可以不用覆寫__init__
再去調用父類方法初始化進行傳參,我們完全以另一種方式,但是我個人不太推薦。


import threading import time print("主執行緒任務開始運行") class Threading(threading.Thread): """自定義類""" def run(self): print(self._args) # ('執行緒[1]',) print(self._kwargs) # {} print("子執行緒任務開始處理,參數:{0}".format(self._args[0])) time.sleep(3) # <-- 這裡睡眠了三秒,可以看見主執行緒繼續往下走了 print("子執行緒任務運行完畢") if __name__ == '__main__': t1 = Threading(args=("執行緒[1]",)) t1.start() # 等待CPU調度..請注意這裡不是立即執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 ('執行緒[1]',) 主執行緒任務處理完畢 {} 子執行緒任務開始處理,參數:執行緒[1] 子執行緒任務處理完畢 """
自定義類繼承Therad並覆寫run方法的其他方式參數傳入
threading通用方法大全
thrading的通用方法 | ||
---|---|---|
方法/屬性名稱 | 通俗的功能描述 | 官方功能描述(譯版) |
threading.active_count() | 查看當前進程下一共存活了多少個執行緒的數量,返回的是一個int 值。 |
返回當前存活的執行緒類
|
threading.current_thread() | 獲取當前執行緒對象。 |
返回當前對應調用者的控制執行緒的
|
threading.currentThread() | 同上 | 同上 |
threading.excepthook(args, /) | 處理由 | 太長了,自己去看。懶得寫啊,想要看的自己去看一眼。 |
threading.get_ident() | 返回當前執行緒對象的編號。 |
返回當前執行緒的 「執行緒標識符」。 它是一個非零的整數。它的值沒有直接含義,主要是用作 magic cookie,比如作為含有執行緒相關數據的字典的索引。 執行緒標識符可能會在執行緒退出,新執行緒創建時被複用。 |
threading.get_native_id() | 返回當前執行緒對象的編號。和threading.get_ident() 相同。 |
返回內核分配給當前執行緒的原生集成執行緒 ID。 這是一個非負整數。 它的值可被用來在整個系統中唯一地標識這個特定執行緒(直到執行緒終結,在那之後該值可能會被 OS 回收再利用)。 |
threading.enumerate() | 查看當前進程存活了的所有執行緒對象,以列表形式返回。 |
以列表形式返回當前所有存活的
|
threading.main_thread() | 返回主執行緒對象。 | 返回主 |
threading.settrace(func) | 不太清楚..好像是測試用的。 |
為所有
|
threading.stack_size([size]) | 返回創建執行緒時使用的堆棧大小。 |
返回創建執行緒時使用的堆棧大小。 可選參數 size 指定之後新建的執行緒的堆棧大小,而且一定要是0(根據平台或者默認配置)或者最小是32,768(32KiB)的一個正整數。 如果 size 沒有指定,默認是0。 如果不支援改變執行緒堆棧大小,會拋出
|
threading.TIMEOUT_MAX | 規定一個全局的所有阻塞函數的最大時間。 |
阻塞函數(
|
執行緒對象方法大全
執行緒對象方法大全(即Thread類的實例對象) | ||
---|---|---|
方法/屬性名稱 | 通俗的功能描述 | 官方功能描述(譯版) |
start() | 啟動執行緒,該方法不會立即執行,而是告訴CPU自己準備好了,可以隨時調度,而非立即啟動。 |
開始執行緒活動。 它在一個執行緒里最多只能被調用一次。 它安排對象的 |
run() | 一般是自定義類繼承Thread 並覆寫的方法,即執行緒的詳細任務邏輯。 |
代表執行緒活動的方法。 你可以在子類型里重載這個方法。 標準的 |
join(timeout=None) | 主執行緒默認會等待子執行緒運行結束後再繼續執行,timeout 為等待的秒數,如不設置該參數則一直等待。 |
等待,直到執行緒終結。 這會阻塞調用這個方法的執行緒,直到被調用
|
name | 可以通過 = 給該執行緒設置一個通俗的名字。如直接使用該屬性則返回該執行緒的默認名字。 |
只用於識別的字元串。它沒有語義。多個執行緒可以賦予相同的名稱。 初始名稱由構造函數設置。 |
getName() | 獲取該執行緒的名字。 | 舊的 |
setName() | 設置該執行緒的名字。 | 舊的 |
ident | 獲取執行緒的編號。 |
這個執行緒的 ‘執行緒標識符’,如果執行緒尚未開始則為 這是個非零整數。參見
|
native_id | 獲取執行緒的編號,和ident相同。 |
此執行緒的原生集成執行緒 ID。 這是一個非負整數,或者如果執行緒還未啟動則為
|
is_alive() | 查看執行緒是否存活,返回布爾值。 | 當 |
isAlive( ) | 同上,但是不推薦使用這種方法。 | 同上,但是不推薦使用這種方法。 |
daemon | 查看執行緒是否為一個守護執行緒,返回布爾值。默認為False 。 |
一個表示這個執行緒是(True)否(False)守護執行緒的布爾值。 一定要在調用
|
isDaemon() | 查看執行緒是否為一個守護執行緒,返回布爾值。默認為False 。 |
舊的 daemon 取值 API;建議直接當做特徵屬性使用它。 |
setDaemon() | 設置一個執行緒為守護執行緒,參數如果為True 則表示該執行緒被設置為守護執行緒,默認為False 。當主執行緒運行完畢之後設置為守護執行緒的子執行緒便立即結束執行… |
舊的 daemon 設值 API;建議直接當做特徵屬性使用它。 |
常用方法示例
由於方法太多了,所以這裡就只例舉一些非常常用的。
守護執行緒setDaemon()
setDaemon()
:設置一個執行緒為守護執行緒,參數如果為True
則表示該執行緒被設置為守護執行緒,默認為False
。當主執行緒運行完畢之後設置為守護執行緒的子執行緒便立即結束執行…
我們對比上面的圖,現在子執行緒是沒有設置為守護執行緒的:
當他設置為守護執行緒之後會是這樣的:
程式碼如下:
import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = threading.Thread(target=task, args=("執行緒[1]",)) t1.setDaemon(True) # <-- 設置執行緒對象t1為守護執行緒,注意這一步一定要放在start之前。 t1.start() # 等待CPU調度..請注意這裡不是立即執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 主執行緒任務處理完畢 """
執行緒阻塞join()
join(timeout=None)
:主執行緒默認會等待子執行緒運行結束後再繼續執行,timeout
為等待的秒數,如不設置該參數則一直等待。
圖示如下:(未設置超時時間)
程式碼如下:
import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = threading.Thread(target=task,args=("執行緒[1]",)) t1.start() # 等待CPU調度..請注意這裡不是立即執行 t1.join() # <--- 放在start()下面,死等 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 子執行緒任務處理完畢 主執行緒任務處理完畢 """
圖示如下:(設置超時時間)
程式碼如下:
import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = threading.Thread(target=task,args=("執行緒[1]",)) t1.start() # 等待CPU調度..請注意這裡不是立即執行 t1.join(2) # <--- 放在start()下面,等2秒後主執行緒繼續執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 主執行緒任務處理完畢 子執行緒任務處理完畢 """
注意,join()
方法可以多次設置!
join()與setDaemon(True)共存
如果同時設置
setDaemon(True)
與join()
方法會怎麼樣呢?有兩種情況:1.
join()
方法沒有設置timeout
(沒有設置即表示死等)或者timeout
的時間比子執行緒作業時間要長,這代表子執行緒會死在主執行緒之前,setDaemon(True)
也就沒有了意義,即失效了。2.
join()
設置了timeout
並且timeout
的時間比子執行緒作業時間要短,這代表主執行緒會死在子執行緒之前,setDaemon(True)
生效,子執行緒會跟著主執行緒一起死亡。


# ==== 情況一 ==== import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = threading.Thread(target=task,args=("執行緒[1]",)) t1.setDaemon(True) # <--- 放在start()上面,主執行緒運行完後會立即終止子執行緒的運行。但是由於有join(),故不生效。 t1.start() # 等待CPU調度..請注意這裡不是立即執行 t1.join() # <--- 放在start()下面,等2秒後主執行緒繼續執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 子執行緒任務處理完畢 主執行緒任務處理完畢 """
情況一


# ==== 情況二 ==== import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) time.sleep(3) print("子執行緒任務處理完畢") if __name__ == '__main__': t1 = threading.Thread(target=task,args=("執行緒[1]",)) t1.setDaemon(True) # <--- 放在start()上面,主執行緒運行完後會立即終止子執行緒的運行。但是由於有join(),故不生效。 t1.start() # 等待CPU調度..請注意這裡不是立即執行 t1.join(2) # <--- 放在start()下面,等2秒後主執行緒繼續執行 print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 主執行緒任務處理完畢 """
情況二
設置與獲取執行緒名
我們來看一下如何設置與獲取執行緒名。
threading.current_thread()
:獲取當前執行緒對象。
getName()
:獲取該執行緒的名字。
setName()
:設置該執行緒的名字。
name
:可以通過=
給該執行緒設置一個通俗的名字。如直接使用該屬性則返回該執行緒的默認名字。
import threading import time print("主執行緒任務開始處理") def task(th_name): print("子執行緒任務開始處理,參數:{0}".format(th_name)) obj = threading.current_thread() # 獲取當前執行緒對象 print("獲取當前的執行緒名:{0}".format(obj.getName())) print("開始設置執行緒名") obj.setName("yyy") print("獲取修改後的執行緒名:{0}".format(obj.getName())) time.sleep(3) # <-- 這裡睡眠了三秒,可以看見主執行緒繼續往下走了 print("子執行緒任務處理完畢") if __name__ == '__main__': # ==== 第一步:實例化出Thread類並添加子執行緒任務以及參數 ==== t1 = threading.Thread(target=task, args=("執行緒[1]",),name="xxx") # 可以在這裡設置,如果不設置則為默認格式:Thread-1 數字是按照執行緒個數來定的 t1.start() # 等待CPU調度..請注意這裡不是立即執行 print("主執行緒名:",threading.current_thread().name) # 直接使用屬性 name print("主執行緒任務處理完畢") # ==== 執行結果 ==== """ 主執行緒任務開始處理 子執行緒任務開始處理,參數:執行緒[1] 獲取當前的執行緒名:xxx 開始設置執行緒名 獲取修改後的執行緒名:yyy 主執行緒名: MainThread 主執行緒任務處理完畢 子執行緒任務處理完畢 """
多執行緒的應用場景
由於GIL鎖的存在,Python中對於I/O
操作來說可以使用多執行緒編程,如果是計算密集型的操作則不應該使用多執行緒進行處理,因為沒有I/O
操作就不能通過I/O切換
來執行其他執行緒,故對於計算密集型的操作來說多執行緒沒有什麼優勢。甚至還可能比普通串列還慢(因為涉及到執行緒切換,雖然是毫秒級別,但是計算的數值越大這個切換也就越密集,GIL鎖是100個CPU指令切換一次的)
注意:我們是在Python2版本下進行此次測試,Python3版本確實相差不大,但是,從本質上來說依然是這樣的。


import threading import time num = 0 def add(): global num for i in range(10000000): # 一千萬次 num += 1 def sub(): global num for i in range(10000000): # 一千萬次 num -= 1 if __name__ == '__main__': start_time = time.time() add() sub() end_time = time.time() print("執行時間:",end_time - start_time) # ==== 執行結果 ==== 三次採集 """ 大約在 1.3 - 1.4 秒 """
計算密集型程式的普通串列運行時間


# coding:utf-8 import threading import time num = 0 def add(): global num for i in range(10000000): # 一千萬次 num += 1 def sub(): global num for i in range(10000000): # 一千萬次 num -= 1 if __name__ == '__main__': start_time = time.time() t1 = threading.Thread(target=add,) t2 = threading.Thread(target=sub,) t1.start() t2.start() t1.join() t2.join() end_time = time.time() print(u"執行時間:",end_time - start_time) # ==== 執行結果 ==== 三次採集 """ 大約 4 - 5 秒 """
計算密集型程式的多執行緒並發運行時間