Multiprocessing, Multithreading, and Coroutines in Python
This article is based on my own notes. It draws on material from Prof. Hu Junfeng's course "Python程式設計與數據科學導論" (Introduction to Python Programming and Data Science).
Concurrency: Multiprocessing and Multithreading
Preliminaries
Concepts:
- Concurrency: multiple tasks make progress within the same period of time, but not necessarily at the same instant.
- Parallelism: multiple tasks make progress within the same period of time, and more than one task is executing at the same instant.
- Parallelism is a subset of concurrency. A single-core CPU interleaving several tasks is concurrent but not parallel; a multi-core CPU running several tasks at once is both concurrent and parallel.
When do we need concurrency?
- Several tasks need to be handled at the same time
- Tasks often have to wait for resources
- Several sub-procedures need to cooperate with one another
How the computer executes tasks:
- The operating system kernel is responsible for suspending and waking tasks (i.e. processes/threads) and for allocating resources (for example, which memory addresses a program may access).
- A process is the smallest unit of resource allocation; different processes do not share resources (such as the memory regions they may access). A process can be subdivided into threads, which share most resources with one another.
- Precisely because of this difference in resource sharing, switching between threads (i.e. suspending and waking) is faster than switching between processes.
- A thread is the smallest unit of scheduling. This means the OS kernel is what runs multiple threads concurrently.
Comparison of multiprocessing and multithreading
Multiprocessing:
- The task is split across several processes
- The kernel decides whether they run in parallel or merely concurrently.
- Processes do not share memory
- Advantage: if one process crashes, the others are unaffected
- Disadvantages: switching between processes is expensive, and inter-process communication is inconvenient
Multithreading:
- The task is split across several threads within a single process
- The kernel decides whether they run in parallel or merely concurrently.
- In the CPython interpreter, the Global Interpreter Lock (GIL) means threads can only run concurrently, never in parallel (whereas multiple processes can run in parallel).
- Threads within the same process share memory
- Advantages: switching is cheap, and communication between threads is easy
- Disadvantage: when threads access global variables concurrently, a locking mechanism is needed (a code sketch follows this list)
  - Locking: before a thread uses a global variable, it waits for the variable to be unlocked (by whichever thread holds it), locks it, uses it, and unlocks it afterwards.
    - Without locks, after 100 threads each execute `a += 1` (starting from `a = 0`), `a` may end up less than 100.
  - In data science, the lock mechanism is sometimes skipped for efficiency, at the cost of tolerating the resulting errors.
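A minimal sketch of this lost-update problem and of the lock that prevents it, using the standard `threading` module (the loop of 100 000 increments per thread is only there to make the race observable; a single `a += 1` per thread rarely loses updates in practice):

```python
import threading

n = 0
lock = threading.Lock()

def inc_without_lock():
    global n
    for _ in range(100000):
        n += 1              # read-modify-write; threads can interleave here and lose updates

def inc_with_lock():
    global n
    for _ in range(100000):
        with lock:          # only one thread at a time may run this block
            n += 1

for target in (inc_without_lock, inc_with_lock):
    n = 0
    threads = [threading.Thread(target=target) for _ in range(10)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(target.__name__, n)   # without the lock, n may be < 1000000
```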
Multiprocessing: mechanism and code
Almost every potentially blocking function introduced below accepts an optional `timeout` parameter; this will not be repeated each time.
Basic usage
from multiprocessing import Process
import os
import time
def task(duration, base_time):
pid = os.getpid()
print(f'son process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
time.sleep(duration)
print(f'son process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
p1 = Process(target=task, args=(1,base_time)) # a process that executes task(1,base_time); currently not running
p2 = Process(target=task, args=(2,base_time)) # a process that executes task(2,base_time); currently not running
p1.start()
p2.start()
print(p1.is_alive(), p2.is_alive()) # whether they are running
print('main process can proceed while son processes are running')
p1.join() # wait until p1 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
p2.join() # wait until p2 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
main process id 3316 starts at 0.000001s
True True
main process can proceed while son processes are running
son process id 15640 starts at 0.002056s with parameter 1
son process id 10716 starts at 0.003030s with parameter 2
son process id 15640 ends at 1.002352s
son process id 10716 ends at 2.017861s
main process id 3316 ends at 2.114324s
Without `p1.join()` and `p2.join()`, the main process finishes almost immediately while the child processes are still running. The output then looks like this:
main process id 11564 starts at 0.000001s
True True
main process can proceed while son processes are running
main process id 11564 ends at 0.011759s
son process id 13500 starts at 0.004392s with parameter 1
son process id 11624 starts at 0.003182s with parameter 2
son process id 13500 ends at 1.009420s
son process id 11624 ends at 2.021817s
Why is the event at 0.004s printed before the event at 0.003s?
- Because the time taken by the print statement itself is not counted
- Because on Windows the values reported by `perf_counter()` are not perfectly synchronized across different processes.
Note that after a child process finishes running it still lingers in the system; only after it is `join()`ed does it properly die (i.e. get removed from the system).
About `if __name__ == '__main__':`:
- In Python, code from other files can be pulled in with import; importing file A at the top of file B (or anywhere else) is roughly equivalent to pasting a copy of A at that point in B.
- If we want part of A (for instance, test statements) to be skipped when A's contents are copied in this way, we can wrap that code in `if __name__ == '__main__':`.
  - For code imported from elsewhere, the system variable `__name__` equals the name of the source file (strictly, the module name, but that does not matter here); for code that lives in the file actually being run, `__name__` equals `'__main__'`.
- For reasons explained below, on Windows a file whose code uses multiprocessing implicitly imports itself (once or more). Putting all zero-indentation code inside `if __name__ == '__main__':` avoids it being executed repeatedly (note that without this, the imported copy would import itself again, leading to infinitely recursive imports and an error).
  - For now, you can assume that this measure completely eliminates the effects of the "implicit self-import".
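As a small illustration of the `__name__` rule (the file names here are hypothetical):

```python
# util.py (hypothetical helper file)
print('__name__ seen by util.py:', __name__)
if __name__ == '__main__':
    print('test code: runs only when util.py is executed directly')

# main.py (hypothetical entry file)
import util                                   # prints "__name__ seen by util.py: util"
print('__name__ seen by main.py:', __name__)  # prints "__name__ seen by main.py: __main__"
```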
Process copying
from multiprocessing import Process
import os
pid = os.getpid()
def task():
global pid
print(pid)
pid = 1
print(pid)
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print(pid)
Output on Windows:
4836
1
2944
Output on Linux:
511
1
511
The first two numbers are printed by the child process; the third is printed by the parent.
- Notice that after `pid` is assigned 1 in the child process, it is not 1 in the parent process. This shows that modifications made by the child's `target` function to its running environment do not affect the parent's environment. The reverse also holds (the parent does not affect the child). In other words, once the child's environment has been created, the parent's environment and the child's environment are completely independent of each other.
  - Because of this independence, the child's results cannot be handed back to the parent directly either. Two solutions are introduced later: 1. inter-process communication; 2. using the return value of the process pool's `apply` method.
- Notice that the first and third outputs differ on Windows but are identical on Linux. This shows that the value of the global variable `pid` in the child process is obtained on Linux by directly copying the parent's `pid`, and on Windows by re-running `pid = os.getpid()`. More generally, the following two facts hold:
  - On Windows, the child process created by `Process(target)` starts as a blank slate (its running environment is empty). When `start()` is called, it first executes the parent's entire code file via an `import` statement (thereby building a fresh environment) and only then runs the `target` function. Consequently, code wrapped in `if __name__ == '__main__':` is executed only by the parent, while unwrapped zero-indentation code is also executed once by every child (inside its own environment).
    - This is the "implicit self-import" mechanism mentioned earlier.
  - On Linux, the child process created by `Process(target)` copies the parent's entire running environment instead of rebuilding it. The copied environment is completely independent of the parent's.
The way the child is created on Linux is called fork; the way it is created on Windows is called spawn. For more on this, see //stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn .
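If you want the same behaviour on both platforms, the start method can be selected explicitly. A minimal sketch using the standard `multiprocessing` API (note that 'fork' is only available on Unix-like systems):

```python
from multiprocessing import Process, set_start_method
import os

def task():
    print('child', os.getpid())

if __name__ == '__main__':
    set_start_method('spawn')   # choose the start method once, before creating any process
    p = Process(target=task)
    p.start()
    p.join()
```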
from multiprocessing import Process
import os
def task():
pass
if __name__ == '__main__':
p = Process(target=task)
print('son process created')
p.start()
print('son process starts')
p.join()
print('son process ends')
print('gu?')
Output on Windows
son process created
son process starts
gu?
son process ends
gu?
This shows that when a Windows child process executes the parent's code file during its initialization, the content appearing after `p.start()` in the parent's file (such as the unguarded `print('gu?')`) is executed as well.
Process pools
If we have many tasks to run at once, opening a separate process for each task is both inefficient (creating and destroying processes is expensive, not all of them can actually run in parallel, and the kernel struggles to schedule them) and possibly disallowed by the kernel.
Solution: use a process pool. The pool holds a few processes (generally not many more than the number of CPU cores); when a new task arrives it is assigned to an idle process, and if no process is idle the task waits. The drawback is this waiting, so it does not count as full concurrency.
Basic usage of a process pool
from multiprocessing import Pool
import os, time
def task(duration, base_time, task_name):
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3) # a pool containing 3 subprocesses
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running
# if all son processes are busy, wait until one is free and then start
pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running.
print('all tasks assigned; wait for son processes to finish')
pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
Output (similar on Windows and Linux):
main process id 5236 starts at 0.000002s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 8724 starts working on TaskNo.1 at 0.030557s with parameter 1
son process id 14584 starts working on TaskNo.2 at 0.037581s with parameter 1
son process id 10028 starts working on TaskNo.3 at 0.041210s with parameter 1
son process id 14584 ends working on TaskNo.2 at 1.042662s
son process id 8724 ends working on TaskNo.1 at 1.040211s
son process id 14584 starts working on TaskNo.4 at 1.044109s with parameter 1
son process id 10028 ends working on TaskNo.3 at 1.054017s
son process id 14584 ends working on TaskNo.4 at 2.055515s
all tasks finished at 2.214534s
main process id 5236 ends at 2.214884s
When tasks are added with `apply_async` ("asynchronous call"), the main process keeps running while the child processes work on them; when tasks are added with `apply` ("synchronous call"), the main process pauses ("blocks") until that task finishes. In general, use `apply_async` rather than `apply`.
Process copying in a pool
from multiprocessing import Pool
import os, time
all_tasks_on_this_son_process = []
def task(duration, base_time, task_name):
global all_tasks_on_this_son_process
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
all_tasks_on_this_son_process += [task_name]
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3)
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
pool.close()
print('all tasks assigned; wait for son processes to finish')
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
print('gu?')
Output on Windows:
main process id 6116 starts at 0.000001s
start assigning tasks
all tasks assigned; wait for son processes to finish
gu?
gu?
gu?
son process id 16028 starts working on TaskNo.1 at 0.037577s with parameter 1, this process already executed []
son process id 11696 starts working on TaskNo.2 at 0.041393s with parameter 1, this process already executed []
son process id 5400 starts working on TaskNo.3 at 0.038409s with parameter 1, this process already executed []
son process id 11696 ends working on TaskNo.2 at 1.041521s
son process id 16028 ends working on TaskNo.1 at 1.038722s
son process id 11696 starts working on TaskNo.4 at 1.042543s with parameter 1, this process already executed ['TaskNo.2']
son process id 5400 ends working on TaskNo.3 at 1.052573s
son process id 11696 ends working on TaskNo.4 at 2.053483s
all tasks finished at 2.167447s
main process id 6116 ends at 2.167904s
gu?
On Windows, each process in the pool runs the parent's code file to build its running environment when, and only when, its first assigned task is about to start. Changes a process makes to its environment during one task are carried over unchanged into the environment of its next task (i.e. when it accepts a new task, it simply keeps using the environment left over from the previous one).
Output on Linux:
main process id 691 starts at 0.000001s
all tasks assigned; wait for son processes to finish
son process id 692 starts working on TaskNo.1 at 0.104757s with parameter 1, this process already executed []
son process id 693 starts working on TaskNo.2 at 0.104879s with parameter 1, this process already executed []
son process id 694 starts working on TaskNo.3 at 0.105440s with parameter 1, this process already executed []
son process id 692 ends working on TaskNo.1 at 1.106427s
son process id 693 ends working on TaskNo.2 at 1.106426s
son process id 694 ends working on TaskNo.3 at 1.107157s
son process id 692 starts working on TaskNo.4 at 1.107560s with parameter 1, this process already executed ['TaskNo.1']
son process id 692 ends working on TaskNo.4 at 2.110033s
all tasks finished at 2.117158s
main process id 691 ends at 2.117452s
gu?
On Linux, each process in the pool makes a complete copy of the parent's running environment when, and only when, its first task is about to start. As on Windows, changes made to the environment during one task are carried over unchanged into the next (a new task simply continues in the environment left over from the previous one).
A pool can also be given an `initializer` function (with `initargs`), which every worker process runs once when it is set up, as the next example shows.
from multiprocessing import Pool
import os, time
all_tasks_on_this_son_process = []
def init(init_name):
global all_tasks_on_this_son_process
all_tasks_on_this_son_process += [init_name]
def task(duration, base_time, task_name):
global all_tasks_on_this_son_process
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
all_tasks_on_this_son_process += [task_name]
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3, initializer=init, initargs=('init',)) # look here
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
pool.close()
print('all tasks assigned; wait for son processes to finish')
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
Output (similar on Windows and Linux):
main process id 18416 starts at 0.000004s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 10052 starts working on TaskNo.1 at 0.053483s with parameter 1, this process already executed ['init']
son process id 17548 starts working on TaskNo.2 at 0.040412s with parameter 1, this process already executed ['init']
son process id 10124 starts working on TaskNo.3 at 0.049992s with parameter 1, this process already executed ['init']
son process id 10124 ends working on TaskNo.3 at 1.054387s
son process id 17548 ends working on TaskNo.2 at 1.044956s
son process id 10052 ends working on TaskNo.1 at 1.062396s
son process id 10124 starts working on TaskNo.4 at 1.055888s with parameter 1, this process already executed ['init', 'TaskNo.3']
son process id 10124 ends working on TaskNo.4 at 2.060094s
all tasks finished at 2.443017s
main process id 18416 ends at 2.444705s
Using child processes' return values in a pool
from multiprocessing import Pool
import time
def task(duration, base_time, task_name):
time.sleep(duration)
return f'{task_name} finished at {"%.6f" % (time.perf_counter()-base_time)}s'
if __name__ == '__main__':
base_time = time.perf_counter()
pool = Pool(2)
return_values = []
return_values.append(pool.apply(task, args=(1,base_time,'TaskNo.1_sync')))
print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
return_values.append(pool.apply_async(task, args=(2,base_time,'TaskNo.2_async')))
print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
pool.close()
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
assert return_values[1].ready() == True
return_values[1] = return_values[1].get() # from ApplyResult to true return value
print('results:', return_values)
at time 1.2109459, r_v is ['TaskNo.1_sync finished at 1.027223s']
at time 1.2124976, r_v is ['TaskNo.1_sync finished at 1.027223s', <multiprocessing.pool.ApplyResult object at 0x0000016D24D79AC0>]
all tasks finished at 3.258190s
results: ['TaskNo.1_sync finished at 1.027223s', 'TaskNo.2_async finished at 3.041053s']
Here `result.get()` is called after `pool.join()`, so the return value of the function executed by the child process is available immediately. If `result.get()` is called before the corresponding child has returned, the main process blocks until the child returns and then fetches the return value. `result.ready()` returns a `bool` indicating whether the corresponding child has returned.
In addition, `result.wait()` blocks until the child returns, but does not fetch the return value.
`get()` can be called on the same `ApplyResult` instance multiple times, i.e. the return value can be fetched repeatedly.
See //docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult for details.
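When every task runs the same function over a list of inputs, `Pool.map` is a convenient alternative to collecting `ApplyResult` objects by hand: it blocks until all inputs have been processed and returns the results in order. A minimal sketch:

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(3) as pool:                      # the with-block shuts the pool down on exit
        results = pool.map(square, range(10))  # blocks until every result is ready
    print(results)                             # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```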
Inter-process communication
You can assume that any object sent across processes is deep-copied in transit.
Pipe
from multiprocessing import Process, Pipe
import time
def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
conn.send(content)
print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = conn.recv()
print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task(conn, pipe_name, process_name, base_time):
receive_from_pipe(conn, pipe_name, process_name, base_time)
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 142857, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
conn_A, conn_B = Pipe() # two endpoints of the pipe
p1 = Process(target=task, args=(conn_B,'pipe','son',base_time))
p1.start()
time.sleep(1)
send_through_pipe(conn_A, 'pipe', 'main', ['hello','hello','hi'], base_time) # any object can be sent
receive_from_pipe(conn_A, 'pipe', 'main', base_time)
p1.join()
son tries to receive content from pipe at 0.036439
main tries to send ['hello', 'hello', 'hi'] through pipe at 1.035570
main successfully finishes sending at 1.037174
main tries to receive content from pipe at 1.037318
son successfully receives ['hello', 'hello', 'hi'] at 1.037794
son tries to send 142857 through pipe at 2.039058
son successfully finishes sending at 2.040158
main successfully receives 142857 at 2.040441
In addition, `conn.poll()` (which returns a bool) tells you whether `conn` has unread data sent from the other end.
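For example, a receiver can poll for data instead of blocking in `recv()`. A minimal sketch (the 0.1-second timeout is an arbitrary choice):

```python
from multiprocessing import Process, Pipe
import time

def slow_sender(conn):
    time.sleep(1)
    conn.send('done')

if __name__ == '__main__':
    conn_a, conn_b = Pipe()
    p = Process(target=slow_sender, args=(conn_b,))
    p.start()
    while not conn_a.poll(0.1):        # check for unread data, waiting at most 0.1s each time
        print('nothing yet, doing other work...')
    print(conn_a.recv())               # now recv() will not block
    p.join()
```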
from multiprocessing import Process, Pipe
import time
def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
conn.send(content)
print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = conn.recv()
print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task1(conn, pipe_name, process_name, base_time):
receive_from_pipe(conn, pipe_name, process_name, base_time)
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
def task2(conn, pipe_name, process_name, base_time):
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
time.sleep(2)
receive_from_pipe(conn, pipe_name, process_name, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
conn_A, conn_B = Pipe()
p1 = Process(target=task1, args=(conn_A,'pipe','son1',base_time))
p2 = Process(target=task2, args=(conn_B,'pipe','son2',base_time))
p1.start()
p2.start()
p1.join()
p2.join()
son1 tries to receive content from pipe at 0.033372
son2 tries to send greetings from son2 through pipe at 1.058998
son2 successfully finishes sending at 1.060660
son1 successfully receives greetings from son2 at 1.061171
son1 tries to send greetings from son1 through pipe at 2.062389
son1 successfully finishes sending at 2.063290
son2 tries to receive content from pipe at 3.061378
son2 successfully receives greetings from son1 at 3.061843
From this we can see:
- A `Pipe` can buffer data, and the buffered data is delivered in FIFO order.
  - However, the buffer a `Pipe` uses is fairly small (its exact size depends on the OS); if it fills up, `send()` blocks until the other end frees space with `recv()`.
- The two endpoints of a `Pipe` can be handed to any two processes.
  - Handing the same endpoint to several processes is not recommended, as it can be risky; if you really need that, use a `Queue` instead.
Queue
A `Queue` is essentially a queue that works across processes. Its operations take roughly twice as long as the corresponding `Pipe` operations.
from multiprocessing import Process, Queue
import time
def put_into_queue(q, queue_name, putter_name, content, base_time):
print(putter_name, 'tries to put', content, 'into', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
q.put(content)
print(putter_name, 'successfully finishes putting at', '%.6f'%(time.perf_counter()-base_time))
def get_from_queue(q, queue_name, getter_name, base_time):
print(getter_name, 'tries to receive content from', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = q.get()
print(getter_name, 'successfully gets', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task1(q, delay, queue_name, process_name, base_time):
time.sleep(delay)
put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
time.sleep(5)
get_from_queue(q, queue_name, process_name, base_time)
def task2(q, delay, queue_name, process_name, base_time):
time.sleep(delay)
get_from_queue(q, queue_name, process_name, base_time)
time.sleep(5)
put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
q = Queue()
put_and_get_1 = Process(target=task1, args=(q,0,'queue','putAndGet_No.1',base_time))
get_and_put_1 = Process(target=task2, args=(q,1,'queue','getAndPut_No.1',base_time))
get_and_put_2 = Process(target=task2, args=(q,2,'queue','getAndPut_No.2',base_time))
put_and_get_1.start()
get_and_put_1.start()
get_and_put_2.start()
put_and_get_1.join()
get_and_put_1.join()
get_and_put_2.join()
putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883
putAndGet_No.1 successfully finishes putting at 0.079291
getAndPut_No.1 tries to receive content from queue at 1.104196
getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489
getAndPut_No.2 tries to receive content from queue at 2.126434
putAndGet_No.1 tries to receive content from queue at 5.081044
getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381
getAndPut_No.1 successfully finishes putting at 6.107820
getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565
getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579
getAndPut_No.2 successfully finishes putting at 11.112493
putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546
Also, if the queue grows so large that it hits some upper bound, `put()` will block as well, though in practice it is hard to make it that large.
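You can also impose a bound yourself by passing `maxsize` when constructing the queue; `put()` then blocks once the queue is full, and `put_nowait()` raises `queue.Full` instead of blocking. A minimal sketch (no child process involved, it only demonstrates the bound):

```python
from multiprocessing import Queue
import queue

q = Queue(maxsize=2)        # at most 2 items may be buffered
q.put(1)
q.put(2)
try:
    q.put_nowait(3)         # the queue is full, so this raises immediately
except queue.Full:
    print('queue is full')
print(q.get(), q.get())     # 1 2
```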
Multithreading
The basic syntax is very similar to that of multiprocessing, but the underlying mechanism differs in important ways. Because of the Global Interpreter Lock, multithreading in Python is of limited practical use, so it is only introduced briefly here.
The basic multithreading code looks exactly like the multiprocessing code shown earlier; the figure originally included here (whose code runs for about 3 s under CPython) is not reproduced, but a sketch in the same spirit follows below.
Incidentally, multithreading does not require the `if __name__ == '__main__':` guard.
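A minimal sketch of the threading analogue of the earlier `Process` example (a reconstruction, not the original figure's code). Note that because `time.sleep` releases the GIL, this sleep-based version finishes in about 2 s; it is CPU-bound work that the GIL serializes, which would then take roughly the sum of the individual times.

```python
from threading import Thread
import time

def task(duration, base_time):
    print(f'thread starts at {time.perf_counter()-base_time:.6f}s with parameter {duration}')
    time.sleep(duration)
    print(f'thread ends at {time.perf_counter()-base_time:.6f}s')

base_time = time.perf_counter()
t1 = Thread(target=task, args=(1, base_time))
t2 = Thread(target=task, args=(2, base_time))
t1.start()
t2.start()
t1.join()   # wait for both threads to finish
t2.join()
print(f'main thread ends at {time.perf_counter()-base_time:.6f}s')
```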
Variable sharing in multithreading
import threading
lock_n = threading.Lock()
n = 0
def inc_n(m):
global n
lock_n.acquire(blocking=True)
n += m
lock_n.release()
threads = [threading.Thread(target=inc_n, args=(i,)) for i in range(1,11)]
[t.start() for t in threads]
[t.join() for t in threads]
print(n)
55
- As the example shows, different threads share the same running environment (for instance the variable `n` above).
- `lock.acquire(blocking=True)` blocks until the lock becomes free, and locks it as soon as it does.
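The acquire/release pair can also be written as a `with` statement, which releases the lock even if the block raises an exception. Reusing `n` and `lock_n` from the code above, the following is equivalent to `inc_n`:

```python
def inc_n(m):
    global n
    with lock_n:   # acquires on entry, releases on exit (even on exception)
        n += m
```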
Concurrency: Coroutines
Different procedures take turns running within a single thread. While it runs, a coroutine has the resources to itself; when one stretch of work is done, it blocks itself and waits for external code that controls it (e.g. the main function) to wake it up again.
Advantages over multithreading: coroutines are lightweight (implemented at the interpreter level, so the kernel is not needed for switching) and unlimited in number.
As with multithreading, different coroutines share the same running environment.
Implementing coroutines with plain generators
def sum(init):
s = init
while True:
delta = yield s # output s, and then input delta
s += delta
g = sum(0)
print(next(g)) # run until receiving the first output
print(g.send(1)) # send the first input, and then get the second output
print(g.send(2)) # send the second input, and then get the third output
0
1
3
The example above only demonstrates a generator blocking itself and the interaction between a generator and its caller.
Going one step further, you can define several generators that carry out different procedures and schedule them from the main function (for instance with a task queue), which amounts to coroutines; a sketch follows below.
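For instance, here is a minimal sketch of such a scheduler: each generator is a coroutine that blocks itself with `yield`, and a simple round-robin loop (the task queue) wakes them in turn. The names and structure are illustrative, not from the original notes.

```python
from collections import deque

def worker(name, steps):
    # a coroutine implemented as a generator: it yields to hand control back to the scheduler
    for i in range(steps):
        print(f'{name} does step {i}')
        yield                      # block itself; the scheduler decides who runs next

def run_all(tasks):
    # a minimal round-robin scheduler: keep cycling through the task queue
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)             # wake the coroutine and run it until its next yield
            queue.append(task)     # it blocked itself again, so requeue it
        except StopIteration:
            pass                   # this coroutine has finished

run_all([worker('A', 2), worker('B', 3)])
# A does step 0
# B does step 0
# A does step 1
# B does step 1
# B does step 2
```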
Turning an ordinary function into a coroutine with a callback
def calc(n, callback):
    r = 0
    for i in range(n):
        r += i
        yield from callback()  # suspend calc here; a bare callback() would only create a generator and never pause
def pause():
    print('pause')
    yield  # just pause, do not return anything
g = calc(10, pause)
next(g)  # run until the first pause
next(g)  # resume, run until the second pause
Implementing coroutines with `async`/`await`
Advantages over the generator-based approach: when a coroutine is waiting on I/O or network communication, it can be suspended so that other coroutines run (without interrupting the wait), which saves time and cannot be done with plain generators; it is also more flexible and convenient to use.
- Multithreading actually has the first advantage too, so multithreading under CPython is not useless; its uses are simply a subset of what coroutines can do.
- One caveat: to actually speed up I/O with coroutines, you must use Python's dedicated asynchronous I/O libraries, as the sketch below illustrates.
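As an illustration, the standard library's asyncio streams perform network I/O without blocking the event loop: while one coroutine waits for a server's reply, the others keep running. A minimal sketch (the host names are arbitrary examples; `async`/`await` and `asyncio.run` are explained in the next section):

```python
import asyncio

async def fetch_status_line(host):
    # open_connection waits for the TCP handshake without blocking other coroutines
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(f'HEAD / HTTP/1.0\r\nHost: {host}\r\n\r\n'.encode())
    await writer.drain()
    line = await reader.readline()          # first line of the HTTP response
    writer.close()
    await writer.wait_closed()
    return host, line.decode().strip()

async def main():
    # both requests wait on the network concurrently within a single thread
    print(await asyncio.gather(fetch_status_line('example.com'),
                               fetch_status_line('python.org')))

asyncio.run(main())  # use `await main()` instead inside IPython
```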
Basic usage
import time
start = time.perf_counter()
def sayhi(delay):
time.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
def main():
sayhi(1)
sayhi(2)
main()
hi! at 1.0040732999914326
hi! at 3.015253899997333
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await sayhi1
await sayhi2
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0037910000100965
hi! at 2.0026504999987083
In the program above:
- `async` declares that the function is a coroutine. This declaration allows `create_task` and `await` to be used inside the function body, and also allows the function itself to be passed to `create_task` and `await`.
  - Once a function `f` has been declared a coroutine, calling `f()` no longer runs `f`; it merely creates and returns a coroutine object (which does not run by itself). The tools in `asyncio` are needed to run that object.
- `run(main())` starts executing the coroutine `main()`. It requires `main()` to be the "main coroutine", i.e. the entry point of every coroutine in the program (similar to a main function). `run(main())` should be called only once in a program, and no coroutine should be invoked outside `main()`. `run(main())` is blocking; the concurrency of the coroutines is visible only inside `main()`, and from the outside it is just an ordinary black-box call.
  - The job of `run()` is to start the environment needed for running coroutines (and to shut it down once `main()` finishes). In IPython, however, that environment is already started for you at launch, so you can use `await` directly (and you do not have to put all coroutines inside a single main coroutine; they can be scattered throughout the program).
- `create_task(sayhi(1))` creates a task for the coroutine `sayhi(1)` in a kind of "task pool" and starts executing it. The return value is the task's handle, a "remote control" of sorts.
  - Tasks in the pool run concurrently. The points at which a task may be interrupted so that another task can run are specified with `await`.
- `await sayhi1` means two things:
  - Block the current coroutine (the one containing this statement, here `main()`) until the task `sayhi1` completes (similar to `Process.join()`).
  - Tell the interpreter that the current coroutine is now blocking, so it may switch to another coroutine.
    - If what were being `await`ed were not `sayhi1` but, say, an operation receiving an HTTP request, then switching coroutines would not disturb the waiting on that request. This is the power of `asyncio`.
    - `await asyncio.sleep(delay)` already illustrates this: `asyncio.sleep()` has exactly this "switching coroutines does not disturb the waiting" property.
- A few more things about `await`:
  - What is `await`ed may be a coroutine object rather than an already-created task (e.g. `await sayhi(1)`). In that case it is not added to the task pool; it starts executing directly (though it may be switched away from right after starting, since `await` was used) and blocks until it finishes. As a result, `sayhi(1)` cannot take part in the concurrency as a task on an equal footing with other tasks, but it can still take part indirectly by being suspended and resumed together with its parent coroutine (here `main()`).
  - What can be `await`ed is not only coroutine objects and task handles but any awaitable object, i.e. any object implementing the `__await__` method (which tells the interpreter how to block and switch coroutines right when the object starts executing, without disturbing whatever waiting or other work may be going on inside it).
  - An `await`ed object can cause a block-and-switch only at the moment it starts executing. Any later points during its execution at which a switch may happen are determined by the `await` statements used inside that object, not by the `await` statement that invoked it.
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
await sayhi(1)
await sayhi(2)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0072715999995125
hi! at 3.0168006000021705
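Besides `create_task`, another common way to make the two calls overlap again (about 2 s in total) is `asyncio.gather`, which schedules several awaitables as tasks and waits for them all. A minimal sketch:

```python
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    # gather schedules both coroutines as tasks and waits for both to finish
    await asyncio.gather(sayhi(1), sayhi(2))

asyncio.run(main())  # use `await main()` instead inside IPython
# hi! at ~1.0s
# hi! at ~2.0s
```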
wait_for()
Changing `await A` (where `A` is any awaitable) into `await asyncio.wait_for(A, timeout)` puts a limit of `timeout` seconds on the `await`; if the `await` has not finished within that many seconds, `A`'s execution is cancelled and `asyncio.TimeoutError` is raised.
You do not need to care what `wait_for` does internally; just remember the phrase `await asyncio.wait_for(A, timeout)`. Apart from the timeout, it can be treated as identical to `await A`. Examples follow.
import time
import asyncio
async def eternity():
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
timeout!
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await asyncio.wait_for(sayhi1,1.05)
await asyncio.wait_for(sayhi2,1.05)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0181081000046106
hi! at 2.0045300999918254
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await asyncio.wait_for(sayhi1,0.95)
await asyncio.wait_for(sayhi2,1.05)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-89-7f639d54114e> in <module>
15
16 # asyncio.run(main()) # use outside IPython
---> 17 await main() # use inside IPython
<ipython-input-89-7f639d54114e> in main()
11 sayhi1 = asyncio.create_task(sayhi(1))
12 sayhi2 = asyncio.create_task(sayhi(2))
---> 13 await asyncio.wait_for(sayhi1,0.95)
14 await asyncio.wait_for(sayhi2,1.05)
15
~\anaconda3\lib\asyncio\tasks.py in wait_for(fut, timeout, loop)
488 # See //bugs.python.org/issue32751
489 await _cancel_and_wait(fut, loop=loop)
--> 490 raise exceptions.TimeoutError()
491 finally:
492 timeout_handle.cancel()
TimeoutError:
hi! at 2.0194762000028277
Also notice that even though `sayhi1` was cancelled and the `TimeoutError` propagated out of the parent coroutine `main()`, the already-scheduled task `sayhi2` still ran to completion (its "hi!" appears at about 2 s, after the traceback). This shows that coroutines and tasks have a certain degree of independence from one another.
Implementing producer-consumer coroutines
For this we need `asyncio.Queue`. It differs from an ordinary queue in that its `put`/`get` operations block when they cannot proceed (much like `multiprocessing.Queue`), and these operations are themselves coroutines (note that this means calling them merely returns a coroutine object rather than executing anything), so they can be `await`ed.
import time
import asyncio
start = time.perf_counter()
async def producer(q):
for i in range(5):
await asyncio.sleep(1) # producing takes 1 sec
await q.put(i) # will wait if q is full
print(f'put {i} at {time.perf_counter() - start}')
await q.join() # will wait until all objects produced are **taken out** and **consumed**.
async def consumer(q):
for i in range(5):
item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command.
print(f'get {item} at {time.perf_counter() - start}')
await asyncio.sleep(1) # consuming takes 1 sec
q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough!
print(f'consumed {item} at {time.perf_counter() - start}')
async def main():
q = asyncio.Queue()
P = asyncio.create_task(producer(q))
C = asyncio.create_task(consumer(q))
await P
await C
print(f'done at {time.perf_counter() - start}')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
put 0 at 1.0108397000003606
get 0 at 1.0112231999955839
put 1 at 2.017216499996721
consumed 0 at 2.0176210000063293
get 1 at 2.0177472999930615
put 2 at 3.0279211000015493
consumed 1 at 3.0283254999958444
get 2 at 3.028457599997637
put 3 at 4.039952199993422
consumed 2 at 4.041183299996192
get 3 at 4.041302300000098
put 4 at 5.0465819999953965
consumed 3 at 5.04690839999239
get 4 at 5.047016099997563
consumed 4 at 6.047789799995371
done at 6.048323099996196
import time
import asyncio
start = time.perf_counter()
async def sleep_and_put(q):
await asyncio.sleep(1)
await q.put(1)
async def main():
q = asyncio.Queue()
C = asyncio.create_task(q.get())
P = asyncio.create_task(sleep_and_put(q))
await C
await P
print(f'finished at {time.perf_counter() - start}')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
finished at 1.01112650000141
As this example shows, `Queue.get()` (and likewise `Queue.put()`) is a coroutine, so you can also create a task from it and let it take part in the concurrency.