【Python之旅】第六篇(六):Pyt
- 2020 年 1 月 11 日
- 筆記
關於進程與線程的對比,下面的解釋非常好的說明了這兩者的區別:

這裡主要說明關於Python多進程的下面幾點:
1.多進程的使用方法 2.進程間的通信之multiprocessing.Manager()使用 3.Python進程池 (1)比較簡單的例子 (2)多個進程多次並發的情況 (3)驗證apply.async方法是非阻塞的 (4)驗證apply.async中的get()方法是阻塞的
1.多進程的使用方法
直接給出下面程序代碼及注釋:
from multiprocessing import Process #從多進程模塊中導入Process import time def sayHi(name): print 'Hi my name is %s' % name time.sleep(3) for i in range(10): p = Process(target=sayHi, args=(i,)) #調用多進程使用方法 p.start() #開始執行多進程
程序執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing8.py Hi my name is 2 Hi my name is 3 Hi my name is 6 Hi my name is 1 Hi my name is 4 Hi my name is 5 Hi my name is 0 Hi my name is 7 Hi my name is 8 Hi my name is 9
輸出順序不一致,則是因為屏幕的搶佔問題而已,但不同的進程執行是並發的。在執行程序的過程中,可以打開另一個窗口來查看進程的執行情況(上面sleep了3秒,所以速度一定要快):
xpleaf@xpleaf-machine:~$ ps -ef | grep mul* xpleaf 10468 1827 1 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10469 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10470 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10471 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10472 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10473 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10474 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10475 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10476 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10477 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10478 10468 0 19:34 pts/1 00:00:00 python multiprocssing8.py xpleaf 10480 8436 0 19:34 pts/2 00:00:00 grep --color=auto mul*
可以看到上面有11個進程,但是前面其實只開了10個進程,為什麼會有11個呢?那是因為有一個主進程,即這整一個程序本身,而其它的10個進程則是這個主進程下面的子進程,但無論如何,它們都是進程。
同多線程一樣,多進程也有join方法,即可以在p.start()後面加上去,一個進程的執行需要等待上一個進程執行完畢後才行,這就相當於進程的執行就是串行的了。
2.進程間的通信multiprocessing.Manager()使用
Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。
Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
直接看下面的一個例子:
import multiprocessing import time def worker(d, key, value): d[key] = value mgr = multiprocessing.Manager() d = mgr.dict() jobs = [] #用來接收多進程函數的返回的結果,存放的是函數的入口 for i in range(10): jobs.append(multiprocessing.Process(target=worker,args=(d,i,i*i))) for j in jobs: #執行存放的函數入口 j.start() for j in jobs: #檢測進程是否執行完畢 j.join() #time.sleep(1) #如果有join()來進程進程是否執行完畢,則這裡可以省略 print ('Results:' ) print d
程序執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_manager9.py Results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
3.Python進程池
前面我們講過CPU在某一時刻只能執行一個進程,那為什麼上面10個進程還能夠並發執行呢?實際在CPU在處理上面10個進程時是在不停的切換執行這10個進程,但由於上面10個進程的程序代碼都是十分簡單的,並沒有涉及什麼複雜的功能,並且,CPU的處理速度實在是非常快,所以這樣一個過程在我們人為感知里確實是在並發執行的,實際只不過是CPU在不停地切換而已,這是通過增加切換的時間來達到目的的。
10個簡單的進程可以產生這樣的效果,那試想一下,如果我有100個進程需要CPU執行,但因為CPU還要進行其它工作,只能一次再處理10個進程(切換處理),否則有可能會影響其它進程工作,這下可怎麼辦?這時候就可以用到Python中的進程池來進行調控了,在Python中,可以定義一個進程池和這個池的大小,假如定義進程池的大小為10,那麼100個進程可以分10次放進進程池中,然後CPU就可以10次並發完成這100個進程了。
(1)比較簡單的例子
程序代碼及注釋如下:
from multiprocessing import Process,Pool #導入Pool模塊 import time def sayHi(num): time.sleep(1) return num*num p = Pool(processes=5) #定義進程池的數量為5 result = p.apply_async(sayHi, [10]) #開始執行多進程,async為異步執行,即不會等待其它 #子進程的執行結果,為非阻塞模式,除非使用了get()方法,get()方法會等待子進程返回執行結果, #再去執行下一次進程,可以看後面的例子;同理下有apply方法,阻塞模式,會等待子進程返回執行結果 print result.get() #get()方法
程序執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py 100 real 0m1.066s user 0m0.016s sys 0m0.032s
雖然是定義了進程池的數量為5,但由於這裡只執行一個子進程,所以時間為1秒多。
上面的程序可以改寫為下面的形式:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(1) return num*num p = Pool(processes=5) result = p.map(sayHi,range(3)) for i in result:print i
執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool10.py 0 1 4
(2)多個進程多次並發的情況:解釋進程池作用以及多進程並發執行消耗切換時間
修改上面的程序代碼如下:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(1) return num*num p = Pool(processes=5) result_list = [] for i in range(30): result_list.append(p.apply_async(sayHi, [i])) for res in result_list: print res.get()
程序執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ python multiprocssing_pool_2_11.py 0 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400 441 484 529 576 625 676 729 784 841
每一部分數字之間有空白是因為我按了回車鍵的原因,以讓這個結果更加明顯,同時也可以知道,上面的30個進程是分6次來完成的,是因為我定義了進程池的數量為5(30/6=5),為了更有說服力,可以看一下程序的執行時間:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real real 0m6.143s user 0m0.052s sys 0m0.028s
可以看到執行的時間為6秒多,之所以不是6秒是因為主程序本身的執行需要一點時間,同時進程間的切換也是需要時間的(這裡為5個進程間的切換,因為每次並發執行的進程數為5個),為了說明這一點,我們可以把pool大小改為100,但依然是並發執行6次,程序代碼修改為如下:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(1) return num*num p = Pool(processes=100) result_list = [] for i in range(600): result_list.append(p.apply_async(sayHi, [i])) for res in result_list: print res.get()
再觀察一下執行時間:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real real 0m6.371s user 0m0.080s sys 0m0.128s
雖然相差的時間只是零點幾秒,但隨着並發執行進程數的增加,進程間切換需要的時間越來越多,程序執行的時間也就越多,特別是當單個進程非常消耗CPU資源時。
(3)驗證apply.sync方法是非阻塞的
第一個程序代碼的注釋中,我們說apply.sync方法是非阻塞的,也就是說,無論子進程是否已經執行完畢,只要主進程執行完畢,程序就會退出,看下面的探索過程,以驗證一下。
看下面的程序代碼:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(10) return num*num p = Pool(processes=5) result_list = [] for i in range(30): result_list.append(p.apply_async(sayHi, [i])) for res in result_list: print res.get()
先查看程序的執行時間:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real real 0m0.149s user 0m0.020s sys 0m0.024s
第一次運行這個程序時,出乎了我的意料,本來我以為這個程序的執行要18s左右才對的,因為子進程並發執行了6次,每一次都sleep了3s(並發執行的進程數比較少,切換的時間就不算上去了),但實際上也並非是如此,因為我查看系統進程時,情況是下面這樣的:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul* xpleaf 11499 8436 0 20:35 pts/2 00:00:00 grep --color=auto mul*
如果原來我的想法是正確的,那麼應該在這裡可以看到多個我執行的進程才對(因為有個3s的時間在子進程里,並發6次,18s,應該有才對),為什麼會沒有呢?後來我把程序代碼修改為如下:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(3) return num*num p = Pool(processes=5) result_list = [] for i in range(30): result_list.append(p.apply_async(sayHi, [i])) time.sleep(3)
即我在主程序中添加了time.sleep(3)的代碼,還是先查看時間:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real real 0m3.107s user 0m0.040s sys 0m0.032s
在上面程序執行過程中,迅速地在另一個窗口查看系統進程:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul* xpleaf 11515 1827 4 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11517 11515 0 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11518 11515 0 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11519 11515 0 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11520 11515 0 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11521 11515 0 20:39 pts/1 00:00:00 python multiprocssing_pool_2_11.py xpleaf 11526 8436 0 20:39 pts/2 00:00:00 grep --color=auto mul*
程序執行結束後,即顯示了上面的時間後,我再查看進程:
xpleaf@xpleaf-machine:~$ ps -ef | grep mul* xpleaf 11529 8436 0 20:39 pts/2 00:00:00 grep --color=auto mul*
於是,上網查找了一些資料,apply.async是非阻塞的,所謂的非阻塞是指:主進程不會等待子進程的返回結果後再結束;正常情況下,如果是產生於主進程的子進程,在主進程結束後也應該不會退出才對,但因為這裡的子進程是由pool進程池產生的,所以主進程結束,pool即關閉,因為pool池中的進程需要pool調度才能執行,因此當pool關閉後,這些子進程也隨即結束運行。
其實join方法就可以實現一個功能,就是讓子進程結束後才結束主進程,把上面的代碼修改為如下:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(3) return num*num p = Pool(processes=5) result_list = [] for i in range(30): result_list.append(p.apply_async(sayHi, [i])) p.close() #執行p.join()前需要先關閉進程池,否則會出錯 p.join() #主進程等待子進程執行完後才結束
查看執行的時間:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool_2_11.py | grep real real 0m18.160s user 0m0.048s sys 0m0.044s xpleaf@xpleaf-mac
當然,結果就是我們可以預料的了。
(4)驗證apply.async中的get()方法是阻塞的
使用apply.sync中的get()方法時,是會阻塞的,即apply.sync會等進程返回執行結果後才會執行下一個進程,其實(2)中的第一個例子就可以體現出來(程序中有get(),於是就忽略apply.async的非阻塞特性,等待子進程返回結果並使用get()獲得結果)。這裡不妨看下來一個例子,以實現雖然是多進程並發,但是因為get()的緣故,進程是串行執行的。
程序代碼如下:
from multiprocessing import Process,Pool import time def sayHi(num): time.sleep(1) return num*num p = Pool(processes=5) for i in range(20): result = p.apply_async(sayHi, [i]) print result.get()
程序執行結果如下:
xpleaf@xpleaf-machine:/mnt/hgfs/Python/day6$ time python multiprocssing_pool10.py 0 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 real 0m20.194s user 0m0.044s sys 0m0.064s
結果是一個一個輸出的,其實從程序執行的時間也可以推算出來,至於為什麼,那就是因為get()導致阻塞的原因了。
上面說得其實思路是不太清晰,主要是因為對多進程的掌握是還不夠多的,在這個探索的過程中,自己也是慢慢接觸到了許多編程思想和方法,還有和操作系統相關的知識,往後深入學習後,如果有時間,會再完善一下。