Python第十二章-多進程和多線程01-多進程

多進程和多線程

一、進程

1.1 進程的引入

現實生活中,有很多的場景中的事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;試想,如果把唱歌和跳舞這2件事情分開依次完成的話,估計就沒有那麼好的效果了(想一下場景:先唱歌,然後在跳舞,O(∩_∩)O哈哈~)

程序中

如下程序,來模擬「唱歌跳舞」這件事情

# 模擬唱歌,跳舞  from time import sleep      def sing():      for i in range(3):          print("正在唱歌...%d"%i)          sleep(1)      def dance():      for i in range(3):          print("正在跳舞...%d"%i)          sleep(1)      if __name__ == '__main__':          sing()  # 唱歌          dance() # 跳舞    

運行結果

注意

  • 很顯然剛剛的程序並沒有完成唱歌和跳舞同時進行的要求
  • 如果想要實現「唱歌跳舞」同時進行,那麼就需要一個新的方法,叫做:多任務

1.2 多任務的概念

什麼叫「多任務」呢?簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務,至少同時有3個任務正在運行。還有很多任務悄悄地在後台同時運行着,只是桌面上沒有顯示而已。

現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?

答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。

真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。

其實就是CPU執行速度太快啦。。

1.2.1 進程

每個獨立執行的程序稱為進程

進程是程序的一次動態執行過程,它經歷了從代碼加載、執行到執行完畢的一個完整過程,這個過程也是進程本身從產生、發展到最終消亡的過程。

多進程(多任務)操作系統能同時運行多個進程(程序),由於CPU具備分時機制,所以每個進程都能循環獲得自己的CPU時間片。由於CPU執行速度非常快,使得所有程序好象是在「同時」運行一樣。

在操作系統中進程是進行系統資源分配、調度和管理的最小單位,進程在執行過程中擁有獨立的內存單元。

比如:Windows採用進程作為最小隔離單位,每個進程都有自己的數據段、代碼段,並且與別的進程沒有任何關係。因此進程間進行信息交互比較麻煩。

進程也可以通過派生 (fork 或 spawn)新的進程來執行其他任務,不過因為每個新進程也都擁有自己的內存和數據棧等,所以只能採用進程間通信(IPC)的方式共享信息。

1.2.2 線程

為了解決進程調度資源的浪費,為了能夠共享資源,出現了線程。有時候把線程稱之為輕量級進程.

線程是CPU調度和分派的基本單位,它可與同屬一個進程的其他的線程共享進程所擁有的全部資源,多個線程共享內存,從而極大地提高了程序的運行效率。

線程是比進程更小的執行單位,線程是進程內部單一的一個順序控制流。

所謂多線程是指一個進程在執行過程中可以產生多個線程,這些線程可以同時存在、同時運行,形成多條執行線索。一個進程可能包含了多個同時執行的線程。

一個或更多的線程構成了一個進程(操作系統是以進程為單位的,而進程是以線程為單位的,進程中必須有一個主線程main)

如果一個進程沒有了,那麼這個進程內的所有線程肯定會消失,如果線程消失了,但是進程未必會消失。只有所有的線程都結束了,進程才會結束!!!而且所有線程都是在進程的基礎之上同時運行。

1.3 Python 和並發編程

在大多數系統上, Python支持多進程(基於消息傳遞)編程和多線程編程.

大多數人比較熟悉的是多線程編程, 但是在python中的多線程編程卻是有諸多的限制.

python中多線程的限制

為了線程安全考慮, python的解釋器還是使用了內部的GIL(Global Interperter Lock, 全局解釋器鎖定), 在任意時刻只運行單個python的線程執行.即使有多個可用的cpu核心, 也是如此.這就限制了python只能在一個cpu核心上運行.

GIL的存在直接影響了程序的並發編程問題.

如果一個應用程序是大部分與I/O相關, 那麼使用線程一般沒有問題, 因為大部分時間是在I/O等待.

如果一個應用程序是CPU密集型的, 則使用多線程的壞處大於好處, 返回會降低程序的運行速度, 一般比你想像的還要慢的多.

因此, 用戶在有些情況需要使用多進程(子進程和消息傳遞)

子進程和消息傳遞
展望未來, 如果要再python中進行各種類型的並發編程, 消息傳遞應該是最應該掌握的概念.

1.4 multiprocessing包

multiprocessing是一個package, 這個包支持使用類似threading模塊的類似API去創建新的進程.

multiprocessing支持本地和遠程並發編程, 通過使用子進程來代替線程高效的規避了GIL問題.

所以, multiprocessing允許程序員重複利用給定計算機的多核cpu.

由於python的跨平台, 所以multiprocessing支持多個平台:unix, window, linux.

1.4.1 Process類

Process語法結構如下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:表示這個進程實例所調用對象;
  • args:表示調用對象的位置參數元組;
  • kwargs:表示調用對象的關鍵字參數字典;
  • name:為當前進程實例的別名;
  • group:大多數情況下用不到;

最簡單的使用代碼:

# 從multiprocessing中導入Process  from multiprocessing import Process  import os      # 子進程要執行的代碼  def run_proc(name):      print('子進程運行中,hello,= %s ,pid=%d...' % (name, os.getpid()))      if __name__ == '__main__':  # 判斷是否為主程序      print('父進程 %d.' % os.getpid())      """      創建Process對象, 表示一個子進程.      1. target參數表示子進程要做的任務(一個可執行對象)      2. args是一個元組, 表示傳遞給target的可執行對象的位置參數.          本例中就是把"王二狗"傳遞給函數f的name參數      """      p = Process(target=run_proc, args=('王二狗',))      print('子進程將要執行。。')      p.start() # 啟動子進程      p.join() # 等待進程終止      print("子進程已經終止")    

說明

  1. 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動。
  2. join()方法可以等待子進程結束後再繼續往下運行,通常用於進程間的同步。
1.4.1.1 Process類的實例具有以下方法

Process實例p具有以下方法:

  1. p.start()

    啟動子進程. 這將運行代表進程的子進程, 並調用該子進程中的p.run()方法.

  2. p.join([timeout])

    等待進程p終止, timeout是可選的超時時間. 這個方法通常用戶進程間的同步.

  3. p.is_alive()

    測試進程p是否還在運行, 如果扔在運行, 則返回True

  4. run():

    如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法;

  5. p.terminate()

    強制終止p進程. 如果調用此方法, 進程p將立即被終止, 同時不會進行任何清理工作. 如果再進程p中也開啟了子進程, 則這些子進程將成為僵死進程.s如果p保存了一個鎖定或有進程間通信, 那麼終止可能會導致死鎖或I/O崩潰.

1.4.1.2 Process實例具有以下實例屬性:

Process實例p具有以下實例屬性:

  1. p.daemon

    一個布爾標誌, 指示這個進程是否為後台進程. 當創建他的python進程終止時, 後台進程將自動終止.

    另外禁止後台進程創建自己的新進程. p.daemon的值必須再進程啟動前設置.

  2. p.exitcode

    進程的整數退出碼. 如果進程仍在運行, 則它的值是None. 如果是負數, -N表示由信號N所終止

  3. p.name

    當前進程實例別名,默認為Process-N,N為從1開始遞增的整數;

  4. p.pid

    進程的整數ID

1.4.1.3 實例

實例1:

from multiprocessing import Process  import os  from time import sleep      # 子進程要執行的代碼  def run_proc(name, age, **kwargs):      for i in range(10):          print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))          print(kwargs)          sleep(0.5)      if __name__=='__main__':      print('父進程 %d.' % os.getpid())      p = Process(target=run_proc, args=('test',18), kwargs={"m":20})      print('子進程將要執行')      p.start()      sleep(1)      p.terminate()      p.join()      print('子進程已結束')  

運行結果:

實例2:

from multiprocessing import Process  import time  import os      # 兩個子進程將會調用的兩個方法  def worker_1(interval):      print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(), os.getpid()))      t_start = time.time()      time.sleep(interval)  # 程序將會被掛起interval秒      t_end = time.time()      print("worker_1,執行時間為'%0.2f'秒" % (t_end - t_start))      def worker_2(interval):      print("worker_2,父進程(%s),當前進程(%s)" % (os.getppid(), os.getpid()))      t_start = time.time()      time.sleep(interval)      t_end = time.time()      print("worker_2,執行時間為'%0.2f'秒" % (t_end - t_start))      if __name__ == '__main__':  # 判斷是否為主程序      # 輸出當前程序的ID      print("進程ID:%s" % os.getpid())        """      創建兩個進程對象,target指向這個進程對象要執行的對象名稱,      args後面的元組中,是要傳遞給worker_1方法的參數,      因為worker_1方法就一個interval參數,這裡傳遞一個整數2給它,      如果不指定name參數,默認的進程對象名稱為Process-N,N為一個遞增的整數      """      p1=Process(target=worker_1, args=(2,))      p2=Process(target=worker_2, name="王二狗", args=(1,))        # 使用"進程對象名稱.start()"來創建並執行一個子進程,      # 這兩個進程對象在start後,就會分別去執行worker_1和worker_2方法中的內容      p1.start()      p2.start()        # 同時父進程仍然往下執行,如果p2進程還在執行,將會返回True      print("p2.is_alive=%s " % p2.is_alive())        # 輸出p1和p2進程的別名和pid      print("p1.name=%s" % p1.name)      print("p1.pid=%s" % p1.pid)      print("p2.name=%s" % p2.name)      print("p2.pid=%s" % p2.pid)          """      join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成後,再繼續執行下面的語句,一般用於進程間的數據同步      如果不寫這一句,下面的is_alive判斷將會是True,      改成p1.join(1),      因為p2需要2秒以上才可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成,所以下面的print會輸出True,即p1仍然在執行      """      p1.join()      print("p1.is_alive=%s" % p1.is_alive())  

運行結果:

1.4.1.4 進程的創建-Process子類

創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象

示例代碼:

from multiprocessing import Process  import time  import os      # 繼承Process類  class ProcessClass(Process):      """      因為Process類本身也有__init__方法,這個子類相當於重寫了這個方法,      但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性,      最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作      """      def __init__(self,interval):          Process.__init__(self)          self.interval = interval        # 重寫了Process類的run()方法      def run(self):          print("子進程(%s) 開始執行,父進程為(%s)" % (os.getpid(), os.getppid()))          t_start = time.time()          time.sleep(self.interval)          t_stop = time.time()          print("(%s)執行結束,耗時%0.2f秒"%(os.getpid(), t_stop-t_start))      if __name__ == "__main__":      t_start = time.time()      print("當前程序進程(%s)"%os.getpid())      p1 = ProcessClass(2)      # 對一個不包含target屬性的Process類執行start()方法,就會運行這個類中的run()方法,所以這裡會執行p1.run()      p1.start()      p1.join()      t_stop = time.time()      print("(%s)執行結束,耗時%0.2f"%(os.getpid(),t_stop-t_start))  

運行結果:

1.4.2 進程池:Pool

當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。使用類Pool可以創建進程池, 然後把各種數據處理任務都提交給進程池.

初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來執行

Pool([numprocess, initializer, initargs])  

說明:

numprocess 是指要創建的線程數. 默認是cpu的核心數.(os.cpu_count()的返回值)

initializer 是每個進程啟動時要執行的可調用對象, 默認是None

initargs是傳遞給initializer的元組參數.

1.4.2.1 multiprocessing.Pool常用函數解析:
  • apply(func[, args[, kwds]]):使用阻塞方式調用func

    在進程池的一個工作進程中執行func函數, args是傳給func的元組參數. 注意使用這個方法讓多個進程去執行, 他們是同步執行的. 即:多個進程是順序執行的.
    func的返回值就是p.apply的返回值.

  • apply_async(func[, args, kwargs, callback]) :使用非阻塞方式調用func

    (並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給異步的執行func

    callback 是可調用對象, 當func執行結束, 則立即調用callback並把func的返回值傳遞給callback.

  • func的關鍵字參數列表;

  • close():關閉Pool,使其不再接受新的任務;

  • terminate():不管任務是否完成,立即終止;

  • join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;

AsyncResult對象(apply_async()的返回值)

apply_async()的返回值是AsyncResult實例. 具有如下方法:

  1. a.get([timeout])

等待返回結果, 結果就是任務函數的返回值.

  1. a.ready()

如果任務函數執行結束返回True

  1. a.successful()

如果任務函數執行結束, 且在執行的過程中沒有發生異常則

  1. a.wait([timeout])

等待任務結束, 這個方法與get()的區別就是它沒有返回值.