第37天並發編程之執行緒篇

  • 2020 年 1 月 21 日
  • 筆記

一. 執行緒初識

  1. 什麼是執行緒和進程 進程指的是一個程式執行的過程,是一個資源單位。它包含了作業系統開闢記憶體空間,將應用程式載入到記憶體中以及執行應用程式程式碼的整個過程。就像是一個車間內的一個小工廠一樣,整個的生產過程被稱之為一個進程。 執行緒是作業系統真正的執行單元。它僅僅代表的是進程中的最後一步,也就是執行應用程式程式碼的過程。就像是車間里的一條條流水線一樣,是真正的用來執行程式碼的一個過程。
  2. 執行緒和進程的區別
  3. 進程佔用開銷較大,這是因為進程要重新開闢一段記憶體空間,執行緒開銷較小。
  4. 進程之間記憶體是物理隔離的,但是同一進程內的執行緒之間記憶體是共享的。注意不同進程之間的執行緒通訊還是需要通過隊列進行通訊的。 from threading import Thread import time # 在主執行緒中設置一個全局變數x x = 100 def task(): global x x = 0 t = Thread(target=task) t.start() t.join() # 此時列印出來的結果是0,說明同一個進程之間執行緒數據是共享的 print(x)
  5. 開啟執行緒的兩種方式
  6. 通過類Thread 因為執行緒開銷較小的原因,我們會發現結果是先列印task裡面的內容,然後列印主執行緒三個字,這個和進程正好相反。 from threading import Thread import time def task(): print('thread is running ….') time.sleep(3) Thread(target=task).start() print('主執行緒')
  7. 通過繼承類Thread 我們創建一個執行緒並且執行的過程本質上就是創建記憶體之後執行Thread類的run函數,因此我們可以通過繼承類的方式去創建。 from threading import Thread import time class MyThread(Thread): def run(self): print('thread is running ….') time.sleep(3) MyThread().start() print('主執行緒')
  8. 執行緒相關的屬性方法 # current_thread() 當前執行緒對象,.name是裡面的一個屬性,可以得到執行緒名稱,主執行緒名稱為MainTread # active_count() 當前活躍的執行緒數,記得還有一個主執行緒 # enumerate 返回一個列表,裡面都是當前活躍的執行緒數目 from threading import Thread, current_thread, active_count, enumerate import time def task(): print('%s is running' % current_thread().name) time.sleep(3) #: name表示自定義執行緒名稱 t = Thread(target=task, name='第一個執行緒') t.start() print(current_thread().name) print(active_count()) print(enumerate())
  9. 守護進程與守護執行緒
  10. 守護進程 對於進程而言,如果程式碼中有守護進程,也有非守護進程,等主進程程式碼執行完畢之後守護進程也就結束了,並不會等待非守護進程的執行。 from multiprocessing import Process def task1(): print(123) time.sleep(1) print('end123') def task2(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) p1.daemon = True p1.start() p2.start() # 當執行完print之後就代表主進程程式碼已經執行完畢,此時就會終止守護進程 # 所以不會列印task1裡面的內容 # 但是還是要等待非守護進程結束之後主進程才會真正的結束 # 因此我們看到了task2裡面的內容 print('主進程over…') # 執行結果; # 主進程over… # 456 # end456
  11. 守護執行緒 對於執行緒而言,如果程式碼中有守護執行緒,也有非守護執行緒,等主執行緒程式碼執行完畢之後並不會終止守護執行緒的執行,只有等到所有的非守護執行緒執行完畢之後才意味著主執行緒的結束,此時才會總之守護執行緒。 將上面的程式碼的進程改成執行緒,就會出現不一樣的效果。 from threading import Thread import time def task1(): print(123) time.sleep(5) print('end123') def task2(): print(456) time.sleep(3) print('end456') t1 = Thread(target=task1) t2 = Thread(target=task2) t1.daemon = True t1.start() t2.start() # 當print程式碼執行完畢之後,就代表這主執行緒程式碼執行完畢了,但是並不是主執行緒執行完畢了,這個是和進程的一個區別 # 因此要等待非守護執行緒t2執行完畢之後才代表主執行緒真的結束了,此時task1作為守護進程也就被終止了 # 因此我們會看到能夠列印全部的task2內容,但是不會列印task1的內容 print('主執行緒..') # 運行結果 # 123 # 456 # 主執行緒.. # end456 二.鎖
    1. 互斥鎖 執行緒中的互斥鎖和進程中的互斥鎖都是為了解決多個進程或者執行緒同時訪問同一個系統資源時出現數據混亂的問題,不同之處在於所使用模組不一樣,因此執行緒互斥鎖只能在執行緒中使用,進程互斥鎖只能在進程中使用。 不使用執行緒鎖的問題 from threading import Thread import time x = 100 def task(): global x temp = x time.sleep(0.1) x = temp – 1 #: 創建100個執行緒全局變數x進行修改,每次減1 t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start() # 用來等待所有執行緒結束 for i in t_l: i.join() #: 當我們列印數據的時候發現結果是99,為什麼呢? #: 因為每個執行緒temp = x然後等待i/o,當cpu再次切換過來執行的時候temp=100,x的值就是99 print(x) 因此在一些數據的交互過程中我們需要加上執行緒鎖來保證數據的安全性 from threading import Thread, Lock import time x = 100 mutex = Lock() # 創建鎖 def task(): global x mutex.acquire() # 在數據修改之前添加鎖 temp = x time.sleep(0.1) x = temp – 1 mutex.release() # 在數據修改之後釋放鎖 #: 創建100個執行緒全局變數x進行修改,每次減1 t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start() # 用來等待所有執行緒結束 for i in t_l: i.join() print(x)
    2. 死鎖 死鎖的現象就是兩個執行緒(或者兩個進程)A和B,還有兩個互斥鎖C和D。A和B都需要C和D這兩把鎖,但是A搶到了C卻沒有搶到B,而B搶到了D卻沒有搶到C,從而導致程式進入死鎖狀態。 from threading import Thread, Lock, current_thread mutex1 = Lock() mutex2 = Lock() def task1(): mutex1.acquire() print('%s 搶到了鎖1' % current_thread().name) # 當搶到鎖1之後,cpu就會執行其他的程式 # 當再回來的時候卻發現鎖2被task2搶到了因此在等待task2釋放鎖2 # 但是此時task2和task1是一樣的,在等待task1釋放鎖1,此時就進入了死鎖的狀態 time.sleep(0.1) mutex2.acquire() print('%s搶到了鎖2' % current_thread().name) mutex2.release() print('%s釋放了鎖2' % current_thread().name) mutex1.release() print('%s釋放了鎖1' % current_thread().name) def task2(): mutex2.acquire() print('%s 搶到了鎖1' % current_thread().name) time.sleep(0.1) mutex1.acquire() print('%s搶到了鎖2' % current_thread().name) mutex1.release() print('%s釋放了鎖2' % current_thread().name) mutex2.release() print('%s釋放了鎖1' % current_thread().name) t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() 3.遞歸鎖 遞歸鎖所使用的是RLock函數,其原理是如果我自己需要多把鎖的時候,我就把這多把鎖設置成一個遞歸鎖,搶到一次遞歸鎖計數就加1,當其他的執行緒或者進程想使用這一把鎖的時候,會首先去查看鎖計數是否為0,如果不為零,就等待其他的進程或者執行緒來釋放鎖,這就可以解決死鎖問題了。 # mutex1 = Lock() # mutex2 = Lock() # 只需要將上面的鎖修改成遞歸鎖就可以解決上面的死鎖問題了 mutex1 = mutex2 = RLock() 4.訊號量 訊號量使用的是Semaphore類,我們可以給他傳遞一個值來代表一次可以同時運行幾個執行緒或者是幾個進程,類似於一種池的概念,在某種意義上它是不能夠保證數據的安全性。只是說在某些沒有數據交互的場景下我們可以控制其每次進程或者執行緒的數量。 from threading import Thread, Semaphore, current_thread import time import random s = Semaphore(5) x = 30 def go_wc(): global x s.acquire() temp = x time.sleep(random.randint(1, 3)) x = temp – 1 s.release() t_l = [] for i in range(30): t = Thread(target=go_wc) t_l.append(t) t.start() for i in t_l: i.join() print(x) 5.GIL全局解釋器鎖
    3. python程式執行的三個步驟 (1). 開闢一塊記憶體空間,啟動Python解釋器 (2). 載入python程式到記憶體中 (3). 將記憶體中的程式傳遞給python解釋器一步一步執行

    問題:為什麼多個執行緒不能同時使用一個python解釋器呢? 這是因為在Python中有一種垃圾回收機制,當一個value的引用計數為0之後,就會被python的垃圾回收機制所清空掉。但是python的垃圾回收機制其實也是通過一個執行緒來執行的,如果可以同時調用解釋器,這就會出現這樣一個問題:如果我賦值了一個操作a = [1, 2, 3]的時候,當我這個執行緒還沒有執行這個操作,只是創建了一個值[1, 2, 3]的時候,突然python解釋器把垃圾回收機制的執行緒給執行了,這是垃圾回收機制就會發現這個值[1, 2, 3]當前引用計數還是0呢,就直接清掉了,但是此時我還沒有來得及給a賦值呢,這就出現了數據錯亂的問題。 # This lock is necessary mainly because CPython』s memory management is not thread-safe. # 意思是CPython的記憶體管理機制(垃圾回收機制)不是執行緒安全的,因此我們不能讓python執行緒同時去調用python解釋器。

    1. 什麼叫做全局解釋器鎖 # In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple # native threads from executing Python bytecodes at once. # GIL全局解釋器鎖是一種互斥鎖,在同一時刻,保證多個執行緒中只能有一個執行緒使用python解釋器
    2. 多執行緒是串列還是並發還是並行 多執行緒其實也是並發的,串列指的是task1完全執行完畢之後才去執行task2,如下的程式碼解析。這也是CPython的一大詬病之一,雖然說在I/O密集型的操作中並不會太多的去影響性能,但是相對於多核多執行緒並發的來說,很顯然CPython做的還不是很好。(以目前我的水品來看,覺得如果一個執行緒內可以有多個python解釋器就好了…….哈哈哈) from threading import Thread, current_thread def task1(): # 1. 列印當前資訊,遇到sleep之後,cpu會執行其他的操作,此時釋放GIL鎖 # 3. 當釋放了GIL鎖之後,task2會立馬搶到GIL鎖,然後cpu執行 print('is running ….', current_thread().name) time.sleep(1) # 6. 列印end資訊之後,子執行緒執行完畢,釋放GIL鎖 print('ending1….') def task2(): # 4. 搶到GIL鎖之後,列印當前資訊,遇到sleep之後,cpu去執行其他的操作,此時會釋放GIL鎖 # 5. 此時task1睡眠完成之後,會立馬去搶GIL鎖,然後列印end資訊 print('is running ….', current_thread().name) time.sleep(1) print('ending2….') t1 = Thread(target=task1) t2 = Thread(target=task2) # 此時會去執行task1函數 # 1. 首先我要去搶GIL鎖,當搶到鎖之後進入task1函數 t1.start() t2.start()
    3. 什麼時候開多執行緒和多進程 需要有四個任務去處理 方案一:開啟四個進程 方案二:開啟四個執行緒 單核: 無論是哪種程式都應該使用方案二,因為單核情況下無論是進程還是執行緒都需要不停的切換執行,但是在執行緒的情況下可以減少開啟進程的開銷以及節省記憶體空間的使用。 多核: 1. I/O密集型,再多的核也解決不了I/O等待的問題,應該選擇方案二 2. 計算密集型,多核意味著並行計算,應該選擇方案一 計算密集型性能測試 from multiprocessing import Process from threading import Thread import time def task1(): x = 1 for i in range(60000000): x += i def task2(): x = 1 for i in range(60000000): x += i if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() – start) # 結論 # 對於計算密集型的程式我們應該使用多進程來代替多執行緒。因為python中的多執行緒並不能利用多核實現真正的並行 # 使用多進程的結果 # 8.21526575088501 # 使用多執行緒的結果 # 12.482805252075195 I/O密集型性能測試 from threading import Thread import time def task1(): time.sleep(3) def task2(): time.sleep(3) if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() – start) # 結論 # 對於I/O密集型的程式我們應該用多執行緒去代替多進程 # 使用多進程的結果 # 3.2341346740722656 # 使用多執行緒的結果 # 3.002285242080688

三. 進程池與執行緒池

  1. 進程池和執行緒池 池就是一個容器,進程池就是用來裝進程的容器,那麼執行緒池就是用來裝執行緒池的容器,為什麼我們需要進程池和執行緒池呢?因為在大部分的情況下由於電腦硬體條件的限制我們並不能無線的開啟進程或者執行緒,儘管執行緒的開銷很小,因此我們需要用一個容器來限制能夠開啟的最大進程數和執行緒數,從而保證我們的電腦可以正常的提供服務。
  2. 進程池的簡單使用 from concurrent.futures import ProcessPoolExecutor import os import random import time def task1(x): print('%s is running..' % os.getpid()) time.sleep(random.randint(1,3)) if name == 'main': # 創建一個進程池,進程池的大小可以通過參數進行傳遞, 如果不指定,默認是cpu的核數 process_pool = ProcessPoolExecutor(4) # 當執行完submit之後,就會額外的創建出4個進程,用來執行任務 # 函數task1的參數直接在submit中輸入就可以傳參 process_pool.submit(task1, 1) time.sleep(30)

在cmd中執行輸入命令 【tasklist |findstr pyt】 會發現出現了5個python進程,其中有一個是主執行緒,另外四個就是進程池內的進程數。

  1. 執行緒池的簡單使用

執行緒池的使用和進程池是一樣的,只是導入的名稱不一樣。

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  from threading import current_thread  import os  import random  import time    def task1(x):      print('%s is running..' % x)      time.sleep(random.randint(1,3))    if __name__ == '__main__':      # 創建一個執行緒池,執行緒池的大小可以通過參數進行傳遞, 如果不指定,默認是cpu的核數 * 5      process_pool = ThreadPoolExecutor(4)        for i in range(20):          # 當第一次執行submit之後,就會額外的創建出4個執行緒等待著執行任務          # 因此當我們執行了程式碼之後就會看到一下列印出了四行內容,之後就是執行完一個任務再進行下一個任務,最多的執行緒數是5個          # 其中有一個執行緒是主執行緒          process_pool.submit(task1, i)          # time.sleep(2)

四.同步vs非同步 阻塞vs非阻塞

  1. 阻塞和非阻塞 阻塞和非阻塞描述的是程式的一種運行狀態 阻塞(阻塞態) 遇到I/0之後,程式在原地等待I/0,並釋放cpu資源 非阻塞(就緒態或者運行態): 沒有遇到I/0,或者通過某種方式即便是程式遇到I/0也不會停在原地,而是去執行其他的操作,儘可能多的使用cpu的資源。
  2. 同步調用和非同步調用 同步和非同步描述的是程式執行的方式 同步調用 提交完任務之後就在原地進行等待,直到任務執行完畢並拿到了返回值,才會去執行下一行程式碼。 非同步調用 提交完任務之後不會在原地進行等待,直接運行下一行程式碼,結果可以通過非同步回調得到。
  3. 同步調用簡單的實現過程 from concurrent.futures import ThreadPoolExecutor import time import random def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此處可以返回一個對象,對象有一個result屬性可以讓我們獲得返回值 obj = thread_pool.submit(task, i) # 因為有了result,所以此時的程式就是一個同步調用的方式,我們必須等待任務一個一個完成並且拿到返回值之後才會繼續執行循環 print(obj.result())
  4. 非同步調用簡單的實現過程 from concurrent.futures import ThreadPoolExecutor import time import random def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此處可以返回一個對象,對象有一個result屬性可以讓我們獲得返回值 # 此時是非同步調用方式,因為在所有的任務創建的時候,我們並不需要去等待結果,就可以直接去運行下一次循環 obj = thread_pool.submit(task, i) obj_l.append(obj) # 就相當於之前的的join等待執行緒結束 # 因為現在是一個執行緒池,所以我們不能單純的使用join去等待執行緒結束,我們還需要close將目前的任務先鎖定住 # 其實shutdown內部就是實現了先鎖定任務,然後等待所有任務執行完畢的過程 thread_pool.shutdown() # 當所有的任務結束之後,我們就可以通過返回的對象列表中隨便查看其中的值 print(obj_l[0].result())

五. 非同步+回調機制

案例:寫一個簡單的爬蟲案例,來詳細的分析一下非同步和回調機制

首先,我們先以多進程的方式寫一個同步的程式,用來爬取網頁上的資訊

import requests  import time  import os  import random  from concurrent.futures import ProcessPoolExecutor      def get(url):      """獲取網頁的資訊"""      print('%s get %s' % (os.getpid(), url))      response = requests.get(url)      time.sleep(0.5)  # 模擬下載時間      if response.status_code == 200:          return response.text      def parse(data):      """解析數據"""      time.sleep(0.2)  # 模擬解析時間      print('%s 解析長度為%s' % (os.getpid(), len(data)))      if __name__ == '__main__':      # 創建一個進程池,設置進程池的數量為4      pool = ProcessPoolExecutor(4)      # 這是我們需要爬取的url      urls = [          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',      ]        for url in urls:          # 每執行一個任務,都通過result去獲得相應的數據,然後通過parse去解析數據          data = pool.submit(get, url).result()          parse(data)

問題:我們發現雖然說能夠實現基本的功能,但是太慢了,每次都要等待一個任務全部完成獲得返回值之後才會去執行下面的程式碼,為了提升效率,我們考慮可以把同步的方式轉換成非同步。因此我們需要將name裡面的內容轉換成下面的樣子。

if __name__ == '__main__':      # 創建一個進程池,設置進程池的數量為4      pool = ProcessPoolExecutor(4)      # 這是我們需要爬取的url      urls = [          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',      ]        obj_l = []      for url in urls:          obj = pool.submit(get, url)          obj_l.append(obj)      # 等待進程池內的任務全部完成之後才去執行下面的程式碼      pool.shutdown()      # 此時url的內容都已經下載完成並且保存在對象obj_l列表中,我們通過parse就可以解析了      for obj in obj_l:          parse(obj.result())

問題: 雖然說效率有所提升但是依然存在一些問題

  1. 解析過程要等待所有的下載任務執行完成之後才能進行解析
  2. 解析數據是串列的,如果一個解析過程很慢,就會大大的降低整個程式的效率

基於上面的問題,我們可以將解析過程放在get函數裡面,在一個任務下載結束之後就會立馬的進行解析數據,可以解決問題1,而且對於解析數據而言都是通過下載數據相同的進程進行解析的,可以解決第二個問題。因此我們的程式碼可以修改成下面這個樣子。

import requests  import time  import os  from concurrent.futures import ProcessPoolExecutor      def get(url):      """獲取網頁的資訊"""      print('%s get %s' % (os.getpid(), url))      response = requests.get(url)      time.sleep(0.5)  # 模擬下載時間      if response.status_code == 200:          # 返回值我們也不需要,直接去調用parse解析就可以了          parse(response.text)      def parse(data):      """解析數據"""      time.sleep(0.2)      print('%s 解析長度為%s' % (os.getpid(), len(data)))      if __name__ == '__main__':      # 創建一個進程池,設置進程池的數量為4      pool = ProcessPoolExecutor(4)      # 這是我們需要爬取的url      urls = [          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',      ]        for url in urls:          pool.submit(get, url)

問題: 這樣寫的話我們會發現下載內容和解析內容的程式碼被寫在一塊了,而且還是被同一個進程執行,這就和我們之前所講的生產者和消費者模型相悖了,如果我們想讓兩個函數解耦合,我們可以在get函數中將結果返回回來,然後在主進程中接收,並執行解析函數。此時我們就需要用到回調函數了。

import requests  import time  import os  from concurrent.futures import ProcessPoolExecutor      def get(url):      """獲取網頁的資訊"""      print('%s get %s' % (os.getpid(), url))      response = requests.get(url)      time.sleep(0.5)  # 模擬下載時間      if response.status_code == 200:          # 返回值我們也不需要,直接去調用parse解析就可以了          return response.text      def parse(data):      """解析數據"""      time.sleep(0.2)      print('%s 解析長度為%s' % (os.getpid(), len(data)))      if __name__ == '__main__':      # 創建一個進程池,設置進程池的數量為4      pool = ProcessPoolExecutor(4)      # 這是我們需要爬取的url      urls = [          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',          'https://www.baidu.com',      ]        for url in urls:          # 通過submit將對象obj扔到進程池中          obj = pool.submit(get, url)          # 通過函數add_done_callback函數將傳遞的函數parse綁定到對象上obj          # 當對象obj所執行的任務一旦完成並獲得一個返回值的時候就會自動的去調用parse函數,並將對象當做參數傳遞進去          # 這種方式和get方法中直接解析數據所實現的效果都是差不多的,只是說解決了耦合的問題,從某種意義上講,有時候解析的時間還要更長一點          # 注意: parse函數是全部都是通過主進程進行調用的,這也就解釋了回調的意思,我來調用你,結果給我之後就應該就由我來解析          obj.add_done_callback(parse)

多執行緒和多進程的方式都是一樣的,唯一不同的地方就在於回調函數是哪個執行緒空閑哪個執行緒去執行回調函數

六. 執行緒queue, Event

  1. Queue和進程的隊列一樣,先進先出 # 這種隊列是先進先出 q1 = queue.Queue(3) q1.put(1) q1.put('2') q1.put([234]) # q1.put({5: 6}) # 此時會阻塞,等到取出去一個之後才能添加到隊列中 print(q1.get()) # 結果1
  2. LifoQueue 堆棧,先進後出 # 先進後出,堆棧 q1 = queue.LifoQueue(3) q1.put(1) q1.put('2') q1.put([234]) # q1.put({5: 6}) # 此時會阻塞,等到取出去一個之後才能添加到隊列中 print(q1.get()) # 結果是[234]
  3. PriorityQueue 優先順序隊列,數字越小優先順序越大 # 優先順序隊列,傳入的參數是一個元組,第一個值為優先順序,必須是int,第二個是要壓入隊列中的值 # 優先順序越小越優先,如果優先順序相等,比較後面的值,越小越優先 q1 = queue.PriorityQueue(3) q1.put((1, '123')) q1.put((1, '456')) q1.put((2, '789')) # q1.put((2, {5: 6})) # 此時會阻塞,等到取出去一個之後才能添加到隊列中 print(q1.get()) # 結果(1, '123')
  4. Event 用來進程之間協同工作的 簡單的event的使用,用來提前連接伺服器判斷伺服器是否可連 from threading import Thread, Event, current_thread import time # 創建一個event對象 event = Event() def check(): """首先啟動一個執行緒嘗試連接請求""" print('%s 嘗試伺服器是否可以連接' % current_thread().name) time.sleep(3) # 模擬連接請求持續了秒 event.set() # 當連接請求成功之後設置事件 def connection(): """當check檢測通過之後再開始連接""" print('%s 嘗試連接' % current_thread().name) event.wait() # 當event.set之後才會執行下面的程式碼,否則阻塞 print('%s 連接成功' % current_thread().name) t1 = Thread(target=connection) t2 = Thread(target=connection) t3 = Thread(target=connection) t4 = Thread(target=check) t1.start() t2.start() t3.start() t4.start() 可以使用is_set功能模擬嘗試三次連接之後斷開連接的操作,將connection函數改成下面這個樣子 def connection(): """當check檢測通過之後再開始連接""" # 如果沒有設置,也就是說check還沒有來連接上伺服器,就一直嘗試連接 count = 1 while not event.is_set(): if count > 3: print('您嘗試連接的次數過多,請稍後重試') return time.sleep(0.8) print('%s 嘗試連接' % current_thread().name) count += 1 # event.wait() # 當event.set之後才會執行下面的程式碼,否則阻塞 print('%s 連接成功' % current_thread().name)