Python學習—pyhton中的進程
- 2020 年 1 月 6 日
- 筆記
1.進程定義
進程: 進程就是一個程式在一個數據集上的一次動態執行過程。進程一般由程式、數據、進程式控制制塊(pcb)三部分組成。 (1)我們編寫的程式用來描述進程要完成哪些功能以及如何完成; (2)數據則是程式在執行過程中所需要使用的資源; (3)進程式控制制塊用來記錄進程的所有資訊。系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標誌。
2.創建進程
新創建的進程在記憶體獨立開闢一塊空間,不與其他進程共享空間、數據。 同一個進程中,新創建的執行緒與此進程里其他執行緒共享空間、數據。
1.os.fork()函數
os模組的三個方法: os.fork()創建一個當前進程的子進程 os.getpid()獲取當前進程pid os.getppid()獲取當前進程的父進程的Pid 關於fork(): 它用來創建一個進程,即為當前進程的子進程,複製父進程的所有程式碼並從fork語句處開始運行。運行父進程還是子進程的取決於當前os調度策略。 在父進程中返回子進程的pid,在子進程中返回0。即返回0表示在子進程中運行,返回大與0的數表示在父進程中運行。
例子:
import os print('當前進程:',os.getpid()) print('當前進程的父進程:',os.getppid()) pid = os.fork() if pid == 0: print('此時為子進程:',os.getpid(),'n其父進程:',os.getppid()) else: print('父進程:',os.getpid(),'nos.fork的返回值pid:',pid)
運行結果:
當前進程: 16839 當前進程的父進程: 2912 父進程: 16839 os.fork的返回值pid: 16842 此時為子進程: 16842 其父進程: 16839
從運行結果中看,在linux中fork產生子進程後是先運行父進程,當父進程結束後再進入子進程運行。
2.實例化進程類
直接通過實例化進程類multiprocessing.Process創建新進程。 和執行緒類一樣,進程類也有start()方法,join()方法。調用對象的start()方法實例上也是調用的類中的run()方法。
# 導入進程模組 import multiprocessing import os def job(ss): print(ss,'當前子進程:%s' %os.getpid()) #實例化進程類,並提交任務,傳入任務所需要的參數 p1 = multiprocessing.Process(target=job,args=('abc',)) p1.start() p2 = multiprocessing.Process(target=job,args=('123',)) p2.start() # 和執行緒一樣,進程也有join方法。 p1.join() p2.join() print('完成......')
運行結果:
abc 當前子進程:17234 123 當前子進程:17235 完成......
3.繼承進程類來自定義進程類
繼承python提供的進程類,重寫方法,創建自己所需要的進程類,再實例化自定義的進程類。
import multiprocessing class Job(multiprocessing.Process): #重寫構造方法 def __init__(self,cc): super(Job, self).__init__() self.cc = cc #重寫run方法,和執行緒一樣 def run(self): print(self.cc) #實例化對象 if __name__ == "__main__": pp = [] for i in range(10): p = Job(str(i)+':123456') pp.append(p) p.start() for p in pp: p.join() print('hahhahaha')
運行結果:
0:123456 1:123456 2:123456 3:123456 4:123456 5:123456 6:123456 7:123456 8:123456 9:123456 hahhahaha
3.多進程與多執行緒的對比
import threading import multiprocessing from timeit import timeit class Jobthread(threading.Thread): def __init__(self,li): super(Jobthread,self).__init__() self.li = li def run(self): sum(self.li) class Jobprocess(multiprocessing.Process): def __init__(self,li): super(Jobprocess, self).__init__() self.li = li def run(self): for i in self.li: sum(i) # 這個裝飾器是自己寫的,用來計算某個函數執行時間 @timeit def use_Pro(list): for i in range(0,len(list), 1000): p = Jobprocess(list[i:i+1000]) p.start() @timeit def use_Thr(list): for li in list: t = Jobthread(li) t.start if __name__ == "__main__": list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000 use_Pro(list) use_Thr(list)
運行結果:
use_Pro運行時間0.0041866302490234375 use_Thr運行時間0.02240157127380371
正如看到的結果一樣,多進程適合計算密集型任務,多執行緒適合i/o密集型任務。
3.守護進程與終止進程
1.守護進程-daemon屬性
和執行緒類似,進程類也有一個daemon屬性,默認值為False。 當改變他的值為True時,當主進程結束,就會強行終止其他的所以進程。 實例: (1)第一個程式
import multiprocessing import time def job(): print('開始子進程') time.sleep(3) print('子進程結束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.start() print("程式結束......")
運行結果:
程式結束...... 開始子進程 子進程結束
主進程結束,其他進程還在繼續執行。 (2)第二個程式
import multiprocessing import time def job(): print('開始子進程') time.sleep(3) print('子進程結束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.daemon = True p.start() print("程式結束......")
運行結果:
程式結束......
當主進程結束,其他進程將會被強制終止結束。
2.終止進程
import multiprocessing import time def job(): print('開始子進程') time.sleep(3) print('子進程結束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.daemon = True print(p.is_alive()) #啟動進程之前查看進程狀態 p.start() print(p.is_alive()) #啟動進程之後查看進程狀態 p.terminate() #終止進程 print(p.is_alive()) #終止進程命令一發出後,查看進程狀態。此時進程在釋放過程中,還沒有被完全釋放。 p.join() #先讓進程完全釋放 print(p.is_alive()) #最後查看進程狀態 print("程式結束......")
運行結果:
False True True False 程式結束......
4.進程間通訊
""" 通過隊列實現進程間通訊,隊列充當消息管道的作用(類似生產者消費者模型) 這裡通訊一直存在,也就是這兩個進程會一直存在,沒有銷毀釋放。 """ import multiprocessing from multiprocessing import Queue import time class Put_news(multiprocessing.Process): def __init__(self,queue): super(Put_news, self).__init__() self.queue = queue def run(self): for i in range(100): self.queue.put(i) print("傳遞消息:%s" %i) time.sleep(0.1) class Get_news(multiprocessing.Process): def __init__(self,queue): super(Get_news, self).__init__() self.queue = queue def run(self): while True: time.sleep(0.11) print("接收消息++++++++++++:%s" %(self.queue.get())) if __name__ == "__main__": q = Queue() p = Put_news(q) g = Get_news(q) p.start() g.start() if not p.is_alive(): g.terminate()
運行結果:


5.分散式進程
任務需要處理的數據特別大, 希望多台主機共同處理任務。multiprocessing.managers子模組裡面可以實現將進程分布到多台機器上 (管理端主機要運算一些列任務,通過與其他主機建立「連接「,將任務分配給其他主機執行,並將執行結果返回給管理端主機。) 管理端主機程式碼:
import random from queue import Queue from multiprocessing.managers import BaseManager # 1.創建隊列(發送任務的隊列,收取結果的隊列) task_queue = Queue() result_queue = Queue() # 第二三步驟可以互換順序 # 2.將隊列註冊到網路(這樣其他主機可以通過網路接收任務,發送結果) # 註冊的隊列(任務隊列,結果隊列)的唯一標識碼分別為'put_task_queue','get_result_queue' BaseManager.register('put_task_queue',callable=lambda :task_queue) BaseManager.register('get_result_queue',callable=lambda : result_queue) # 3.綁定埠(3333),設定密碼(hahahaha) manager = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha') # 4.啟動manager,開始共享隊列 manager.start() # 5.通過網路訪問共享的隊列 task = manager.put_task_queue() result = manager.get_result_queue() # 6.向任務隊列中放入執行任務的數據,這裡放入100個任務 for i in range(100): n = random.randint(10,500) task.put(n) print('任務列表加入數據:'+str(n)) # 7.從結果隊列中讀取各個主機的任務執行結果 for j in range(100): res = result.get() print('執行結果:'+str(res)) # 8.任務執行結束,關閉共享隊列 manager.shutdown()
運算主機程式碼:
""" 在各個工作主機上執行的程式碼相同 """ from multiprocessing.managers import BaseManager # 1. 連接manager端,獲取共享的隊列 import time worker = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha') # 2.註冊隊列,去獲取網路上共享的隊列中的內容 BaseManager.register('put_task_queue') BaseManager.register('get_result_queue') # 3.連接網路 worker.connect() # 4.通過網路訪問共享的隊列 task = worker.put_task_queue() result = worker.get_result_queue() # 5.讀取任務,處理任務,這裡讀取了50個任務進行處理 # 每台運算主機上的處理任務數量可以不同,不過為了避免修改程式碼,一般都相同。 for i in range(50): n = task.get() print('執行任務 %d**2 = '%(n)) res = '%d**2=%d' %(n,n**2) #這裡設置執行的任務是求平方 result.put(res) #將結果放入結果隊列 time.sleep(1) #休息1秒 print('工作主機執行任務結束.....')
6.進程池
和執行緒一樣,進程也有進程池。 1.第一種方法
import multiprocessing import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(3) # 創建含有8個進程的進程池 pool = multiprocessing.Pool(8) # 給進城池的進程分配任務 for i in range(12): pool.apply_async(job,args=(i,)) # 關閉進程池,使進程池不再工作運行 pool.close() # 等待所有子進程結束之後再開始主進程 pool.join() print('all works completed!')
運行結果:
start id ---> 0 end id ----> 0 start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4 start id ---> 5 end id ----> 5 start id ---> 6 end id ----> 6 start id ---> 7 end id ----> 7 start id ---> 8 end id ----> 8 start id ---> 9 end id ----> 9 start id ---> 10 end id ----> 10 start id ---> 11 end id ----> 11 all works completed!
2.第二種方法
from concurrent.futures import ProcessPoolExecutor import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(3) # 創建含有2個進程的進程池 pool = ProcessPoolExecutor(max_workers=2) # 給進程池的進程分配任務,submit方法返回一個_base.Future對象 f1 = pool.submit(job,1) f2 = pool.submit(job,2) f3 = pool.submit(job,3) f4 = pool.submit(job,4) # 執行f1對象的各種方法 f1.done() f1.result()
運行結果:
start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4
3.第三種方法
from concurrent.futures import ProcessPoolExecutor import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(1) pool = ProcessPoolExecutor(max_workers=3) pool.map(job,range(1,10))
運行結果:
start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4 start id ---> 5 end id ----> 5 start id ---> 6 end id ----> 6 start id ---> 7 end id ----> 7 start id ---> 8 end id ----> 8 start id ---> 9 end id ----> 9