threading多線程模塊

threading多線程模塊

基本使用

  Python中提供了threading模塊用來實現線程並發編程,使用方法有兩種,一種是將threading模塊下的Therad類進行實例化的方式實現,一種是通過繼承threading模塊下的Therad類並覆寫run()方法實現。

 

實例化Therad類創建子線程


  這種方式是最常用的也是推薦使用的方式。先來介紹一個Therad類中的方法,然後再看代碼。

 

  start():開始線程活動。

  它在一個線程里最多只能被調用一次。它安排對象的 run() 方法在一個獨立的控制進程中調用。如果同一個線程對象中調用這個方法的次數大於一次,會拋出 RuntimeError

  PS:該方法不會立即執行,只是告訴CPU說你可以調度我了,我準備好了,一定要注意不是立即執行!

 

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)  # <-- 這裡睡眠了三秒,可以看見主線程繼續往下走了
    print("子線程任務處理完畢")


if __name__ == '__main__':
    
    # ==== 實例化出Thread類並添加子線程任務以及參數 ====

    t1 = threading.Thread(target=task, args=("線程[1]",))  # <-- 參數必須添加逗號。因為是args所以會打散,如果不加逗號則不能進行打散會拋出異常
    t1.start()  # 等待CPU調度..請注意這裡不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

 

  我們可以看見,在進行time.sleep()的時候線程做了一次切換,這是因為該方法是屬於IO操作,所以GIL鎖將執行權限丟給了主線程。還有一點要注意的就是主線程任務處理完畢後不會立馬結束掉,而是等子線程任務處理完畢後才會真正將主線程連同子線程一起kill掉。

  圖示:

image-20200701030622391

 

自定義類繼承Therad並覆寫run方法


  這種方法並不常見,但是還是要舉例說出來。我們可以看到第一種方法是實例化出了Therad類,並且執行了其start()方法,然後子線程就可以被調度了,其實在內部是通過start()方法調用了Therad類下的run()方法的。

 

  run():代表線程活動的方法。

  你可以在子類型里重載這個方法。 標準的 run() 方法會對作為 target 參數傳遞給該對象構造器的可調用對象(如果存在)發起調用,並附帶從 argskwargs 參數分別獲取的位置和關鍵字參數。

 

  那麼我們就可以自定義一個類並繼承Therad類,再覆寫run()方法

 

import threading
import time

print("主線程任務開始處理")


class Threading(threading.Thread):
    """自定義類"""


    def __init__(self, th_name):
        self.th_name = th_name
        super(Threading, self).__init__()

    def run(self):
        print("子線程任務開始處理,參數:{0}".format(self.th_name))
        time.sleep(3)  # <-- 這裡睡眠了三秒,可以看見主線程繼續往下走了
        print("子線程任務處理完畢")


if __name__ == '__main__':
    
    t1 = Threading("線程[1]")
    t1.start()  # 等待CPU調度..請注意這裡不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

 

  注意現在依然是主線程任務處理完畢後現在是不會立馬結束掉的,而是等子線程任務處理完畢後才會真正將主線程kill掉。其實原則上這兩種創建線程的方式都一模一樣。

 

源碼淺析-選讀


  這個源碼淺析非常淺,主要是來看一下基於實例化Therad類創建子線程內部是如何做的。

  那麼我們看一下其Thread類的源碼,:

 

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)

  調用這個構造函數時,必需帶有關鍵字參數。參數如下:

  group 應該為 None;為了日後擴展 ThreadGroup 類實現而保留。

  target 是用於 run() 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。

  name 是線程名稱。默認情況下,由 “Thread-N” 格式構成一個唯一的名稱,其中 N 是小的十進制數。

  args 是用於調用目標函數的參數元組。默認是 ()

  kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}

  如果不是 Nonedaemon 參數將顯式地設置該線程是否為守護模式。 如果是 None (默認值),線程將繼承當前線程的守護模式屬性。

  如果子類型重載了構造函數,它一定要確保在做任何事前,先發起調用基類構造器(Thread.__init__())。

 

class Thread:
​
    """注釋被我刪掉了"""
    
    _initialized = False  # 這是一個狀態位,來表示該線程是否被被初始化過
def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
                 
        """注釋被我刪掉了"""             
      
        assert group is None, "group argument must be None for now" #如果不是 None,daemon參數將顯式地設置該線程是否為守護模式。 如果是 None (默認值),線程將繼承當前線程的守護模式屬性。
        if kwargs is None:
            kwargs = {}  # kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。
        self._target = target  # 對於第一種調用方式來說,它就是我們的task函數。
        self._name = str(name or _newname())  # 線程名
        self._args = args  # _args是用於調用目標函數的參數元組。默認是 ()。
        self._kwargs = kwargs 
        if daemon is not None: # 判斷其是否為守護線程
            self._daemonic = daemon
        else:
            self._daemonic = current_thread().daemon
        self._ident = None # 這個是線程的編號
        if _HAVE_THREAD_NATIVE_ID:  # 判斷是否具有本地ID
            self._native_id = None
        self._tstate_lock = None  # 鎖定的狀態
        self._started = Event() # 開始
        self._is_stopped = False # 狀態位,是否停止
        self._initialized = True  # 將初始化狀態為改為True
        # Copy of sys.stderr used by self._invoke_excepthook()
        self._stderr = _sys.stderr
        self._invoke_excepthook = _make_invoke_excepthook()
        # For debugging and _after_fork()
        _dangling.add(self)

Thread類的__init__方法

 

  我們可以看見其__init__方法大多都是做了一些初始化的東西。下面我們來看run()方法,它才是離我們最近的一個方法。

def run(self):
​
    """注釋被我刪掉了"""
    
    try:
        if self._target:  # 簡單吧,這個方法,就是判斷你有沒有傳入一個函數。即我們定義的task
            self._target(*self._args, **self._kwargs)  # 有的話就立即執行,我們傳入的name其實就放在了_args中。這裡將它打散出來了,所以我們的task函數中的第一個參數name能收到。
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self._target, self._args, self._kwargs  # 不管處不出錯,都會清理他們。當然,如果有則是執行完成後清理

TCPServer中的run()方法

 

  好了,其實看到這裡就行了。其實我們自定義類的傳參也可以不用覆寫__init__再去調用父類方法初始化進行傳參,我們完全以另一種方式,但是我個人不太推薦。

import threading
import time

print("主線程任務開始運行")

class Threading(threading.Thread):
    """自定義類"""

    def run(self):
        print(self._args)  # ('線程[1]',)
        print(self._kwargs)  # {}

        print("子線程任務開始處理,參數:{0}".format(self._args[0]))
        time.sleep(3)  # <-- 這裡睡眠了三秒,可以看見主線程繼續往下走了
        print("子線程任務運行完畢")



if __name__ == '__main__':
    
    t1 = Threading(args=("線程[1]",))
    t1.start()  # 等待CPU調度..請注意這裡不是立即執行
    
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
('線程[1]',)
主線程任務處理完畢
{}
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
"""

自定義類繼承Therad並覆寫run方法的其他方式參數傳入

 

threading通用方法大全

 

thrading的通用方法  
方法/屬性名稱 通俗的功能描述 官方功能描述(譯版)
threading.active_count() 查看當前進程下一共存活了多少個線程的數量,返回的是一個int值。

返回當前存活的線程類 Thread 對象。

返回的計數等於 enumerate() 返回的列表長度。

threading.current_thread() 獲取當前線程對象。

返回當前對應調用者的控制線程的 Thread 對象。

如果調用者的控制線程不是利用 threading 創建,會返回一個功能受限的虛擬線程對象。

threading.currentThread() 同上 同上
threading.excepthook(args, /) 處理由 Thread.run() 引發的未捕獲異常。 太長了,自己去看。懶得寫啊,想要看的自己去看一眼。threading.excepthook()
threading.get_ident() 返回當前線程對象的編號。

返回當前線程的 「線程標識符」。

它是一個非零的整數。它的值沒有直接含義,主要是用作 magic cookie,比如作為含有線程相關數據的字典的索引。

線程標識符可能會在線程退出,新線程創建時被複用。

threading.get_native_id() 返回當前線程對象的編號。和threading.get_ident()相同。

返回內核分配給當前線程的原生集成線程 ID。

這是一個非負整數。 它的值可被用來在整個系統中唯一地標識這個特定線程(直到線程終結,在那之後該值可能會被 OS 回收再利用)。

threading.enumerate() 查看當前進程存活了的所有線程對象,以列表形式返回。

以列表形式返回當前所有存活的 Thread 對象。 該列表包含守護線程,current_thread() 創建的虛擬線程對象和主線程。

它不包含已終結的線程和尚未開始的線程。

threading.main_thread() 返回主線程對象。 返回主 Thread 對象。一般情況下,主線程是Python解釋器開始時創建的線程。
threading.settrace(func) 不太清楚..好像是測試用的。

為所有 threading 模塊開始的線程設置性能測試函數。

在每個線程的 run() 方法被調用前,func 會被傳遞給 sys.setprofile()

threading.stack_size([size]) 返回創建線程時使用的堆棧大小。

返回創建線程時使用的堆棧大小。

可選參數 size 指定之後新建的線程的堆棧大小,而且一定要是0(根據平台或者默認配置)或者最小是32,768(32KiB)的一個正整數。

如果 size 沒有指定,默認是0。

如果不支持改變線程堆棧大小,會拋出 RuntimeError 錯誤。

如果指定的堆棧大小不合法,會拋出 ValueError 錯誤並且不會修改堆棧大小。

32KiB是當前最小的能保證解釋器有足夠堆棧空間的堆棧大小。

需要注意的是部分平台對於堆棧大小會有特定的限制,例如要求大於32KiB的堆棧大小或者需要根據系統內存頁面的整數倍進行分配 – 應當查閱平台文檔有關詳細信息(4KiB頁面比較普遍,在沒有更具體信息的情況下,建議的方法是使用4096的倍數作為堆棧大小)。

threading.TIMEOUT_MAX 規定一個全局的所有阻塞函數的最大時間。

阻塞函數( Lock.acquire(), RLock.acquire(), Condition.wait(), …)中形參 timeout 允許的最大值。

傳入超過這個值的 timeout 會拋出 OverflowError 異常。

 

線程對象方法大全

 

線程對象方法大全(即Thread類的實例對象)  
方法/屬性名稱 通俗的功能描述 官方功能描述(譯版)
start() 啟動線程,該方法不會立即執行,而是告訴CPU自己準備好了,可以隨時調度,而非立即啟動。

開始線程活動。

它在一個線程里最多只能被調用一次。

它安排對象的 run() 方法在一個獨立的控制進程中調用。如果同一個線程對象中調用這個方法的次數大於一次,會拋出 RuntimeError

run() 一般是自定義類繼承Thread並覆寫的方法,即線程的詳細任務邏輯。

代表線程活動的方法。

你可以在子類型里重載這個方法。

標準的 run() 方法會對作為 target 參數傳遞給該對象構造器的可調用對象(如果存在)發起調用,並附帶從 argskwargs 參數分別獲取的位置和關鍵字參數。

join(timeout=None) 主線程默認會等待子線程運行結束後再繼續執行,timeout為等待的秒數,如不設置該參數則一直等待。

等待,直到線程終結。

這會阻塞調用這個方法的線程,直到被調用 join() 的線程終結 — 不管是正常終結還是拋出未處理異常 — 或者直到發生超時,超時選項是可選的。

timeout 參數存在而且不是 None 時,它應該是一個用於指定操作超時的以秒為單位的浮點數或者分數。因為 join() 總是返回 None ,所以你一定要在 join() 後調用 is_alive() 才能判斷是否發生超時 — 如果線程仍然存活,則 join() 超時。

timeout 參數不存在或者是 None ,這個操作會阻塞直到線程終結。一個線程可以被 join() 很多次。如果嘗試加入當前線程會導致死鎖, join() 會引起 RuntimeError 異常。如果嘗試 join() 一個尚未開始的線程,也會拋出相同的異常。

name 可以通過 = 給該線程設置一個通俗的名字。如直接使用該屬性則返回該線程的默認名字。 只用於識別的字符串。它沒有語義。多個線程可以賦予相同的名稱。 初始名稱由構造函數設置。
getName() 獲取該線程的名字。 舊的 name 取值 API;直接當做特徵屬性使用它。
setName() 設置該線程的名字。 舊的 name 設值 API;直接當做特徵屬性使用它。
ident 獲取線程的編號。

這個線程的 ‘線程標識符’,如果線程尚未開始則為 None

這是個非零整數。參見 get_ident() 函數。當一個線程退出而另外一個線程被創建,線程標識符會被複用。

即使線程退出後,仍可得到標識符。

native_id 獲取線程的編號,和ident相同。

此線程的原生集成線程 ID。

這是一個非負整數,或者如果線程還未啟動則為 None。 請參閱 get_native_id() 函數。 這表示線程 ID (TID) 已被 OS (內核) 賦值給線程。

它的值可能被用來在全系統範圍內唯一地標識這個特定線程(直到線程終結,在那之後該值可能會被 OS 回收再利用)。註解類似於進程 ID,線程 ID 的有效期(全系統範圍內保證唯一)將從線程被創建開始直到線程被終結。可用性: 需要 get_native_id() 函數。

is_alive() 查看線程是否存活,返回布爾值。 run() 方法剛開始直到 run() 方法剛結束,這個方法返回 True 。模塊函數 enumerate() 返回包含所有存活線程的列表。
isAlive( ) 同上,但是不推薦使用這種方法。 同上,但是不推薦使用這種方法。
daemon 查看線程是否為一個守護線程,返回布爾值。默認為False

一個表示這個線程是(True)否(False)守護線程的布爾值。

一定要在調用 start() 前設置好,不然會拋出 RuntimeError

初始值繼承於創建線程;主線程不是守護線程,因此主線程創建的所有線程默認都是 daemon = False

當沒有存活的非守護線程時,整個Python程序才會退出。

isDaemon() 查看線程是否為一個守護線程,返回布爾值。默認為False 舊的 daemon取值 API;建議直接當做特徵屬性使用它。
setDaemon() 設置一個線程為守護線程,參數如果為True則表示該線程被設置為守護線程,默認為False。當主線程運行完畢之後設置為守護線程的子線程便立即結束執行… 舊的 daemon設值 API;建議直接當做特徵屬性使用它。

 

常用方法示例

  由於方法太多了,所以這裡就只例舉一些非常常用的。

 

守護線程setDaemon()


 

  setDaemon() :設置一個線程為守護線程,參數如果為True則表示該線程被設置為守護線程,默認為False。當主線程運行完畢之後設置為守護線程的子線程便立即結束執行…

 

  我們對比上面的圖,現在子線程是沒有設置為守護線程的:

image-20200701030701850

  當他設置為守護線程之後會是這樣的:

image-20200701031136387

 

 

  代碼如下:

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")


if __name__ == '__main__':
    
    t1 = threading.Thread(target=task, args=("線程[1]",))
    
    t1.setDaemon(True)  # <-- 設置線程對象t1為守護線程,注意這一步一定要放在start之前。
    
    t1.start()  # 等待CPU調度..請注意這裡不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
"""

 

線程阻塞join()


 

  join(timeout=None):主線程默認會等待子線程運行結束後再繼續執行,timeout為等待的秒數,如不設置該參數則一直等待。

 

  圖示如下:(未設置超時時間)

image-20200701032331066

  代碼如下:

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.start()  #  等待CPU調度..請注意這裡不是立即執行

    t1.join()  # <--- 放在start()下面,死等

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
主線程任務處理完畢
"""

 

  圖示如下:(設置超時時間)

image-20200701032942587

  代碼如下:

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.start()  #  等待CPU調度..請注意這裡不是立即執行

    t1.join(2)  # <--- 放在start()下面,等2秒後主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

 

注意,join()方法可以多次設置!

 

join()與setDaemon(True)共存


 

  如果同時設置setDaemon(True)join()方法會怎麼樣呢?有兩種情況:

    1.join()方法沒有設置timeout(沒有設置即表示死等)或者timeout的時間比子線程作業時間要長,這代表子線程會死在主線程之前,setDaemon(True)也就沒有了意義,即失效了。

    2.join()設置了timeout並且timeout的時間比子線程作業時間要短,這代表主線程會死在子線程之前,setDaemon(True)生效,子線程會跟着主線程一起死亡。

 

# ==== 情況一 ====

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.setDaemon(True)  # <--- 放在start()上面,主線程運行完後會立即終止子線程的運行。但是由於有join(),故不生效。

    t1.start()  #  等待CPU調度..請注意這裡不是立即執行

    t1.join()  # <--- 放在start()下面,等2秒後主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
主線程任務處理完畢
"""

情況一

# ==== 情況二 ====

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.setDaemon(True)  # <--- 放在start()上面,主線程運行完後會立即終止子線程的運行。但是由於有join(),故不生效。

    t1.start()  #  等待CPU調度..請注意這裡不是立即執行

    t1.join(2)  # <--- 放在start()下面,等2秒後主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
"""

情況二

 

設置與獲取線程名


  我們來看一下如何設置與獲取線程名。

 

  threading.current_thread() :獲取當前線程對象。

  getName() :獲取該線程的名字。

  setName() :設置該線程的名字。

  name :可以通過 = 給該線程設置一個通俗的名字。如直接使用該屬性則返回該線程的默認名字。

 

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    obj  =  threading.current_thread()  # 獲取當前線程對象
    print("獲取當前的線程名:{0}".format(obj.getName()))
    print("開始設置線程名")
    obj.setName("yyy")
    print("獲取修改後的線程名:{0}".format(obj.getName()))
    time.sleep(3)  # <-- 這裡睡眠了三秒,可以看見主線程繼續往下走了
    print("子線程任務處理完畢")


if __name__ == '__main__':
    # ==== 第一步:實例化出Thread類並添加子線程任務以及參數 ====
    t1 = threading.Thread(target=task, args=("線程[1]",),name="xxx")  # 可以在這裡設置,如果不設置則為默認格式:Thread-1 數字是按照線程個數來定的
    t1.start()  # 等待CPU調度..請注意這裡不是立即執行

    print("主線程名:",threading.current_thread().name)  # 直接使用屬性 name
    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
獲取當前的線程名:xxx
開始設置線程名
獲取修改後的線程名:yyy
主線程名: MainThread
主線程任務處理完畢
子線程任務處理完畢
"""

 

多線程的應用場景


  由於GIL鎖的存在,Python中對於I/O操作來說可以使用多線程編程,如果是計算密集型的操作則不應該使用多線程進行處理,因為沒有I/O操作就不能通過I/O切換來執行其他線程,故對於計算密集型的操作來說多線程沒有什麼優勢。甚至還可能比普通串行還慢(因為涉及到線程切換,雖然是毫秒級別,但是計算的數值越大這個切換也就越密集,GIL鎖是100個CPU指令切換一次的)

 

  注意:我們是在Python2版本下進行此次測試,Python3版本確實相差不大,但是,從本質上來說依然是這樣的。

 

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000): # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    add()
    sub()

    end_time = time.time()
    print("執行時間:",end_time - start_time)
    
# ==== 執行結果 ==== 三次採集

"""
大約在 1.3 - 1.4 秒
"""

計算密集型程序的普通串行運行時間

# coding:utf-8

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000):  # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    t1 = threading.Thread(target=add,)
    t2 = threading.Thread(target=sub,)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    end_time = time.time()
    print(u"執行時間:",end_time - start_time)

# ==== 執行結果 ==== 三次採集

"""
大約 4 - 5 秒
"""

計算密集型程序的多線程並發運行時間