2.並發編程多編程
- 2019 年 10 月 3 日
- 筆記
一 multiprocessing模組介紹
python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模組用來開啟子進程,並在子進程中執行我們訂製的任務(比如函數),該模組與多執行緒模組threading的編程介面類似。
multiprocessing模組的功能眾多:支援子進程、通訊和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與執行緒不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
二 Process類的介紹
創建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1 group參數未使用,值始終為None 2 3 target表示調用對象,即子進程要執行的任務 4 5 args表示調用對象的位置參數元組,args=(1,2,'egon',) 6 7 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 8 9 name為子進程的名稱
方法介紹:
1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 4 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了殭屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麼也將不會被釋放,進而導致死鎖 5 p.is_alive():如果p仍然運行,返回True 6 7 p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹:
1 p.daemon:默認值為False,如果設為True,代表p為後台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置 2 3 p.name:進程的名稱 4 5 p.pid:進程的pid 6 7 p.exitcode:進程在運行時為None、如果為–N,表示被訊號N結束(了解即可) 8 9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字元的字元串。這個鍵的用途是為涉及網路連接的底層進程間通訊提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
三 Process類的使用
注意:在windows中Process()必須放到# if name == ‘main‘:下
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 由於Windows沒有fork,多處理模組啟動一個新的Python進程並導入調用模組。 如果在導入時調用Process(),那麼這將啟動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。 詳細解釋
詳細解釋
3.1創建開啟子進程的兩種方式
from multiprocessing import Process import time def task(name): print('%s is runing' %(name)) time.sleep(3) print('%s is done' % (name)) if __name__ == '__main__': p = Process(target=task,args=('太白金星',)) # p = Process(target=task,kwargs={'name':'太白金星'}) 兩種傳參方式 p.start() print('====主')
方式一
from multiprocessing import Process import time # 方式二: class MyProcess(Process): def __init__(self,name): self.name = name super().__init__() def run(self): # 必須定義一個run方法 print('%s is runing' % (self.name)) time.sleep(3) print('%s is done' % (self.name)) if __name__ == '__main__': p = MyProcess('太白金星') p.start() print('===主')
方式二
3.2驗證進程之間的空間隔離
接下來我們驗證一下進程之間的互相隔離。 在一個進程中 x = 1000 def task(): global x x = 2 task() print(x) 在不同的進程中: from multiprocessing import Process import time x = 1000 def task(): global x x = 2 if __name__ == '__main__': p = Process(target=task) p.start() time.sleep(3) print(x)
程式碼驗證
3.3 進程對象的join方法
from multiprocessing import Process import time 父進程等待子進程結束之後在執行 方法一 加sleep 不可取! def task(n): time.sleep(3) print('子進程結束....') if __name__ == '__main__': p = Process(target=task,args=('太白金星',)) p.start() time.sleep(5) print('主進程開始運行....') 這樣雖然達到了目的, 1,但是你在程式中故意加sleep極大影響程式的效率。 2,sleep(3)只是虛擬子進程運行的時間,子進程運行完畢的時間是不固定的。 方法二: join from multiprocessing import Process import time def task(n): time.sleep(3) print('子進程結束....') if __name__ == '__main__': p = Process(target=task,args=('太白金星',)) p.start() p.join() # 等待p這個子進程運行結束之後,在執行下面的程式碼(主進程). print('主進程開始運行....') 接下來我要開啟十個子進程,先看看效果 from multiprocessing import Process import time def task(n): print('%s is running' %n) if __name__ == '__main__': for i in range(1, 11): p = Process(target=task,args=(i,)) p.start() ''' 我這裡是不是運行十個子進程之後,才會運行主進程?當然不會!!! 1,p.start()只是向作業系統發送一個請求而已,剩下的作業系統在記憶體開啟進程空間,運行進程程式不一定是馬上執行。 2,開啟進程的開銷是比較大的。 ''' print('主進程開始運行....') 那麼有人說,老師我對這個不理解,我給你拆解開來。 from multiprocessing import Process import time def task(n): print('%s is running' %n) if __name__ == '__main__': p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) p4 = Process(target=task,args=(4,)) p5 = Process(target=task,args=(5,)) p1.start() p2.start() p3.start() p4.start() p5.start() print('主進程開始運行....') 接下來 實現起多子個進程,然後等待這些子進程都結束之後,在開啟主進程。 from multiprocessing import Process import time def task(n): time.sleep(3) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # 幾乎同一個時刻發送三個請求 p1.start() p2.start() p3.start() # 對著三個自己成使用三個join p1.join() p2.join() p3.join() print(time.time() - start_time,'主進程開始運行....') # 3s 多一點點這是來回切換的所用時間。 那麼在進行舉例: from multiprocessing import Process import time def task(n): time.sleep(n) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # 幾乎同一個時刻發送三個請求 p1.start() p2.start() p3.start() # 對著三個自己成使用三個join p1.join() # 1s p2.join() # 2s p3.join() # 3s print(time.time() - start_time,'主進程開始運行....') # 3s 多一點點這是來回切換的所用時間。 利用for循環精簡上面的示例: from multiprocessing import Process import time def task(n): time.sleep(1) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() # for i in range(1,4): # p = Process(target=task,args=(i,)) # p.start() # p.join() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # 幾乎同一個時刻發送三個請求 p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() # 上面的程式碼,p1.join()他的作用:你的主進程程式碼必須等我的p1子進程執行完畢之後,在執行 # p2.start()這個命令是主進程的程式碼。 # 而 如果你這樣寫: ''' p1.join() p2.join() p3.join() ''' print(time.time() - start_time,'主進程開始運行....') 所以你上面的程式碼應該怎麼寫? from multiprocessing import Process import time def task(n): time.sleep(3) print('%s is running' %n) if __name__ == '__main__': p_l = [] start_time = time.time() for i in range(1,4): p = Process(target=task,args=(i,)) p.start() p_l.append(p) # 對著三個自己成使用三個join for i in p_l: i.join() print(time.time() - start_time,'主進程開始運行....')
程式碼實例
3.4 進程對象的其他屬性(了解)
from multiprocessing import Process import time import os def task(n): time.sleep(3) print('%s is running' %n,os.getpid(),os.getppid()) if __name__ == '__main__': p1 = Process(target=task,args=(1,),name = '任務1') # print(p1.name) # 給子進程起名字 # for i in range(3): # p = Process(target=task, args=(1,)) # print(p.name) # 給子進程起名字 p1.start() # p1.terminate() # time.sleep(2) # 睡一會,他就將我的子進程殺死了。 # print(p1.is_alive()) # False print(p1.pid) # print('主') print(os.getpid())
程式碼示例
3.5 殭屍進程與孤兒進程
參考部落格:http://www.cnblogs.com/Anker/p/3271773.html 一:殭屍進程(有害) 殭屍進程:一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態資訊,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。詳解如下 我們知道在unix/linux中,正常情況下子進程是通過父進程創建的,子進程在創建新的進程。子進程的結束和父進程的運行是一個非同步過程,即父進程永遠無法預測子進程到底什麼時候結束,如果子進程一結束就立刻回收其全部資源,那麼在父進程內將無法獲取子進程的狀態資訊。 因此,UNⅨ提供了一種機制可以保證父進程可以在任意時刻獲取子進程結束時的狀態資訊: 1、在每個進程退出的時候,內核釋放該進程所有的資源,包括打開的文件,佔用的記憶體等。但是仍然為其保留一定的資訊(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等) 2、直到父進程通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果進程不調用wait / waitpid的話,那麼保留的那段資訊就不會釋放,其進程號就會一直被佔用,但是系統所能使用的進程號是有限的,如果大量的產生僵死進程,將因為沒有可用的進程號而導致系統不能產生新的進程. 此即為殭屍進程的危害,應當避免。 任何一個子進程(init除外)在exit()之後,並非馬上就消失掉,而是留下一個稱為殭屍進程(Zombie)的數據結構,等待父進程處理。這是每個子進程在結束時都要經過的階段。如果子進程在exit()之後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。如果父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不經過殭屍狀態。 如果父進程在子進程結束之前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。 二:孤兒進程(無害) 孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,並由init進程對它們完成狀態收集工作。 孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工作。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置為init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程凄涼地結束了其生命周期的時候,init進程就會代表黨和政府出面處理它的一切善後工作。因此孤兒進程並不會有什麼危害。 我們來測試一下(創建完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成為孤兒進程,而非殭屍進程),文件內容 import os import sys import time pid = os.getpid() ppid = os.getppid() print 'im father', 'pid', pid, 'ppid', ppid pid = os.fork() #執行pid=os.fork()則會生成一個子進程 #返回值pid有兩種值: # 如果返回的pid值為0,表示在子進程當中 # 如果返回的pid值>0,表示在父進程當中 if pid > 0: print 'father died..' sys.exit(0) # 保證主執行緒退出完畢 time.sleep(1) print 'im child', os.getpid(), os.getppid() 執行文件,輸出結果: im father pid 32515 ppid 32015 father died.. im child 32516 1 看,子進程已經被pid為1的init進程接收了,所以殭屍進程在這種情況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明周期結束自然會被init來銷毀。 三:殭屍進程危害場景: 例如有個進程,它定期的產 生一個子進程,這個子進程需要做的事情很少,做完它該做的事情之後就退出了,因此這個子進程的生命周期很短,但是,父進程只管生成新的子進程,至於子進程 退出之後的事情,則一概不聞不問,這樣,系統運行上一段時間之後,系統中就會存在很多的僵死進程,倘若用ps命令查看的話,就會看到很多狀態為Z的進程。 嚴格地來說,僵死進程並不是問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。因此,當我們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL訊號啦)。槍斃了元兇進程之後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。 四:測試 #1、產生殭屍進程的程式test.py內容如下 #coding:utf-8 from multiprocessing import Process import time,os def run(): print('子',os.getpid()) if __name__ == '__main__': p=Process(target=run) p.start() print('主',os.getpid()) time.sleep(1000) #2、在unix或linux系統上執行 [root@vm172-31-0-19 ~]# python3 test.py & [1] 18652 [root@vm172-31-0-19 ~]# 主 18652 子 18653 [root@vm172-31-0-19 ~]# ps aux |grep Z USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出現殭屍進程 root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z [root@vm172-31-0-19 ~]# top #執行top命令發現1zombie top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12 Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie %Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin #3、 等待父進程正常結束後會調用wait/waitpid去回收殭屍進程 但如果父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的 解決方法一:殺死父進程 解決方法二:對開啟的子進程應該記得使用join,join會回收殭屍進程 參考python2源碼注釋 class Process(object): def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self) join方法中調用了wait,告訴系統釋放殭屍進程。discard為從自己的children中剔除 解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
詳細了解
四 守護進程
主進程創建守護進程
其一:守護進程會在主進程程式碼執行結束後就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程程式碼運行結束,守護進程隨即終止
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程程式碼執行結束,p即終止運行 p.start() print('主')
程式碼示例
#主進程程式碼運行完畢,守護進程就會結束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #列印該行則主進程程式碼結束,則守護進程p1應該被終止,可能會有p1任務執行的列印資訊123,因為主進程列印main----時,p1也執行了,但是隨即被終止
經典例題
五 進程同步(鎖)
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個列印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
#並發運行,效率高,但競爭同一列印終端,帶來了列印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start() 並發運行,效率高,但競爭同一列印終端,帶來了列印錯亂
不加鎖,效率高但是順序容易錯亂
#由並發變成了串列,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start() 加鎖:由並發變成了串列,犧牲了運行效率,但避免了競爭
加鎖處理,犧牲了效率,但是保證了順序
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串列了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性。
#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('