Python第十二章-多進程和多線程01-多進程
- 2020 年 4 月 5 日
- 筆記
多進程和多線程
一、進程
1.1 進程的引入
現實生活中,有很多的場景中的事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;試想,如果把唱歌和跳舞這2件事情分開依次完成的話,估計就沒有那麼好的效果了(想一下場景:先唱歌,然後在跳舞,O(∩_∩)O哈哈~)
程序中
如下程序,來模擬「唱歌跳舞」這件事情
# 模擬唱歌,跳舞 from time import sleep def sing(): for i in range(3): print("正在唱歌...%d"%i) sleep(1) def dance(): for i in range(3): print("正在跳舞...%d"%i) sleep(1) if __name__ == '__main__': sing() # 唱歌 dance() # 跳舞
運行結果
注意
- 很顯然剛剛的程序並沒有完成唱歌和跳舞同時進行的要求
- 如果想要實現「唱歌跳舞」同時進行,那麼就需要一個新的方法,叫做:多任務
1.2 多任務的概念
什麼叫「多任務」呢?簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務,至少同時有3個任務正在運行。還有很多任務悄悄地在後台同時運行着,只是桌面上沒有顯示而已。
現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?
答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。
其實就是CPU執行速度太快啦。。
1.2.1 進程
每個獨立執行的程序稱為進程
進程是程序的一次動態執行過程,它經歷了從代碼加載、執行到執行完畢的一個完整過程,這個過程也是進程本身從產生、發展到最終消亡的過程。
多進程(多任務)操作系統能同時運行多個進程(程序),由於CPU具備分時機制,所以每個進程都能循環獲得自己的CPU時間片。由於CPU執行速度非常快,使得所有程序好象是在「同時」運行一樣。
在操作系統中進程是進行系統資源分配、調度和管理的最小單位,進程在執行過程中擁有獨立的內存單元。
比如:Windows採用進程作為最小隔離單位,每個進程都有自己的數據段、代碼段,並且與別的進程沒有任何關係。因此進程間進行信息交互比較麻煩。
進程也可以通過派生 (fork 或 spawn)新的進程來執行其他任務,不過因為每個新進程也都擁有自己的內存和數據棧等,所以只能採用進程間通信(IPC)的方式共享信息。
1.2.2 線程
為了解決進程調度資源的浪費,為了能夠共享資源,出現了線程。有時候把線程稱之為輕量級進程.
線程是CPU調度和分派的基本單位,它可與同屬一個進程的其他的線程共享進程所擁有的全部資源,多個線程共享內存,從而極大地提高了程序的運行效率。
線程是比進程更小的執行單位,線程是進程內部單一的一個順序控制流。
所謂多線程是指一個進程在執行過程中可以產生多個線程,這些線程可以同時存在、同時運行,形成多條執行線索。一個進程可能包含了多個同時執行的線程。
一個或更多的線程構成了一個進程(操作系統是以進程為單位的,而進程是以線程為單位的,進程中必須有一個主線程main)
如果一個進程沒有了,那麼這個進程內的所有線程肯定會消失,如果線程消失了,但是進程未必會消失。只有所有的線程都結束了,進程才會結束!!!而且所有線程都是在進程的基礎之上同時運行。
1.3 Python 和並發編程
在大多數系統上, Python支持多進程(基於消息傳遞)編程和多線程編程.
大多數人比較熟悉的是多線程編程, 但是在python中的多線程編程卻是有諸多的限制.
python中多線程的限制
為了線程安全考慮, python的解釋器還是使用了內部的GIL(Global Interperter Lock, 全局解釋器鎖定), 在任意時刻只運行單個python的線程執行.即使有多個可用的cpu核心, 也是如此.這就限制了python只能在一個cpu核心上運行.
GIL的存在直接影響了程序的並發編程問題.
如果一個應用程序是大部分與I/O相關, 那麼使用線程一般沒有問題, 因為大部分時間是在I/O等待.
如果一個應用程序是CPU密集型的, 則使用多線程的壞處大於好處, 返回會降低程序的運行速度, 一般比你想像的還要慢的多.
因此, 用戶在有些情況需要使用多進程(子進程和消息傳遞)
子進程和消息傳遞
展望未來, 如果要再python中進行各種類型的並發編程, 消息傳遞應該是最應該掌握的概念.
1.4 multiprocessing包
multiprocessing是一個package, 這個包支持使用類似threading模塊的類似API去創建新的進程.
multiprocessing支持本地和遠程並發編程, 通過使用子進程來代替線程高效的規避了GIL問題.
所以, multiprocessing允許程序員重複利用給定計算機的多核cpu.
由於python的跨平台, 所以multiprocessing支持多個平台:unix, window, linux.
1.4.1 Process類
Process語法結構如下:
Process([group [, target [, name [, args [, kwargs]]]]])
- target:表示這個進程實例所調用對象;
- args:表示調用對象的位置參數元組;
- kwargs:表示調用對象的關鍵字參數字典;
- name:為當前進程實例的別名;
- group:大多數情況下用不到;
最簡單的使用代碼:
# 從multiprocessing中導入Process from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('子進程運行中,hello,= %s ,pid=%d...' % (name, os.getpid())) if __name__ == '__main__': # 判斷是否為主程序 print('父進程 %d.' % os.getpid()) """ 創建Process對象, 表示一個子進程. 1. target參數表示子進程要做的任務(一個可執行對象) 2. args是一個元組, 表示傳遞給target的可執行對象的位置參數. 本例中就是把"王二狗"傳遞給函數f的name參數 """ p = Process(target=run_proc, args=('王二狗',)) print('子進程將要執行。。') p.start() # 啟動子進程 p.join() # 等待進程終止 print("子進程已經終止")
說明
- 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動。
- join()方法可以等待子進程結束後再繼續往下運行,通常用於進程間的同步。
1.4.1.1 Process類的實例具有以下方法
Process實例p具有以下方法:
-
p.start()
啟動子進程. 這將運行代表進程的子進程, 並調用該子進程中的p.run()方法.
-
p.join([timeout])
等待進程p終止, timeout是可選的超時時間. 這個方法通常用戶進程間的同步.
-
p.is_alive()
測試進程p是否還在運行, 如果扔在運行, 則返回True
-
run():
如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法;
-
p.terminate()
強制終止p進程. 如果調用此方法, 進程p將立即被終止, 同時不會進行任何清理工作. 如果再進程p中也開啟了子進程, 則這些子進程將成為僵死進程.s如果p保存了一個鎖定或有進程間通信, 那麼終止可能會導致死鎖或I/O崩潰.
1.4.1.2 Process實例具有以下實例屬性:
Process實例p具有以下實例屬性:
-
p.daemon
一個布爾標誌, 指示這個進程是否為後台進程. 當創建他的python進程終止時, 後台進程將自動終止.
另外禁止後台進程創建自己的新進程. p.daemon的值必須再進程啟動前設置.
-
p.exitcode
進程的整數退出碼. 如果進程仍在運行, 則它的值是None. 如果是負數, -N表示由信號N所終止
-
p.name
當前進程實例別名,默認為Process-N,N為從1開始遞增的整數;
-
p.pid
進程的整數ID
1.4.1.3 實例
實例1:
from multiprocessing import Process import os from time import sleep # 子進程要執行的代碼 def run_proc(name, age, **kwargs): for i in range(10): print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid())) print(kwargs) sleep(0.5) if __name__=='__main__': print('父進程 %d.' % os.getpid()) p = Process(target=run_proc, args=('test',18), kwargs={"m":20}) print('子進程將要執行') p.start() sleep(1) p.terminate() p.join() print('子進程已結束')
運行結果:
實例2:
from multiprocessing import Process import time import os # 兩個子進程將會調用的兩個方法 def worker_1(interval): print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(), os.getpid())) t_start = time.time() time.sleep(interval) # 程序將會被掛起interval秒 t_end = time.time() print("worker_1,執行時間為'%0.2f'秒" % (t_end - t_start)) def worker_2(interval): print("worker_2,父進程(%s),當前進程(%s)" % (os.getppid(), os.getpid())) t_start = time.time() time.sleep(interval) t_end = time.time() print("worker_2,執行時間為'%0.2f'秒" % (t_end - t_start)) if __name__ == '__main__': # 判斷是否為主程序 # 輸出當前程序的ID print("進程ID:%s" % os.getpid()) """ 創建兩個進程對象,target指向這個進程對象要執行的對象名稱, args後面的元組中,是要傳遞給worker_1方法的參數, 因為worker_1方法就一個interval參數,這裡傳遞一個整數2給它, 如果不指定name參數,默認的進程對象名稱為Process-N,N為一個遞增的整數 """ p1=Process(target=worker_1, args=(2,)) p2=Process(target=worker_2, name="王二狗", args=(1,)) # 使用"進程對象名稱.start()"來創建並執行一個子進程, # 這兩個進程對象在start後,就會分別去執行worker_1和worker_2方法中的內容 p1.start() p2.start() # 同時父進程仍然往下執行,如果p2進程還在執行,將會返回True print("p2.is_alive=%s " % p2.is_alive()) # 輸出p1和p2進程的別名和pid print("p1.name=%s" % p1.name) print("p1.pid=%s" % p1.pid) print("p2.name=%s" % p2.name) print("p2.pid=%s" % p2.pid) """ join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成後,再繼續執行下面的語句,一般用於進程間的數據同步 如果不寫這一句,下面的is_alive判斷將會是True, 改成p1.join(1), 因為p2需要2秒以上才可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成,所以下面的print會輸出True,即p1仍然在執行 """ p1.join() print("p1.is_alive=%s" % p1.is_alive())
運行結果:
1.4.1.4 進程的創建-Process子類
創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象
示例代碼:
from multiprocessing import Process import time import os # 繼承Process類 class ProcessClass(Process): """ 因為Process類本身也有__init__方法,這個子類相當於重寫了這個方法, 但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性, 最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作 """ def __init__(self,interval): Process.__init__(self) self.interval = interval # 重寫了Process類的run()方法 def run(self): print("子進程(%s) 開始執行,父進程為(%s)" % (os.getpid(), os.getppid())) t_start = time.time() time.sleep(self.interval) t_stop = time.time() print("(%s)執行結束,耗時%0.2f秒"%(os.getpid(), t_stop-t_start)) if __name__ == "__main__": t_start = time.time() print("當前程序進程(%s)"%os.getpid()) p1 = ProcessClass(2) # 對一個不包含target屬性的Process類執行start()方法,就會運行這個類中的run()方法,所以這裡會執行p1.run() p1.start() p1.join() t_stop = time.time() print("(%s)執行結束,耗時%0.2f"%(os.getpid(),t_stop-t_start))
運行結果:
1.4.2 進程池:Pool
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。使用類Pool可以創建進程池, 然後把各種數據處理任務都提交給進程池.
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來執行
Pool([numprocess, initializer, initargs])
說明:
numprocess 是指要創建的線程數. 默認是cpu的核心數.(os.cpu_count()的返回值)
initializer 是每個進程啟動時要執行的可調用對象, 默認是None
initargs是傳遞給initializer的元組參數.
1.4.2.1 multiprocessing.Pool常用函數解析:
-
apply(func[, args[, kwds]]):使用阻塞方式調用func
在進程池的一個工作進程中執行func函數, args是傳給func的元組參數. 注意使用這個方法讓多個進程去執行, 他們是同步執行的. 即:多個進程是順序執行的.
func的返回值就是p.apply的返回值.
-
apply_async(func[, args, kwargs, callback]) :使用非阻塞方式調用func
(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給異步的執行func
callback 是可調用對象, 當func執行結束, 則立即調用callback並把func的返回值傳遞給callback.
-
func的關鍵字參數列表;
-
close():關閉Pool,使其不再接受新的任務;
-
terminate():不管任務是否完成,立即終止;
-
join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;
AsyncResult對象(apply_async()的返回值)
apply_async()的返回值是AsyncResult實例. 具有如下方法:
- a.get([timeout])
等待返回結果, 結果就是任務函數的返回值.
- a.ready()
如果任務函數執行結束返回True
- a.successful()
如果任務函數執行結束, 且在執行的過程中沒有發生異常則
- a.wait([timeout])
等待任務結束, 這個方法與get()的區別就是它沒有返回值.