­

Python學習—pyhton中的進程

1.進程定義

進程: 進程就是一個程式在一個數據集上的一次動態執行過程。進程一般由程式、數據、進程式控制制塊(pcb)三部分組成。 (1)我們編寫的程式用來描述進程要完成哪些功能以及如何完成; (2)數據則是程式在執行過程中所需要使用的資源; (3)進程式控制制塊用來記錄進程的所有資訊。系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標誌。

2.創建進程

新創建的進程在記憶體獨立開闢一塊空間,不與其他進程共享空間、數據。 同一個進程中,新創建的執行緒與此進程里其他執行緒共享空間、數據。

1.os.fork()函數

os模組的三個方法: os.fork()創建一個當前進程的子進程 os.getpid()獲取當前進程pid os.getppid()獲取當前進程的父進程的Pid 關於fork(): 它用來創建一個進程,即為當前進程的子進程,複製父進程的所有程式碼並從fork語句處開始運行。運行父進程還是子進程的取決於當前os調度策略。 在父進程中返回子進程的pid,在子進程中返回0。即返回0表示在子進程中運行,返回大與0的數表示在父進程中運行。

例子:

import os    print('當前進程:',os.getpid())  print('當前進程的父進程:',os.getppid())    pid = os.fork()  if pid == 0:      print('此時為子進程:',os.getpid(),'n其父進程:',os.getppid())  else:      print('父進程:',os.getpid(),'nos.fork的返回值pid:',pid)

運行結果:

當前進程: 16839  當前進程的父進程: 2912  父進程: 16839  os.fork的返回值pid: 16842  此時為子進程: 16842  其父進程: 16839

從運行結果中看,在linux中fork產生子進程後是先運行父進程,當父進程結束後再進入子進程運行。

2.實例化進程類

直接通過實例化進程類multiprocessing.Process創建新進程。 和執行緒類一樣,進程類也有start()方法,join()方法。調用對象的start()方法實例上也是調用的類中的run()方法。

# 導入進程模組  import multiprocessing  import os    def job(ss):      print(ss,'當前子進程:%s' %os.getpid())    #實例化進程類,並提交任務,傳入任務所需要的參數  p1 = multiprocessing.Process(target=job,args=('abc',))  p1.start()  p2 = multiprocessing.Process(target=job,args=('123',))  p2.start()    # 和執行緒一樣,進程也有join方法。  p1.join()  p2.join()    print('完成......')

運行結果:

abc 當前子進程:17234  123 當前子進程:17235  完成......

3.繼承進程類來自定義進程類

繼承python提供的進程類,重寫方法,創建自己所需要的進程類,再實例化自定義的進程類。

import multiprocessing    class Job(multiprocessing.Process):      #重寫構造方法      def __init__(self,cc):          super(Job, self).__init__()          self.cc = cc        #重寫run方法,和執行緒一樣      def run(self):          print(self.cc)    #實例化對象  if __name__ == "__main__":      pp = []      for i in range(10):          p = Job(str(i)+':123456')          pp.append(p)          p.start()        for p in pp:          p.join()      print('hahhahaha')

運行結果:

0:123456  1:123456  2:123456  3:123456  4:123456  5:123456  6:123456  7:123456  8:123456  9:123456  hahhahaha

3.多進程與多執行緒的對比

import threading  import multiprocessing  from timeit import timeit    class Jobthread(threading.Thread):      def __init__(self,li):          super(Jobthread,self).__init__()          self.li = li      def run(self):          sum(self.li)    class Jobprocess(multiprocessing.Process):      def __init__(self,li):          super(Jobprocess, self).__init__()          self.li = li      def run(self):          for i in self.li:              sum(i)    # 這個裝飾器是自己寫的,用來計算某個函數執行時間  @timeit  def use_Pro(list):      for i in range(0,len(list), 1000):          p = Jobprocess(list[i:i+1000])          p.start()    @timeit  def use_Thr(list):      for li in list:          t = Jobthread(li)          t.start    if __name__ == "__main__":      list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000      use_Pro(list)      use_Thr(list)

運行結果:

use_Pro運行時間0.0041866302490234375  use_Thr運行時間0.02240157127380371

正如看到的結果一樣,多進程適合計算密集型任務,多執行緒適合i/o密集型任務。

3.守護進程與終止進程

1.守護進程-daemon屬性

和執行緒類似,進程類也有一個daemon屬性,默認值為False。 當改變他的值為True時,當主進程結束,就會強行終止其他的所以進程。 實例: (1)第一個程式

import multiprocessing  import time    def job():      print('開始子進程')      time.sleep(3)      print('子進程結束')    if __name__ == "__main__":      p = multiprocessing.Process(target=job)      p.start()      print("程式結束......")

運行結果:

程式結束......  開始子進程  子進程結束

主進程結束,其他進程還在繼續執行。 (2)第二個程式

import multiprocessing  import time    def job():      print('開始子進程')      time.sleep(3)      print('子進程結束')    if __name__ == "__main__":      p = multiprocessing.Process(target=job)      p.daemon = True      p.start()      print("程式結束......")

運行結果:

程式結束......

當主進程結束,其他進程將會被強制終止結束。

2.終止進程

import multiprocessing  import time    def job():      print('開始子進程')      time.sleep(3)      print('子進程結束')    if __name__ == "__main__":      p = multiprocessing.Process(target=job)      p.daemon = True      print(p.is_alive())     #啟動進程之前查看進程狀態      p.start()      print(p.is_alive())     #啟動進程之後查看進程狀態      p.terminate()           #終止進程      print(p.is_alive())     #終止進程命令一發出後,查看進程狀態。此時進程在釋放過程中,還沒有被完全釋放。      p.join()                #先讓進程完全釋放      print(p.is_alive())     #最後查看進程狀態        print("程式結束......")

運行結果:

False  True  True  False  程式結束......

4.進程間通訊

"""  通過隊列實現進程間通訊,隊列充當消息管道的作用(類似生產者消費者模型)  這裡通訊一直存在,也就是這兩個進程會一直存在,沒有銷毀釋放。  """  import multiprocessing  from multiprocessing import Queue  import time    class Put_news(multiprocessing.Process):      def __init__(self,queue):          super(Put_news, self).__init__()          self.queue = queue      def run(self):          for i in range(100):              self.queue.put(i)              print("傳遞消息:%s" %i)              time.sleep(0.1)    class Get_news(multiprocessing.Process):      def __init__(self,queue):          super(Get_news, self).__init__()          self.queue = queue      def run(self):          while True:              time.sleep(0.11)              print("接收消息++++++++++++:%s" %(self.queue.get()))    if __name__ == "__main__":      q = Queue()      p = Put_news(q)      g = Get_news(q)      p.start()      g.start()        if not p.is_alive():          g.terminate()

運行結果:

5.分散式進程

任務需要處理的數據特別大, 希望多台主機共同處理任務。multiprocessing.managers子模組裡面可以實現將進程分布到多台機器上 (管理端主機要運算一些列任務,通過與其他主機建立「連接「,將任務分配給其他主機執行,並將執行結果返回給管理端主機。) 管理端主機程式碼:

import random  from queue import Queue  from multiprocessing.managers import BaseManager    # 1.創建隊列(發送任務的隊列,收取結果的隊列)  task_queue = Queue()  result_queue = Queue()    # 第二三步驟可以互換順序  # 2.將隊列註冊到網路(這樣其他主機可以通過網路接收任務,發送結果)  # 註冊的隊列(任務隊列,結果隊列)的唯一標識碼分別為'put_task_queue','get_result_queue'  BaseManager.register('put_task_queue',callable=lambda :task_queue)  BaseManager.register('get_result_queue',callable=lambda : result_queue)    # 3.綁定埠(3333),設定密碼(hahahaha)  manager = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')    # 4.啟動manager,開始共享隊列  manager.start()    # 5.通過網路訪問共享的隊列  task = manager.put_task_queue()  result = manager.get_result_queue()    # 6.向任務隊列中放入執行任務的數據,這裡放入100個任務  for i in range(100):      n = random.randint(10,500)      task.put(n)      print('任務列表加入數據:'+str(n))    # 7.從結果隊列中讀取各個主機的任務執行結果  for j in range(100):      res = result.get()      print('執行結果:'+str(res))    # 8.任務執行結束,關閉共享隊列  manager.shutdown()

運算主機程式碼:

"""  在各個工作主機上執行的程式碼相同  """    from multiprocessing.managers import BaseManager    # 1. 連接manager端,獲取共享的隊列  import time    worker = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha')    # 2.註冊隊列,去獲取網路上共享的隊列中的內容  BaseManager.register('put_task_queue')  BaseManager.register('get_result_queue')    # 3.連接網路  worker.connect()    # 4.通過網路訪問共享的隊列    task = worker.put_task_queue()  result = worker.get_result_queue()    # 5.讀取任務,處理任務,這裡讀取了50個任務進行處理  # 每台運算主機上的處理任務數量可以不同,不過為了避免修改程式碼,一般都相同。  for i in range(50):      n = task.get()      print('執行任務 %d**2 = '%(n))      res = '%d**2=%d' %(n,n**2)  #這裡設置執行的任務是求平方      result.put(res)     #將結果放入結果隊列      time.sleep(1)       #休息1秒    print('工作主機執行任務結束.....')

6.進程池

和執行緒一樣,進程也有進程池。 1.第一種方法

import multiprocessing  import time    def job(id):      print('start id ---> %d' %id)      print('end id ----> %d' %id)      time.sleep(3)  # 創建含有8個進程的進程池  pool = multiprocessing.Pool(8)  # 給進城池的進程分配任務  for i in range(12):      pool.apply_async(job,args=(i,))    # 關閉進程池,使進程池不再工作運行  pool.close()  # 等待所有子進程結束之後再開始主進程  pool.join()    print('all works completed!')

運行結果:

start id ---> 0  end id ----> 0  start id ---> 1  end id ----> 1  start id ---> 2  end id ----> 2  start id ---> 3  end id ----> 3  start id ---> 4  end id ----> 4  start id ---> 5  end id ----> 5  start id ---> 6  end id ----> 6  start id ---> 7  end id ----> 7  start id ---> 8  end id ----> 8  start id ---> 9  end id ----> 9  start id ---> 10  end id ----> 10  start id ---> 11  end id ----> 11  all works completed!

2.第二種方法

from concurrent.futures import ProcessPoolExecutor  import time  def job(id):      print('start id ---> %d' %id)      print('end id ----> %d' %id)      time.sleep(3)    # 創建含有2個進程的進程池  pool = ProcessPoolExecutor(max_workers=2)  # 給進程池的進程分配任務,submit方法返回一個_base.Future對象  f1 = pool.submit(job,1)  f2 = pool.submit(job,2)  f3 = pool.submit(job,3)  f4 = pool.submit(job,4)  # 執行f1對象的各種方法  f1.done()  f1.result()

運行結果:

start id ---> 1  end id ----> 1  start id ---> 2  end id ----> 2  start id ---> 3  end id ----> 3  start id ---> 4  end id ----> 4

3.第三種方法

from concurrent.futures import ProcessPoolExecutor  import time  def job(id):      print('start id ---> %d' %id)      print('end id ----> %d' %id)      time.sleep(1)  pool = ProcessPoolExecutor(max_workers=3)  pool.map(job,range(1,10))

運行結果:

start id ---> 1  end id ----> 1  start id ---> 2  end id ----> 2  start id ---> 3  end id ----> 3  start id ---> 4  end id ----> 4  start id ---> 5  end id ----> 5  start id ---> 6  end id ----> 6  start id ---> 7  end id ----> 7  start id ---> 8  end id ----> 8  start id ---> 9  end id ----> 9