Python學習記錄-多進程和多線程

Python學習記錄-多進程和多線程

[TOC]

1. 進程和線程

進程

狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。 廣義定義:進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。

線程

線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。

線程與進程比較

線程與進程的區別: 1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。 2)通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。 3)創建:創建新線程很簡單,創建新進程需要對父進程進行一次克隆。 4)調度和切換:一個線程可以控制和操作同一進程里的其它線程,但是進程只能操作子進程;線程上下文切換比進程上下文切換要快得多。 5)在多線程OS中,進程不是一個可執行的實體。

<font color="red">注意:</font> 線程和進程快慢無法對比,因為線程被包含在進程中。

2. threading模塊

線程有2種調用方式,如下:

直接調用

import threading  import time    def sayhi(num):  # 定義每個線程要運行的函數        print("running on number:%s" % num)        time.sleep(3)    if __name__ == '__main__':      t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一個線程實例      t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另一個線程實例        t1.start()  # 啟動線程      t2.start()  # 啟動另一個線程        print(t1.getName())  # 獲取線程名      print(t2.getName())

繼承式調用

import threading  import time    class MyThread(threading.Thread):      def __init__(self, num):          threading.Thread.__init__(self)          self.num = num        def run(self):  # 定義每個線程要運行的函數          print("running on number:%s" % self.num)          time.sleep(3)    if __name__ == '__main__':      t1 = MyThread(1)      t2 = MyThread(2)      t1.start()      t2.start()

平常用到的主要方法:

start 線程準備就緒,等待CPU調度 setName 為線程設置名稱 getName 獲取線程名稱 setDaemon 設置為後台線程或前台線程(默認) 如果是後台線程,主線程執行過程中,後台線程也在進行,主線程執行完畢後,後台線程不論成功與否,均停止 如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢後,等待前台線程也執行完成後,程序停止 join 逐個執行每個線程,執行完畢後繼續往下執行,該方法使得多線程變得無意義 run 線程被cpu調度後自動執行線程對象的run方法

2.1 Join & Daemon

import time  import threading    def run(n):      print('[%s]------running----n' % n)      time.sleep(2)      print('--done--')    def main():      for i in range(5):          t = threading.Thread(target=run, args=[i, ])          t.start()          t.join(1)          print('starting thread', t.getName())    m = threading.Thread(target=main, args=[])  m.setDaemon(True)  # 將main線程設置為Daemon線程,它做為程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執行完任務  m.start()  m.join(timeout=2)  print("---main thread done----")
import time  import threading    def addNum():      global num  # 在每個線程中都獲取這個全局變量      print('--get num:', num)      time.sleep(1)      num -= 1  # 對此公共變量進行-1操作    num = 100  # 設定一個共享變量  thread_list = []  for i in range(100):      t = threading.Thread(target=addNum)      t.start()      thread_list.append(t)    for t in thread_list:  # 等待所有線程執行完畢      t.join()    print('final num:', num)

2.2 線程鎖(互斥鎖Mutex)

一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味着每個線程可以訪問同一份數據,由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之後,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 – 同一時刻允許一個線程執行操作。

import time  import threading    def addNum():      global num  # 在每個線程中都獲取這個全局變量      print('--get num:', num)      time.sleep(1)      num -= 1  # 對此公共變量進行-1操作    num = 100  # 設定一個共享變量  thread_list = []  for i in range(100):      t = threading.Thread(target=addNum)      t.start()      thread_list.append(t)    for t in thread_list:  # 等待所有線程執行完畢      t.join()    print('final num:', num)

因為python2.7以上版本,已經自動添加線程鎖,所以我們不用細糾。

import time  import threading    def addNum():      global num  # 在每個線程中都獲取這個全局變量      print('--get num:', num)      time.sleep(1)      lock.acquire()  # 修改數據前加鎖      num -= 1  # 對此公共變量進行-1操作      lock.release()  # 修改後釋放    num = 100  # 設定一個共享變量  thread_list = []  lock = threading.Lock()  # 生成全局鎖  for i in range(100):      t = threading.Thread(target=addNum)      t.start()      thread_list.append(t)    for t in thread_list:  # 等待所有線程執行完畢      t.join()    print('final num:', num)

2.3 信號量(Semaphore)

互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading, time    def run(n):      semaphore.acquire()      time.sleep(1)      print("run the thread: %s" % n)      semaphore.release()    if __name__ == '__main__':        num = 0      semaphore = threading.BoundedSemaphore(5)  # 最多允許5個線程同時運行      for i in range(20):          t = threading.Thread(target=run, args=(i,))          t.start()

2.4 事件(event)

python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 setwaitclear

事件處理的機制:全局定義了一個「Flag」,如果「Flag」值為 False,那麼當程序執行 event.wait 方法時就會阻塞,如果「Flag」值為True,那麼event.wait 方法時便不再阻塞。

clear:將「Flag」設置為False set:將「Flag」設置為True

import threading    def do(event):      print('start')      event.wait()      print('execute')    event_obj = threading.Event()  for i in range(10):      t = threading.Thread(target=do, args=(event_obj,))      t.start()    event_obj.clear()  inp = input('input:')  if inp == 'true':      event_obj.set()

2.5 條件(Condition)

使得線程等待,只有滿足某條件時,才釋放n個線程。

未使用條件時:

import threading    def run(n):      con.acquire()      con.wait()      print("run the thread: %s" %n)      con.release()    if __name__ == '__main__':        con = threading.Condition()      for i in range(10):          t = threading.Thread(target=run, args=(i,))          t.start()        while True:          inp = input('>>>')          if inp == 'q':              break          con.acquire()          con.notify(int(inp))          con.release()

使用條件時:

def condition_func():        ret = False      inp = input('>>>')      if inp == '1':          ret = True        return ret    def run(n):      con.acquire()      con.wait_for(condition_func)      print("run the thread: %s" %n)      con.release()    if __name__ == '__main__':        con = threading.Condition()      for i in range(10):          t = threading.Thread(target=run, args=(i,))          t.start()

2.6 定時器(Timer)

定時器,指定n秒後執行某操作

from threading import Timer    def hello():      print("hello, world")    t = Timer(1, hello)  t.start()  # after 1 seconds, "hello, world" will be printed

3. queue隊列模塊

queue特別用於在多個線程間安全的交換。

class queue.Queue(maxsize=0) #先入先出  class queue.LifoQueue(maxsize=0) #後入先出  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

maxsize為隊列最大容量,如果設置為0或者小於0,則隊列大小無限制。

3.1 一些常用方法

task_done() 意味着之前入隊的一個任務已經完成。由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。

如果當前一個join()正在阻塞,它將在隊列中的所有任務都處理完時恢復執行(即每一個由put()調用入隊的任務都有一個對應的task_done()調用。

join() 阻塞調用線程,直到隊列中的所有任務被處理掉。

只要有數據被加入隊列,未完成的任務數就會增加。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。

put(item[, block[, timeout]]) 將item放入隊列中。

如果可選的參數blockTruetimeout為空對象(默認的情況,阻塞調用,無超時)。 如果timeout是個正整數,阻塞調用進程最多timeout秒,如果一直無空空間可用,拋出Full異常(帶超時的阻塞調用)。 如果blockFalse,如果有空閑空間可用將數據放入隊列,否則立即拋出Full異常 其非阻塞版本為put_nowait等同於put(item, False)

get([block[, timeout]]) 從隊列中移除並返回一個數據。blocktimeout參數同put方法

其非阻塞方法為get_nowait()相當與get(False)

empty() 如果隊列為空,返回True,反之返回False

4. 生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什麼要使用生產者和消費者模式?

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式?

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

生產者消費者例子:

import threading  import queue  import time    def producer(name):      count = 1      while True:          for i in range(10):              q.put("包子 %s" % count)              print("生產了包子", count)              count += 1              time.sleep(0.5)    def consumer(n):      while True:          if q.qsize() > 0:              print("%s 取到" %n  , q.get())              time.sleep(1.5)    q = queue.Queue(maxsize=10)  p = threading.Thread(target=producer,args=("包子店",))  p.start()    c1 = threading.Thread(target=consumer,args=("Lily",))  c1.start()  c2 = threading.Thread(target=consumer,args=("Aeny",))  c2.start()  c3 = threading.Thread(target=consumer,args=("Bob",))  c3.start()

4.1 多線程的使用場景

python多線程不適合CPU密集型的任務,適合IO密集型的任務。

5. 多進程

多進程模塊multiprocessing的基本使用方法:

from multiprocessing import Process  import threading  import time    def thread_run():      print(threading.get_ident())    def foo(i):      time.sleep(1)      print('say hi',i)      t = threading.Thread(target=thread_run,)      t.start()    for i in range(10):      p = Process(target=foo,args=(i,))      p.start()      # p.join()

<font color="red">注意</font>: 由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。

獲取進程ID和父進程ID的方法示例:

from multiprocessing import Process  import os    def info(title):      print(title)      print('module name:', __name__)      print('parent process:', os.getppid())      print('process id:', os.getpid())      print("nn")    def f(name):      info('33[31;1mfunction f33[0m')      print('hello', name)    if __name__ == '__main__':      info('33[32;1mmain process line33[0m')      p = Process(target=f, args=('bob',))      p.start()      p.join()

5.1 進程間數據交互

不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法: Queues

from multiprocessing import Process, Queue    def f(q):      q.put([100, None, 'hello'])    if __name__ == '__main__':      q = Queue()      p = Process(target=f, args=(q,))      p.start()      print(q.get())    # prints "[100, None, 'hello']"      p.join()

Pipes

from multiprocessing import Process, Pipe    def f(conn):      conn.send([1, True, 'first'])      conn.send([2, True, 'second'])      print("from parent:", conn.recv())  # from parent: Third      conn.close()    if __name__ == '__main__':      parent_conn, child_conn = Pipe()      p = Process(target=f, args=(child_conn,))      p.start()      print(parent_conn.recv())  # [1, True, 'first']      print(parent_conn.recv())  # [2, True, 'second']      parent_conn.send("Third")      p.join()

5.2 進程數據共享

進程各自持有一份數據,默認無法共享數據。

from multiprocessing import Process    import time    li = []    def foo(i):      li.append(i)      print('say hi',li)    for i in range(10):      p = Process(target=foo,args=(i,))      p.start()    print ('ending',li)

結果類似這樣(每次的結果可能排序會有變化)。

say hi [2]  say hi [3]  say hi [5]  say hi [0]  say hi [1]  say hi [4]  say hi [6]  say hi [7]  say hi [8]  ending []  say hi [9]

想要進程間共享數據

方法一:使用Array

from multiprocessing import Process,Array  temp = Array('i', [11,22,33,44])    def Foo(i):      temp[i] = 100+i      for item in temp:          print(i,'----->',item)    for i in range(2):      p = Process(target=Foo,args=(i,))      p.start() 

方法二:manage.dict()共享數據

from multiprocessing import Process,Manager  import os    def f(d, l):      d[os.getpid()] = os.getpid()      l.append(os.getpid())      print(l)    if __name__ == '__main__':      manager = Manager()      d= manager.dict()      l = manager.list(range(5))      p_list = []      for i in range(10):          p = Process(target=f,args=(d, l))          p.start()          p_list.append(p)        for res in p_list:  # 等待結果          res.join()        print(d)      print(l)

5.3 進程同步

進程鎖的作用是保證屏幕打印正常,因為不加鎖有可能同一時間打印的數據導致顯示錯亂。

from multiprocessing import Process, Lock    def f(l, i):      l.acquire()      try:          print('hello world', i)      finally:          l.release()    if __name__ == '__main__':      lock = Lock()        for num in range(10):          Process(target=f, args=(lock, num)).start()

5.4 進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply
  • apply_async
from multiprocessing import Process, Pool  import time  import os    def Foo(i):      time.sleep(2)      print("in process", os.getpid())      return i + 100    def Bar(arg):      print('-->exec done:', arg, os.getpid())    if __name__ == '__main__':      pool = Pool(processes=3)  # 允許進程池同時放入的個數      print("主進程", os.getpid())      for i in range(10):          pool.apply_async(func=Foo, args=(i,), callback=Bar)  # callback=回調          # pool.apply(func=Foo, args=(i,))  # 串行          # pool.apply_async(func=Foo, args=(i,))  # 並行        print('end')      pool.close()      pool.join()  # 進程池中進程執行完畢後再關閉,如果注釋,那麼進程直接關閉