python的多執行緒和多進程(一)
- 2019 年 11 月 3 日
- 筆記
在進入主題之前,我們先學習一下並發和並行的概念:
–並發:在作業系統中,並發是指一個時間段中有幾個程式都處於啟動到運行完畢之間,且這幾個程式都是在同一個處理機上運行。但任一時刻點上只有一個程式在處理機上運行。形象的點描述:一個人做很多事情,但同一時刻只能做一件事情。
–並行:當系統有一個CPU時,則程式的操作有可能非並發。當一個CPU執行一個程式時,另一個CPU可以執行另一個程式,兩個程式互不搶佔CPU資源,可以同時進行。形象的描述:多人同時做多件事情
可能有小夥伴又會問串列又是什麼鬼呢?再來嘮叨一下:
串列和並行是數據傳輸方式的區別。其實和並發和並行的概念是兩個領域:
1、數據傳輸方式不同,串列口傳輸方式為數據排成一行,一位一位送出,接受也是一樣。並行口傳輸8位數據一次送出。
2、針腳不同,串列口針腳少,並行口針腳多
3、用途不同,串列口現在只用作控制介面,並行口多用作印表機、掃描儀等介面。
來吧,進入正題:
進程的概念:電腦程式其實只是存儲在磁碟上可執行的二進位(或其他可執行的類型)文件。只有把它們載入到記憶體中並被作業系統調用,才算是被執行,並用擁有生命周期。所以說進程是一個執行中的程式。
每個進程都擁有自己的地址空間,記憶體,數據棧以及其他用於跟蹤執行的輔助數據。
此間,作業系統管理其上的所有進程的執行,並為這些進程合理分配時間。
多進程:那就是在一個多核的電腦上同時運行多個程唄,其實是實現並行。
但是呢,我們先從多執行緒學習起。多執行緒搞定了,多進程也就是小意思了。
在此之前先介紹一個概念,之後會用到。GIL鎖,有可能大家會聽到別人說python運行很慢啥的,其實就是這個GIL鎖的原因,那它到底是個啥呢:
GIL全局解釋性鎖,python在設計的時候,還沒有多核處理器的概念。因此,為了設計的方便和執行緒的安全,python之父就設計了一個鎖。這個鎖要求,任何一個進程中,同時只能有一個執行緒在執行。因此並不能多個執行緒分配多個CPU資源。所以python中的執行緒只能實現並發,而不能實現真正的並行,這就是python所執行緒的局限性。在python3中GIL鎖進行了該井,在遇到任何IO阻塞(不是耗時)的時候,會自動切換執行緒,這就使得在進行多IO操作的時候python的鎖多執行緒有很大的優勢。同時一個執行緒在運行時間或者運行步驟達到一定的閾值時,也會自動的切換執行緒。
對比於多進程,多執行緒其實是實現並發的操作,在在python中使用multiprocessing包實現多進程:
首先來看下多執行緒的實現,這裡使用的是threading包。用sleep模擬耗時操作:
1 import time 2 import threading 3 4 def get_detail_html(url): 5 print("starting get detail html") 6 time.sleep(4) 7 print("ending get detail html") 8 9 def get_detail_url(url): 10 print("starting get detail url") 11 time.sleep(2) 12 print("ending get detail url") 13 14 if __name__ == '__main__': 15 thread1 = threading.Thread(target=get_detail_html, args=("",)) 16 thread2 = threading.Thread(target=get_detail_url, args=("",)) 17 start_time = time.time() 18 thread1.setDaemon(True) 19 thread2.setDaemon(True) 20 thread1.start() 21 thread2.start() 22 print(123456) 23 thread1.join() 24 thread2.join()
25 print("using time: {}".format(time.time()-start_time)
做個注釋吧,可能有點亂:
1、在子執行緒sleep時候,主執行緒繼續往下執行,主執行緒執行完,程式此刻並沒有退出。子執行緒的程式還是會執行完。
2、為了在主執行緒執行完之後kill掉子執行緒。需要給子執行緒.setdeDaemon(True)。將子執行緒設置為守護執行緒。意思為:主執行緒結束後設置為守護執行緒的子執行緒也會義無反顧的殉情而亡當有多個子執行緒,其中部分設置守護執行緒。主執行緒會等到沒有設置為守護執行緒的子執行緒運行完之後推出而設置為守護執行緒的子執行緒,如果在未設置為守護執行緒的子執行緒結束前運行完畢,那萬事大吉,如果沒有運行完成還是跟著主執行緒殉情
3、如果想讓主執行緒等待子執行緒執行完成再往下面執行的話,用thread1.join()。將主執行緒阻塞住,也就是說.join()之後的主執行緒程式碼不會執行,直到子執行緒執行完畢再執行這個時候的using time 就是4秒鐘。如果只有部分子執行緒阻塞,那麼阻塞時間就是設置了.join()的子執行緒運行的時間。然後主執行緒結束,但是還是會等到所有子執行緒運行結束後主執行緒才會退出
上邊是使用函數的方法實現多執行緒,那用類呢?可定也能實現咯:
import time
import threading
# 用類實現多執行緒 class GetDetailHtml(threading.Thread): """ 繼承threading.Thread類 """ def __init__(self, name): # 給調用父類init方法執行緒命名 super().__init__(name=name) def run(self): """ 重寫run方法 :return: """ print("starting get detail html") time.sleep(4) print("ending get detail html") class GetDetailUrl(threading.Thread): def __init__(self, name): # 給調用父類init方法執行緒命名 super().__init__(name=name) def run(self): print("starting get detail url") time.sleep(2) print("ending get detail url") if __name__ == '__main__': thread1 = GetDetailHtml('get_detail_html') thread2 = GetDetailUrl('get_detail_url') start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() print('used time: {}'.format(time.time() - start_time))
只需要重寫類的run方法即可,其他和函數方法使用一樣。
如果多個執行緒需要用到相同的數據咋辦呢?這就涉及到執行緒間的通訊問題了,接下來我們來詳細的了解一下。
方法一:利用共享變數的方式,嗯,也就是全局變數global。
import time import threading # 1、共享全局變數 使用global。 detail_url_list = [] def get_detail_html(): global detail_url_list # 使用for循環,這樣子做得話並發不高 # for url in detail_url_list: # print("starting get detail html") # time.sleep(4) # print("ending get detail html") # 使用pop操作,並開啟多個get_detail_html的子執行緒進行處理。但是執行緒是不安全的 url = detail_url_list.pop() print("starting get detail html") time.sleep(4) print("ending get detail html") def get_detail_url(): global detail_url_list print("starting get detail url") time.sleep(2) for i in range(20): detail_url_list.append(i) print("ending get detail url") if __name__ == '__main__': thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start()thread1.join() thread2.join() print("using time: {}".format(time.time() - start_time))
這個很容易理解,用起來也很方便。可以幫我們快速的解決比較簡單的問題。
方法二:利用隊列queue。。比價高級點的用法。先看實現方式。
import time import threading from queue import Queue, PriorityQueue #可以設置優先順序的queue,調整執行順序 def get_detail_html(queue): while True: url = queue.get() print("starting get detail html") time.sleep(4) print("ending get detail html") def get_detail_url(queue): while True: print("starting get detail url") time.sleep(2) for i in range(20): queue.put(i) print("ending get detail url") if __name__ == '__main__': detail_url_queue = Queue(maxsize=10) thread1 = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) thread2 = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) start_time = time.time() thread1.start() thread2.start()thread1.join() thread2.join() print("using time: {}".format(time.time() - start_time))
簡單的說就是:put往queue里放東西,get從queue里拿東西。這樣就能達到執行緒間的通訊以及執行緒安全。
既然執行緒中的數據大家都能用,會不會出現一個執行緒在修改數據,結果在還沒有修改完之後,GIL鎖已經被切換。切換後的執行緒也修改這個數據,那就會出現數據的錯亂,臟數據。針對這種情況,python中的lock就完美的解決了這個情況。
from threading import Lock, RLock #可重入鎖 #RLock使得一個執行緒中,可以連續調用多次acquire,但是的有相對應數量的release import threading total = 0 lock = Lock() """ 1、用鎖會影響性能 2、鎖會引起死鎖 1、acquire之後沒有release 2、執行緒相互等待 兩個數據被兩個執行緒分別獲取,相互等待對方的資源釋放 """ def add(lock): global total for i in range(1000000):
# 上鎖 lock.acquire() total += 1
# 釋放鎖 lock.release() def desc(lock): global total for i in range(1000000):
# 上鎖 lock.acquire() total -= 1
# 釋放鎖 lock.release()
thread1 = threading.Thread(target=add, args=(lock,)) thread2 = threading.Thread(target=desc, args=(lock,)) thread1.start() thread2.start() thread1.join() thread2.join() print(total)
針對一般情況下我們lock就可以搞定問題,那有沒有更牛逼的工具讓我們應對更加複雜的情況呢?答案當然是肯定的:
conditon:條件變數,用於複雜的執行緒間同步的鎖
import threading # condition(條件變數)是用於複雜的執行緒間同步的鎖 from threading import Condition cond = Condition() class XiaoAi(threading.Thread): def __init__(self, cond): super(XiaoAi, self).__init__(name="小愛") self.cond = cond def run(self): # with是一個魔法方法,相當於先上鎖操作完之後再解鎖,和with open相似 with self.cond: # 等待狀態,等著接受通知,接到通知後往下放執行 self.cond.wait() print("{} : 在".format(self.name)) # 發送通知 self.cond.notify() # 等待狀態,等著接受通知,接到通知後往下放執行 self.cond.wait() print("{} : 好啊".format(self.name)) class TianMao(threading.Thread): def __init__(self, cond): super(TianMao, self).__init__(name="天貓") self.cond = cond def run(self): with self.cond: print("{} : 小愛在么?".format(self.name)) # 發送通知 self.cond.notify() # 等待狀態,等著接受通知,接到通知後往下放執行 self.cond.wait() print("{} : 我們來對古詩吧!".format(self.name)) # 發送通知 self.cond.notify() if __name__ == '__main__': xiaoai = XiaoAi(cond) tianmao = TianMao(cond) xiaoai.start() tianmao.start() xiaoai.join() tianmao.join()
在condition中我們可以運行任意多的執行緒。那麼如果我們想控制執行緒的數量該怎麼辦呢?針對這個場景python也是給我們一個工具的:
semaphore 是控制進入數量的鎖, 基於condition實現
# semaphore 是控制進入數量的鎖, 基於condition實現 # 文件的讀寫,寫一般只有一個執行緒寫,讀可以允許多個 # 爬蟲,控制爬蟲的數量 import threading import time class HtmlSpider(threading.Thread): def __init__(self, url, sem): super().__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print('get html text success') # 釋放鎖, 一共釋放3次 self.sem.release() class UrlProducer(threading.Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): for i in range(20): # 調用一次,次數減一,為0時,執行緒鎖住 self.sem.acquire() html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem) html_thread.start() if __name__ == '__main__': sem = threading.Semaphore(3) url_producer = UrlProducer(sem) url_producer.start()
是不是常聽說執行緒池?那這個池子到底是個啥呢?當然是放執行緒的池子唄!!在python中是用
ThreadPoolExecutor來實現執行緒池的!!
第一種用法:
from concurrent.futures import ( wait, # 使主執行緒阻塞 ThreadPoolExecutor, # 執行緒池 as_completed, #是一個生成器 Future # 未來對象, task的返回容器 ) """ 執行緒池 主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值 當一個執行緒完成的時候主執行緒能夠立刻知道 futures可以讓多執行緒和多進程介面一致 """ import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通過submit函數提交的函數到執行緒池中,submit是立刻返回的 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done 用於判定某個任務是否完成 print(task1.done()) # cancle取消該執行緒,取消成功返回True,失敗返回False 注意:執行緒在執行過程中不能被取消 # 此時開啟兩個執行緒,所以task1和task2添加之後立刻運行,因此取消不成功 print(task2.cancel()) time.sleep(3) print(task1.done()) # 可以獲取task1的執行結果 print(task1.result()) """ # 通過executor 的map(yield task的返回值)獲取已經完成的task值 for data in executor.map(get_html, urls): print('future get {} page'.format(data)) """
第二種用法:
from concurrent.futures import ( wait, # 使主執行緒阻塞 ThreadPoolExecutor, # 執行緒池 as_completed, #是一個生成器 Future # 未來對象, task的返回容器 ) import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 獲取已經成功的task urls = [3, 3, 2, 4, 8] # 添加到excutor之後子執行緒立刻開始執行 all_tasks = [executor.submit(get_html, (url)) for url in urls] # 使主執行緒阻塞,所有子執行緒執行完成後主執行緒再往下邊執行 wait(all_tasks) print(12314123) # as_completed 的程式碼塊和子執行緒的執行並不是同步的 for future in as_completed(all_tasks): data = future.result() print('get {} page 111111'.format(data))
第三種用法:
from concurrent.futures import ( wait, # 使主執行緒阻塞 ThreadPoolExecutor, # 執行緒池 as_completed, #是一個生成器 Future # 未來對象, task的返回容器 ) import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通過executor 的map(yield task的返回值)獲取已經完成的task值 for data in executor.map(get_html, urls): print('future get {} page'.format(data))
好了多執行緒就講這麼多吧,有點粗糙,更多的細節以後再補充吧! 下一篇記錄一下多進程!!