2.並發編程多編程

  • 2019 年 10 月 3 日
  • 筆記

一 multiprocessing模組介紹

​ python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
​ multiprocessing模組用來開啟子進程,並在子進程中執行我們訂製的任務(比如函數),該模組與多執行緒模組threading的編程介面類似。

  multiprocessing模組的功能眾多:支援子進程、通訊和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

​ 需要再次強調的一點是:與執行緒不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

二 Process類的介紹

創建進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)    強調:  1. 需要使用關鍵字的方式來指定參數  2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:

1 group參數未使用,值始終為None  2  3 target表示調用對象,即子進程要執行的任務  4  5 args表示調用對象的位置參數元組,args=(1,2,'egon',)  6  7 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}  8  9 name為子進程的名稱

  方法介紹:

 1 p.start():啟動進程,並調用該子進程中的p.run()   2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法   3   4 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了殭屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麼也將不會被釋放,進而導致死鎖   5 p.is_alive():如果p仍然運行,返回True   6   7 p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  

屬性介紹:

1 p.daemon:默認值為False,如果設為True,代表p為後台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置  2  3 p.name:進程的名稱  4  5 p.pid:進程的pid  6  7 p.exitcode:進程在運行時為None、如果為–N,表示被訊號N結束(了解即可)  8  9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字元的字元串。這個鍵的用途是為涉及網路連接的底層進程間通訊提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

三 Process類的使用

注意:在windows中Process()必須放到# if name == ‘main‘:下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module.  If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources).  This is the reason for hiding calls to Process() inside    if __name__ == "__main__"  since statements inside this if-statement will not get called upon import.  由於Windows沒有fork,多處理模組啟動一個新的Python進程並導入調用模組。  如果在導入時調用Process(),那麼這將啟動無限繼承的新進程(或直到機器耗盡資源)。  這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。    詳細解釋

詳細解釋

3.1創建開啟子進程的兩種方式

from multiprocessing import Process  import time  def task(name):      print('%s is runing' %(name))      time.sleep(3)      print('%s is done' % (name))      if __name__ == '__main__':      p = Process(target=task,args=('太白金星',))      # p = Process(target=task,kwargs={'name':'太白金星'})  兩種傳參方式      p.start()      print('====主')

方式一

from multiprocessing import Process  import time  # 方式二:    class MyProcess(Process):      def __init__(self,name):            self.name = name          super().__init__()        def run(self):  # 必須定義一個run方法          print('%s is runing' % (self.name))          time.sleep(3)          print('%s is done' % (self.name))      if __name__ == '__main__':      p = MyProcess('太白金星')      p.start()      print('===主')      

方式二

3.2驗證進程之間的空間隔離

 接下來我們驗證一下進程之間的互相隔離。     在一個進程中  x = 1000     def task():       global x       x = 2     task()   print(x)   在不同的進程中:   from multiprocessing import Process   import time   x = 1000     def task():       global x       x = 2     if __name__ == '__main__':       p = Process(target=task)       p.start()       time.sleep(3)       print(x)

程式碼驗證

3.3 進程對象的join方法

from multiprocessing import Process  import time     父進程等待子進程結束之後在執行   方法一 加sleep 不可取!     def task(n):       time.sleep(3)       print('子進程結束....')     if __name__ == '__main__':       p = Process(target=task,args=('太白金星',))       p.start()       time.sleep(5)       print('主進程開始運行....')     這樣雖然達到了目的,   1,但是你在程式中故意加sleep極大影響程式的效率。   2,sleep(3)只是虛擬子進程運行的時間,子進程運行完畢的時間是不固定的。       方法二: join     from multiprocessing import Process   import time       def task(n):       time.sleep(3)       print('子進程結束....')       if __name__ == '__main__':       p = Process(target=task,args=('太白金星',))       p.start()       p.join()  # 等待p這個子進程運行結束之後,在執行下面的程式碼(主進程).       print('主進程開始運行....')     接下來我要開啟十個子進程,先看看效果     from multiprocessing import Process   import time     def task(n):       print('%s is running' %n)     if __name__ == '__main__':       for i in range(1, 11):           p = Process(target=task,args=(i,))           p.start()           '''           我這裡是不是運行十個子進程之後,才會運行主進程?當然不會!!!           1,p.start()只是向作業系統發送一個請求而已,剩下的作業系統在記憶體開啟進程空間,運行進程程式不一定是馬上執行。           2,開啟進程的開銷是比較大的。           '''       print('主進程開始運行....')     那麼有人說,老師我對這個不理解,我給你拆解開來。   from multiprocessing import Process   import time     def task(n):       print('%s is running' %n)     if __name__ == '__main__':       p1 = Process(target=task,args=(1,))       p2 = Process(target=task,args=(2,))       p3 = Process(target=task,args=(3,))       p4 = Process(target=task,args=(4,))       p5 = Process(target=task,args=(5,))         p1.start()       p2.start()       p3.start()       p4.start()       p5.start()         print('主進程開始運行....')     接下來 實現起多子個進程,然後等待這些子進程都結束之後,在開啟主進程。     from multiprocessing import Process   import time     def task(n):       time.sleep(3)       print('%s is running' %n)     if __name__ == '__main__':       start_time = time.time()       p1 = Process(target=task,args=(1,))       p2 = Process(target=task,args=(2,))       p3 = Process(target=task,args=(3,))       # 幾乎同一個時刻發送三個請求       p1.start()       p2.start()       p3.start()       # 對著三個自己成使用三個join         p1.join()       p2.join()       p3.join()         print(time.time() - start_time,'主進程開始運行....')       # 3s 多一點點這是來回切換的所用時間。     那麼在進行舉例:     from multiprocessing import Process   import time     def task(n):       time.sleep(n)       print('%s is running' %n)     if __name__ == '__main__':       start_time = time.time()       p1 = Process(target=task,args=(1,))       p2 = Process(target=task,args=(2,))       p3 = Process(target=task,args=(3,))       # 幾乎同一個時刻發送三個請求       p1.start()       p2.start()       p3.start()       # 對著三個自己成使用三個join         p1.join()  # 1s       p2.join()  # 2s       p3.join()  # 3s         print(time.time() - start_time,'主進程開始運行....')      # 3s 多一點點這是來回切換的所用時間。     利用for循環精簡上面的示例:       from multiprocessing import Process   import time     def task(n):       time.sleep(1)       print('%s is running' %n)     if __name__ == '__main__':       start_time = time.time()       # for i in range(1,4):       #     p = Process(target=task,args=(i,))       #     p.start()       #     p.join()         p1 = Process(target=task,args=(1,))       p2 = Process(target=task,args=(2,))       p3 = Process(target=task,args=(3,))       # 幾乎同一個時刻發送三個請求       p1.start()       p1.join()       p2.start()       p2.join()       p3.start()       p3.join()       # 上面的程式碼,p1.join()他的作用:你的主進程程式碼必須等我的p1子進程執行完畢之後,在執行       # p2.start()這個命令是主進程的程式碼。       # 而 如果你這樣寫:       '''       p1.join()       p2.join()       p3.join()       '''         print(time.time() - start_time,'主進程開始運行....')     所以你上面的程式碼應該怎麼寫?       from multiprocessing import Process   import time     def task(n):       time.sleep(3)       print('%s is running' %n)     if __name__ == '__main__':       p_l = []       start_time = time.time()       for i in range(1,4):           p = Process(target=task,args=(i,))           p.start()           p_l.append(p)       # 對著三個自己成使用三個join       for i in p_l:           i.join()       print(time.time() - start_time,'主進程開始運行....')

程式碼實例

3.4 進程對象的其他屬性(了解)

 from multiprocessing import Process   import time   import os     def task(n):       time.sleep(3)       print('%s is running' %n,os.getpid(),os.getppid())     if __name__ == '__main__':       p1 = Process(target=task,args=(1,),name = '任務1')       # print(p1.name) # 給子進程起名字       # for i in range(3):       #     p = Process(target=task, args=(1,))       #     print(p.name)  # 給子進程起名字       p1.start()       # p1.terminate()       # time.sleep(2)  # 睡一會,他就將我的子進程殺死了。       # print(p1.is_alive())  # False       print(p1.pid)       # print('主')       print(os.getpid())

程式碼示例

3.5 殭屍進程與孤兒進程

參考部落格:http://www.cnblogs.com/Anker/p/3271773.html    一:殭屍進程(有害)    殭屍進程:一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態資訊,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。詳解如下    我們知道在unix/linux中,正常情況下子進程是通過父進程創建的,子進程在創建新的進程。子進程的結束和父進程的運行是一個非同步過程,即父進程永遠無法預測子進程到底什麼時候結束,如果子進程一結束就立刻回收其全部資源,那麼在父進程內將無法獲取子進程的狀態資訊。    因此,UNⅨ提供了一種機制可以保證父進程可以在任意時刻獲取子進程結束時的狀態資訊:  1、在每個進程退出的時候,內核釋放該進程所有的資源,包括打開的文件,佔用的記憶體等。但是仍然為其保留一定的資訊(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)  2、直到父進程通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果進程不調用wait / waitpid的話,那麼保留的那段資訊就不會釋放,其進程號就會一直被佔用,但是系統所能使用的進程號是有限的,如果大量的產生僵死進程,將因為沒有可用的進程號而導致系統不能產生新的進程. 此即為殭屍進程的危害,應當避免。      任何一個子進程(init除外)在exit()之後,並非馬上就消失掉,而是留下一個稱為殭屍進程(Zombie)的數據結構,等待父進程處理。這是每個子進程在結束時都要經過的階段。如果子進程在exit()之後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。如果父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不經過殭屍狀態。  如果父進程在子進程結束之前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。    二:孤兒進程(無害)      孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,並由init進程對它們完成狀態收集工作。      孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工作。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置為init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程凄涼地結束了其生命周期的時候,init進程就會代表黨和政府出面處理它的一切善後工作。因此孤兒進程並不會有什麼危害。    我們來測試一下(創建完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成為孤兒進程,而非殭屍進程),文件內容    import os  import sys  import time    pid = os.getpid()  ppid = os.getppid()  print 'im father', 'pid', pid, 'ppid', ppid  pid = os.fork()  #執行pid=os.fork()則會生成一個子進程  #返回值pid有兩種值:  #    如果返回的pid值為0,表示在子進程當中  #    如果返回的pid值>0,表示在父進程當中  if pid > 0:      print 'father died..'      sys.exit(0)    # 保證主執行緒退出完畢  time.sleep(1)  print 'im child', os.getpid(), os.getppid()    執行文件,輸出結果:  im father pid 32515 ppid 32015  father died..  im child 32516 1    看,子進程已經被pid為1的init進程接收了,所以殭屍進程在這種情況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明周期結束自然會被init來銷毀。      三:殭屍進程危害場景:      例如有個進程,它定期的產 生一個子進程,這個子進程需要做的事情很少,做完它該做的事情之後就退出了,因此這個子進程的生命周期很短,但是,父進程只管生成新的子進程,至於子進程 退出之後的事情,則一概不聞不問,這樣,系統運行上一段時間之後,系統中就會存在很多的僵死進程,倘若用ps命令查看的話,就會看到很多狀態為Z的進程。 嚴格地來說,僵死進程並不是問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。因此,當我們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL訊號啦)。槍斃了元兇進程之後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。    四:測試  #1、產生殭屍進程的程式test.py內容如下    #coding:utf-8  from multiprocessing import Process  import time,os    def run():      print('子',os.getpid())    if __name__ == '__main__':      p=Process(target=run)      p.start()        print('主',os.getpid())      time.sleep(1000)      #2、在unix或linux系統上執行  [root@vm172-31-0-19 ~]# python3  test.py &  [1] 18652  [root@vm172-31-0-19 ~]# 主 18652  子 18653    [root@vm172-31-0-19 ~]# ps aux |grep Z  USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND  root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現殭屍進程  root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z    [root@vm172-31-0-19 ~]# top #執行top命令發現1zombie  top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12  Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie  %Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st  KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache  KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem      PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND  root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin      #3、  等待父進程正常結束後會調用wait/waitpid去回收殭屍進程  但如果父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的  解決方法一:殺死父進程  解決方法二:對開啟的子進程應該記得使用join,join會回收殭屍進程  參考python2源碼注釋  class Process(object):      def join(self, timeout=None):          '''          Wait until child process terminates          '''          assert self._parent_pid == os.getpid(), 'can only join a child process'          assert self._popen is not None, 'can only join a started process'          res = self._popen.wait(timeout)          if res is not None:              _current_process._children.discard(self)    join方法中調用了wait,告訴系統釋放殭屍進程。discard為從自己的children中剔除    解決方法三:http://blog.csdn.net/u010571844/article/details/50419798

詳細了解

四 守護進程

主進程創建守護進程

  其一:守護進程會在主進程程式碼執行結束後就終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程程式碼運行結束,守護進程隨即終止

from multiprocessing import Process  import time  import random    class Piao(Process):      def __init__(self,name):          self.name=name          super().__init__()      def run(self):          print('%s is piaoing' %self.name)          time.sleep(random.randrange(1,3))          print('%s is piao end' %self.name)      p=Piao('egon')  p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程程式碼執行結束,p即終止運行  p.start()  print('主')

程式碼示例

#主進程程式碼運行完畢,守護進程就會結束  from multiprocessing import Process  from threading import Thread  import time  def foo():      print(123)      time.sleep(1)      print("end123")    def bar():      print(456)      time.sleep(3)      print("end456")      p1=Process(target=foo)  p2=Process(target=bar)    p1.daemon=True  p1.start()  p2.start()  print("main-------") #列印該行則主進程程式碼結束,則守護進程p1應該被終止,可能會有p1任務執行的列印資訊123,因為主進程列印main----時,p1也執行了,但是隨即被終止

經典例題

五 進程同步(鎖)

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個列印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

#並發運行,效率高,但競爭同一列印終端,帶來了列印錯亂  from multiprocessing import Process  import os,time  def work():      print('%s is running' %os.getpid())      time.sleep(2)      print('%s is done' %os.getpid())    if __name__ == '__main__':      for i in range(3):          p=Process(target=work)          p.start()    並發運行,效率高,但競爭同一列印終端,帶來了列印錯亂

不加鎖,效率高但是順序容易錯亂

#由並發變成了串列,犧牲了運行效率,但避免了競爭  from multiprocessing import Process,Lock  import os,time  def work(lock):      lock.acquire()      print('%s is running' %os.getpid())      time.sleep(2)      print('%s is done' %os.getpid())      lock.release()  if __name__ == '__main__':      lock=Lock()      for i in range(3):          p=Process(target=work,args=(lock,))          p.start()    加鎖:由並發變成了串列,犧牲了運行效率,但避免了競爭

加鎖處理,犧牲了效率,但是保證了順序

上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串列了,這樣確實會浪費了時間,卻保證了數據的安全。

接下來,我們以模擬搶票為例,來看看數據安全的重要性。

#文件db的內容為:{"count":1}  #注意一定要用雙引號,不然json無法識別  #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂  from multiprocessing import Process,Lock  import time,json,random  def search():      dic=json.load(open('db'))      print('33[43m剩餘票數%s33[0m' %dic['count'])    def get():      dic=json.load(open('db'))      time.sleep(0.1) #模擬讀數據的網路延遲      if dic['count'] >0:          dic['count']-=1          time.sleep(0.2) #模擬寫數據的網路延遲          json.dump(dic,open('db','w'))          print('33[43m購票成功33[0m')    def task():      search()      get()    if __name__ == '__main__':      for i in range(100): #模擬並發100個客戶端搶票          p=Process(target=task)          p.start()    多進程同時搶購余票

多進程搶票

#文件db的內容為:{"count":5}  #注意一定要用雙引號,不然json無法識別  #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂  from multiprocessing import Process,Lock  import time,json,random  def search():      dic=json.load(open('db'))      print('33[43m剩餘票數%s33[0m' %dic['count'])    def get():      dic=json.load(open('db'))      time.sleep(random.random()) #模擬讀數據的網路延遲      if dic['count'] >0:          dic['count']-=1          time.sleep(random.random()) #模擬寫數據的網路延遲          json.dump(dic,open('db','w'))          print('33[32m購票成功33[0m')      else:          print('33[31m購票失敗33[0m')    def task(lock):      search()      lock.acquire()      get()      lock.release()    if __name__ == '__main__':      lock = Lock()      for i in range(100): #模擬並發100個客戶端搶票          p=Process(target=task,args=(lock,))          p.start()    使用鎖來保證數據安全  

使用鎖保證數據安全

#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。  雖然可以用文件共享數據實現進程間通訊,但問題是:  1.效率低(共享數據基於文件,而文件是硬碟上的數據)  2.需要自己加鎖處理    #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊記憶體的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於消息的IPC通訊機制:隊列和管道。  隊列和管道都是將數據存放於記憶體中  隊列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,  我們應該盡量避免使用共享數據,儘可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。  

六 隊列

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模組支援兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 創建隊列的類(底層就是以管道和鎖定的方式實現)

1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。  

參數介紹:

1 maxsize是隊列中允許最大項數,省略則無大小限制。  

  方法介紹:

  主要方法:

1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。  2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.  3  4 q.get_nowait():同q.get(False)  5 q.put_nowait():同q.put(False)  6  7 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。  8 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。  9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣  

View Code

​   其他方法(了解):

1 q.cancel_join_thread():不會在進程退出時自動連接後台執行緒。可以防止join_thread()方法阻塞  2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後台執行緒將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束訊號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。  3 q.join_thread():連接隊列的後台執行緒。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為  

View Code

  應用:

'''  multiprocessing模組支援進程間通訊的兩種主要形式:管道和隊列  都是基於消息傳遞實現的,但是隊列介面  '''    from multiprocessing import Process,Queue  import time  q=Queue(3)      #put ,get ,put_nowait,get_nowait,full,empty  q.put(3)  q.put(3)  q.put(3)  print(q.full()) #滿了    print(q.get())  print(q.get())  print(q.get())  print(q.empty()) #空了  

View Code

  生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理數據的速度。

為什麼要使用生產者和消費者模式

在執行緒世界裡,生產者就是生產數據的執行緒,消費者就是消費數據的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue  import time,random,os  def consumer(q):      while True:          res=q.get()          time.sleep(random.randint(1,3))          print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))    def producer(q):      for i in range(10):          time.sleep(random.randint(1,3))          res='包子%s' %i          q.put(res)          print('33[44m%s 生產了 %s33[0m' %(os.getpid(),res))    if __name__ == '__main__':      q=Queue()      #生產者們:即廚師們      p1=Process(target=producer,args=(q,))        #消費者們:即吃貨們      c1=Process(target=consumer,args=(q,))        #開始      p1.start()      c1.start()      print('主')  

View Code

#生產者消費者模型總結        #程式中有兩類角色          一類負責生產數據(生產者)          一類負責處理數據(消費者)        #引入生產者消費者模型為了解決的問題是:          平衡生產者與消費者之間的工作能力,從而提高程式整體處理數據的速度        #如何實現:          生產者<-->隊列<——>消費者      #生產者消費者模型實現類程式的解耦和  

小結

時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死循環

from multiprocessing import Process,Queue  import time,random,os  def consumer(q):      while True:          res=q.get()          if res is None:break #收到結束訊號則結束          time.sleep(random.randint(1,3))          print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))    def producer(q):      for i in range(10):          time.sleep(random.randint(1,3))          res='包子%s' %i          q.put(res)          print('33[44m%s 生產了 %s33[0m' %(os.getpid(),res))      q.put(None) #發送結束訊號  if __name__ == '__main__':      q=Queue()      #生產者們:即廚師們      p1=Process(target=producer,args=(q,))        #消費者們:即吃貨們      c1=Process(target=consumer,args=(q,))        #開始      p1.start()      c1.start()      print('主')    生產者在生產完畢後發送結束訊號None  

生產者在生產完畢後發送結束訊號None

注意:結束訊號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束後才應該發送該訊號

from multiprocessing import Process,Queue  import time,random,os  def consumer(q):      while True:          res=q.get()          if res is None:break #收到結束訊號則結束          time.sleep(random.randint(1,3))          print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))    def producer(q):      for i in range(2):          time.sleep(random.randint(1,3))          res='包子%s' %i          q.put(res)          print('33[44m%s 生產了 %s33[0m' %(os.getpid(),res))    if __name__ == '__main__':      q=Queue()      #生產者們:即廚師們      p1=Process(target=producer,args=(q,))        #消費者們:即吃貨們      c1=Process(target=consumer,args=(q,))        #開始      p1.start()      c1.start()        p1.join()      q.put(None) #發送結束訊號      print('主')    主進程在生產者生產完畢後發送結束訊號None  

主進程在生產者生產完畢之後發送None

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

from multiprocessing import Process,Queue  import time,random,os  def consumer(q):      while True:          res=q.get()          if res is None:break #收到結束訊號則結束          time.sleep(random.randint(1,3))          print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))    def producer(name,q):      for i in range(2):          time.sleep(random.randint(1,3))          res='%s%s' %(name,i)          q.put(res)          print('33[44m%s 生產了 %s33[0m' %(os.getpid(),res))        if __name__ == '__main__':      q=Queue()      #生產者們:即廚師們      p1=Process(target=producer,args=('包子',q))      p2=Process(target=producer,args=('骨頭',q))      p3=Process(target=producer,args=('泔水',q))        #消費者們:即吃貨們      c1=Process(target=consumer,args=(q,))      c2=Process(target=consumer,args=(q,))        #開始      p1.start()      p2.start()      p3.start()      c1.start()        p1.join() #必須保證生產者全部生產完畢,才應該發送結束訊號      p2.join()      p3.join()      q.put(None) #有幾個消費者就應該發送幾次結束訊號None      q.put(None) #發送結束訊號      print('主')    有幾個消費者就需要發送幾次結束訊號:相當low  

有幾個消費者就需要發送幾個結束訊號

七 管道

進程間通訊(IPC)方式二:管道(不推薦使用,了解即可)

#創建管道的類:  Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道  #參數介紹:  dumplex:默認管道是全雙工的,如果將duplex射成False,conn1隻能用於接收,conn2隻能用於發送。  #主要方法:      conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麼recv方法會拋出EOFError。      conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象   #其他方法:  conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法  conn1.fileno():返回連接使用的整數文件描述符  conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的位元組消息。maxlength指定要接收的最大位元組數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。  conn.send_bytes(buffer [, offset [, size]]):通過連接發送位元組數據緩衝區,buffer是支援緩衝區介面的任意對象,offset是緩衝區中的位元組偏移量,而size是要發送位元組數。結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收    conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組消息,並把它保存在buffer對象中,該對象支援可寫入的緩衝區介面(即bytearray對象或類似的對象)。offset指定緩衝區中放置消息處的位元組位移。返回值是收到的位元組數。如果消息長度大於可用的緩衝區空間,將引發BufferTooShort異常。    介紹  

概念介紹

from multiprocessing import Process,Pipe    import time,os  def consumer(p,name):      left,right=p      left.close()      while True:          try:              baozi=right.recv()              print('%s 收到包子:%s' %(name,baozi))          except EOFError:              right.close()              break  def producer(seq,p):      left,right=p      right.close()      for i in seq:          left.send(i)          # time.sleep(1)      else:          left.close()  if __name__ == '__main__':      left,right=Pipe()        c1=Process(target=consumer,args=((left,right),'c1'))      c1.start()          seq=(i for i in range(10))      producer(seq,(left,right))        right.close()      left.close()        c1.join()      print('主進程')    基於管道實現進程間通訊(與隊列的方式是類似的,隊列就是管道加鎖實現的)  

基於管道實現進程間的通訊(與隊列方式相似,隊列就是管道加鎖實現的)

但是,管道是有問題的,管道會造成數據的不安全,官方給予的解釋是管道有可能會造成數據損壞。

八 進程池和mutiprocess.Poll

為什麼要有進程池?進程池的概念。

  在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程(空間,變數,文件資訊等等的內容)也需要消耗時間。第二即便開啟了成千上萬的進程,作業系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束進程。就看我們上面的一些程式碼例子,你會發現有些程式是不是執行的時候比較慢才出結果,就是這個原因,那麼我們要怎麼做呢?

  在這裡,要給大家介紹一個進程池的概念,定義一個池子,在裡面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增加作業系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果

multiprocess.Poll模組

  創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據你的並發量,搞成動態增加或減少進程池中的進程數量的操作),不會開啟其他進程,提高作業系統效率,減少空間的佔用等。

  概念介紹:

Pool([numprocess  [,initializer [, initargs]]]):創建進程池  
numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值  initializer:是每個工作進程啟動時要執行的可調用對象,默認為None  initargs:是要傳給initializer的參數組  

參數介紹

p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。  '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同執行緒調用p.apply()函數或者使用p.apply_async()'''    p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。  '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''    p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成    P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用  

主要方法介紹

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法  obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。  obj.ready():如果調用完成,返回True  obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常  obj.wait([timeout]):等待結果變為可用。  obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數    其他方法(了解)  

其他方法了解

import time  from multiprocessing import Pool,Process    #針對range(100)這種參數的  # def func(n):  #     for i in range(3):  #         print(n + 1)    def func(n):      print(n)      # 結果:      #     (1, 2)      #     alex  def func2(n):      for i in range(3):          print(n - 1)  if __name__ == '__main__':      #1.進程池的模式      s1 = time.time()  #我們計算一下開多進程和進程池的執行效率      poll = Pool(5) #創建含有5個進程的進程池      # poll.map(func,range(100)) #非同步調用進程,開啟100個任務,map自帶join的功能      poll.map(func,[(1,2),'alex']) #非同步調用進程,開啟100個任務,map自帶join的功能      # poll.map(func2,range(100))  #如果想讓進程池完成不同的任務,可以直接這樣搞      #map只限於接收一個可迭代的數據類型參數,列表啊,元祖啊等等,如果想做其他的參數之類的操作,需要用後面我們要學的方法。      # t1 = time.time() - s1      #      # #2.多進程的模式      # s2 = time.time()      # p_list = []      # for i in range(100):      #     p = Process(target=func,args=(i,))      #     p_list.append(p)      #     p.start()      # [pp.join() for pp in p_list]      # t2 = time.time() - s2      #      # print('t1>>',t1) #結果:0.5146853923797607s 進程池的效率高      # print('t2>>',t2) #結果:12.092015027999878s    進程池的簡單應用及與進程池的效率對比  

進程池的簡單應用與效率對比

有一點,map是非同步執行的,並且自帶close和join

  一般約定俗成的是進程池中的進程數量為CPU的數量,工作中要看具體情況來考量。

  實際應用程式碼示例:

  同步與非同步兩種執行方式:

import os,time  from multiprocessing import Pool    def work(n):      print('%s run' %os.getpid())      time.sleep(1)      return n**2    if __name__ == '__main__':      p=Pool(3) #進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務      res_l=[]      for i in range(10):          res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞                                      # 但不管該任務是否存在阻塞,同步調用都會在原地等著          res_l.append(res)      print(res_l)    進程池的同步調用  

進程池的同步調用

import os  import time  import random  from multiprocessing import Pool    def work(n):      print('%s run' %os.getpid())      time.sleep(random.random())      return n**2    if __name__ == '__main__':      p=Pool(3) #進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務      res_l=[]      for i in range(10):          res=p.apply_async(work,args=(i,)) # 非同步運行,根據進程池中有的進程數,每次最多3個子進程在非同步執行,並且可以執行不同的任務,傳送任意的參數了。                                            # 返回結果之後,將結果放入列表,歸還進程,之後再執行新的任務                                            # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束                                            # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。          res_l.append(res)        # 非同步apply_async用法:如果使用非同步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然後可以用get收集結果      # 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了      p.close() #不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來。      p.join()   #感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法      for res in res_l:          print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get    進程池的非同步調用  

進程池的非同步調用

#一:使用進程池(非同步調用,apply_async)  #coding: utf-8  from multiprocessing import Process,Pool  import time    def func(msg):      print( "msg:", msg)      time.sleep(1)      return msg    if __name__ == "__main__":      pool = Pool(processes = 3)      res_l=[]      for i in range(10):          msg = "hello %d" %(i)          res=pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去          res_l.append(res)          # s = res.get() #如果直接用res這個結果對象調用get方法獲取結果的話,這個程式就變成了同步,因為get方法直接就在這裡等著你創建的進程的結果,第一個進程創建了,並且去執行了,那麼get就會等著第一個進程的結果,沒有結果就一直等著,那麼主進程的for循環是無法繼續的,所以你會發現變成了同步的效果      print("==============================>") #沒有後面的join,或get,則程式整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了        pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成      pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束        print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果      for i in res_l:          print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get    #二:使用進程池(同步調用,apply)  #coding: utf-8  from multiprocessing import Process,Pool  import time    def func(msg):      print( "msg:", msg)      time.sleep(0.1)      return msg    if __name__ == "__main__":      pool = Pool(processes = 3)      res_l=[]      for i in range(10):          msg = "hello %d" %(i)          res=pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去          res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個      print("==============================>")      pool.close()      pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束        print(res_l) #看到的就是最終的結果組成的列表      for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法          print(i)  

詳解:apply_async和apply

進程池版的socket並發聊天程式碼示例:

#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())  #開啟6個客戶端,會發現2個客戶端處於等待狀態  #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程  from socket import *  from multiprocessing import Pool  import os    server=socket(AF_INET,SOCK_STREAM)  server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)  server.bind(('127.0.0.1',8080))  server.listen(5)    def talk(conn):      print('進程pid: %s' %os.getpid())      while True:          try:              msg=conn.recv(1024)              if not msg:break              conn.send(msg.upper())          except Exception:              break    if __name__ == '__main__':      p=Pool(4)      while True:          conn,*_=server.accept()          p.apply_async(talk,args=(conn,))          # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問    server端:tcp_server.py  

server端:tcp_server.py

from socket import *    client=socket(AF_INET,SOCK_STREAM)  client.connect(('127.0.0.1',8080))      while True:      msg=input('>>: ').strip()      if not msg:continue        client.send(msg.encode('utf-8'))      msg=client.recv(1024)      print(msg.decode('utf-8'))    client端:tcp_client.py  

client端:tcp_client.py

 發現:並發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.

  同時最多和4個人進行聊天,因為進程池中只有4個進程可供調用,那有同學會問,我們這麼多人想同時聊天怎麼辦,又不讓用多進程,進程池也不能開太多的進程,那咋整啊,後面我們會學到多執行緒,到時候大家就知道了,現在你們先這樣記住就好啦

然後我們再提一個回調函數

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。    我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果  
import os  from multiprocessing import Pool    def func1(n):      print('func1>>',os.getpid())      print('func1')      return n*n    def func2(nn):      print('func2>>',os.getpid())      print('func2')      print(nn)      # import time      # time.sleep(0.5)  if __name__ == '__main__':      print('主進程:',os.getpid())      p = Pool(5)      #args裡面的10給了func1,func1的返回值作為回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值      # for i in range(10,20): #如果是多個進程來執行任務,那麼當所有子進程將結果給了回調函數之後,回調函數又是在主進程上執行的,那麼就會出現列印結果是同步的效果。我們上面func2裡面註銷的時間模組打開看看      #     p.apply_async(func1,args=(i,),callback=func2)      p.apply_async(func1,args=(10,),callback=func2)        p.close()      p.join()    #結果  # 主進程: 11852  #發現回調函數是在主進程中完成的,其實如果是在子進程中完成的,那我們直接將程式碼寫在子進程的任務函數func1裡面就行了,對不對,這也是為什麼稱為回調函數的原因。  # func1>> 17332  # func1  # func2>> 11852  # func2  # 100    回調函數的簡單使用  

回調函數的簡單應用

 回調函數在寫的時候注意一點,回調函數的形參執行有一個,如果你的執行函數有多個返回值,那麼也可以被回調函數的這一個形參接收,接收的是一個元祖,包含著你執行函數的所有返回值。

  

  使用進程池來搞爬蟲的時候,最耗時間的是請求地址的網路請求延遲,那麼如果我們在將處理數據的操作加到每個子進程中,那麼所有在進程池後面排隊的進程就需要等更長的時間才能獲取進程池裡面的執行進程來執行自己,所以一般我們就將請求作成一個執行函數,通過進程池去非同步執行,剩下的數據處理的內容放到另外一個進程或者主進程中去執行,將網路延遲的時間也利用起來,效率更高。

  requests這個模組的get方法請求頁面,就和我們在瀏覽器上輸入一個網址然後回車去請求別人的網站的效果是一樣的。安裝requests模組的指令:在cmd窗口執行pip install requests。

import requests  response = requests.get('http://www.baidu.com')  print(response)  print(response.status_code) #200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤  print(response.content.decode('utf-8'))  

爬蟲相關的requests模組簡單使用

from multiprocessing import Pool  import requests  import json  import os    def get_page(url):      print('<進程%s> get %s' %(os.getpid(),url))      respone=requests.get(url)      if respone.status_code == 200:          return {'url':url,'text':respone.text}    def pasrse_page(res):      print('<進程%s> parse %s' %(os.getpid(),res['url']))      parse_res='url:<%s> size:[%s]n' %(res['url'],len(res['text']))      with open('db.txt','a') as f:          f.write(parse_res)      if __name__ == '__main__':      urls=[          'https://www.baidu.com',          'https://www.python.org',          'https://www.openstack.org',          'https://help.github.com/',          'http://www.sina.com.cn/'      ]        p=Pool(3)      res_l=[]      for url in urls:          res=p.apply_async(get_page,args=(url,),callback=pasrse_page)          res_l.append(res)        p.close()      p.join()      print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了    '''  列印結果:  <進程3388> get https://www.baidu.com  <進程3389> get https://www.python.org  <進程3390> get https://www.openstack.org  <進程3388> get https://help.github.com/  <進程3387> parse https://www.baidu.com  <進程3389> get http://www.sina.com.cn/  <進程3387> parse https://www.python.org  <進程3387> parse https://help.github.com/  <進程3387> parse http://www.sina.com.cn/  <進程3387> parse https://www.openstack.org  [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>rn...',...}]  '''    使用多進程請求多個url來減少網路等待浪費的時間  

使用多進程請求多個url來減少網路等待浪費的時間

from multiprocessing import Pool  import time,random  import requests  import re    def get_page(url,pattern):      response=requests.get(url)      if response.status_code == 200:          return (response.text,pattern)    def parse_page(info):      page_content,pattern=info      res=re.findall(pattern,page_content)      for item in res:          dic={              'index':item[0],              'title':item[1],              'actor':item[2].strip()[3:],              'time':item[3][5:],              'score':item[4]+item[5]            }          print(dic)  if __name__ == '__main__':      pattern1=re.compile(r'<dd>.*?board-index.*?>(d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)        url_dic={          'http://maoyan.com/board/7':pattern1,      }        p=Pool()      res_l=[]      for url,pattern in url_dic.items():          res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)          res_l.append(res)        for i in res_l:          i.get()        # res=requests.get('http://maoyan.com/board/7')      # print(re.findall(pattern,res.text))    爬蟲示例  

爬蟲示例

如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

from multiprocessing import Pool  import time,random,os    def work(n):      time.sleep(1)      return n**2  if __name__ == '__main__':      p=Pool()        res_l=[]      for i in range(10):          res=p.apply_async(work,args=(i,))          res_l.append(res)        p.close()      p.join() #等待進程池中所有進程執行完畢        nums=[]      for res in res_l:          nums.append(res.get()) #拿到所有結果      print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理    無需回調函數的示例  

無需回調函數的示例

進程池和訊號量的區別:

  進程池是多個需要被執行的任務在進程池外面排隊等待獲取進程對象去執行自己,而訊號量是一堆進程等待著去執行一段邏輯程式碼。

  訊號量不能控制創建多少個進程,但是可以控制同時多少個進程能夠執行,但是進程池能控制你可以創建多少個進程。

  舉例:就像那些開大車拉煤的,訊號量是什麼呢,就好比我只有五個車道,你每次只能過5輛車,但是不影響你創建100輛車,但是進程池相當於什麼呢?相當於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。

  其他語言裡面有更高級的進程池,在設置的時候,可以將進程池中的進程動態的創建出來,當需求增大的時候,就會自動在進程池中添加進程,需求小的時候,自動減少進程,並且可以設置進程數量的上線,最多為多,python裡面沒有。

  進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html