43、並發編程之多執行緒實操篇

一 threading模組介紹

multiprocessing模組完全模仿了threading模組的介面,二者在使用層面,有很大的相似性,因而不再詳細介紹

官網鏈接://docs.python.org/3/library/threading.html?highlight=threading#

二 開啟執行緒的兩種方式

# 方式一
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('amgulen',))
    t.start()
    print('主執行緒')
# 方式二
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('amgulen')
    t.start()
    print('主執行緒')

img

三 在一個進程下開啟多個執行緒與在一個進程下開啟多個子進程的區別

1.誰的開啟速度快

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    # 在主進程下開啟執行緒
    t=Thread(target=work)
    t.start()
    print('主執行緒/主進程')
    '''
    列印結果:
    hello
    主執行緒/主進程
    '''

    # 在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    print('主執行緒/主進程')
    '''
    列印結果:
    主執行緒/主進程
    hello
    '''

2.瞅一瞅PID

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    # part1:在主進程下開啟多個執行緒,每個執行緒都跟主進程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主執行緒/主進程pid',os.getpid())

    # part2:開多個進程,每個進程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主執行緒/主進程pid',os.getpid())

3.同一進程內的執行緒共享該進程的數據?

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) # 毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('主',n) # 查看結果為0,因為同一進程內的執行緒之間共享進程內的數據

四 練習

練習一:

多執行緒並發的socket服務端

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()
        p=threading.Thread(target=action,args=(conn,))
        p.start()

客戶端

#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

練習二:三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

五 執行緒相關的其他方法

Thread實例對象的方法
  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設置執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在運行的執行緒的list。正在運行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在運行的執行緒數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    # 在主進程下開啟執行緒
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) # 主執行緒
    print(threading.enumerate()) # 連同主執行緒在內有兩個運行的執行緒
    print(threading.active_count())
    print('主執行緒/主進程')

    '''
    列印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主執行緒/主進程
    Thread-1
    '''

主執行緒等待子執行緒結束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('amgulen',))
    t.start()
    t.join()
    print('主執行緒')
    print(t.is_alive())
    '''
    amgulen say hello
    主執行緒
    False
    '''

六 守護執行緒

無論是進程還是執行緒,都遵循:守護xxx會等待主xxx運行完畢後被銷毀

需要強調的是:運行完畢並非終止運行

# 1.對主進程來說,運行完畢指的是主進程程式碼運行完畢

# 2.對主執行緒來說,運行完畢指的是主執行緒所在的進程內所有非守護執行緒統統運行完畢,主執行緒才算運行完畢

詳細解釋:

# 1 主進程在其程式碼結束後就已經算運行完畢了(守護進程在此時就被回收),然後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(否則會產生殭屍進程),才會結束,

# 2 主執行緒在其他非守護執行緒運行完畢後才算運行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護執行緒都運行完畢後才能結束。
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('amgulen',))
    t.setDaemon(True) # 必須在t.start()之前設置
    t.start()

    print('主執行緒')
    print(t.is_alive())
    '''
    主執行緒
    True
    '''

迷惑人的例子

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

七 Python GIL鎖(Global Interpreter Lock)

一 介紹

'''
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython』s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''
結論:在Cpython解釋器中,同一個進程下開啟的多執行緒,同一時刻只能有一個執行緒執行,無法利用多核優勢

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行程式碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段程式碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這裡要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

這篇文章透徹的剖析了GIL對python多執行緒的影響,強烈推薦看一下:

二 GIL介紹

GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串列,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。

要想了解GIL,首先確定一點:每次執行python程式,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不同的python進程

驗證python test.py只會產生一個進程

'''
# 驗證python test.py只會產生一個進程
# test.py內容
import os,time
print(os.getpid())
time.sleep(1000)
'''
python3 test.py 
#在windows下
tasklist |findstr python
#在linux下
ps aux |grep python

在一個python的進程內,不僅有test.py的主執行緒或者由該主執行緒開啟的其他執行緒,還有解釋器開啟的垃圾回收等解釋器級別的執行緒,總之,所有執行緒都運行在這一個進程內,毫無疑問

#1 所有數據都是共享的,這其中,程式碼作為一種數據也是被所有執行緒共享的(test.py的所有程式碼以及Cpython解釋器的所有程式碼)
例如:test.py定義一個函數work(程式碼內容如下圖),在進程內所有執行緒都能訪問到work的程式碼,於是我們可以開啟三個執行緒然後target都指向該程式碼,能訪問到意味著就是可以執行。

#2 所有執行緒的任務,都需要將任務的程式碼當做參數傳給解釋器的程式碼去執行,即所有的執行緒要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的程式碼。

綜上:

如果多個執行緒的target=work,那麼執行流程是

多個執行緒先訪問到解釋器的程式碼,即拿到執行許可權,然後將target的程式碼交給解釋器的程式碼去執行

解釋器的程式碼是所有執行緒共享的,所以垃圾回收執行緒也可能訪問到解釋器的程式碼而去執行,這就導致了一個問題:對於同一個數據100,可能執行緒1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什麼高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的程式碼

img

三 GIL與Lock

GIL保護的是解釋器級的數據,保護用戶自己的數據則需要自己加鎖處理,如下圖

img

四 GIL與多執行緒

有了GIL的存在,同一時刻同一進程中只有一個執行緒被執行

聽到這裡,有的同學立馬質問:進程可以利用多核,但是開銷大,而python的多執行緒開銷小,但卻無法利用多核優勢,也就是說python沒用了,php才是最牛逼的語言?

別著急啊,老娘還沒講完呢。

要解決這個問題,我們需要在幾個點上達成一致:

#1. cpu到底是用來做計算的,還是用來做I/O的?

#2. 多cpu,意味著可以有多個核並行完成計算,所以多核提升的是計算性能

#3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什麼用處 

一個工人相當於cpu,此時計算相當於工人在幹活,I/O阻塞相當於為工人幹活提供所需原材料的過程,工人幹活的過程中如果沒有原材料了,則工人幹活的過程需要停止,直到等待原材料的到來。

如果你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去干別的活,

反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高

結論:

對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用

當然對運行一個程式來說,隨著cpu的增多執行效率肯定會有所提高(不管提高幅度多大,總會有所提高),這是因為一個程式基本上不會是純計算或者純I/O,所以我們只能相對的去看一個程式到底是計算密集型還是I/O密集型,從而進一步分析python的多執行緒到底有無用武之地

# 分析:
我們有四個任務需要處理,處理方式肯定是要玩出並發的效果,解決方案可以是:
方案一:開啟四個進程
方案二:一個進程下,開啟四個執行緒

# 單核情況下,分析結果: 
  如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷,方案二勝
  如果四個任務是I/O密集型,方案一創建進程的開銷大,且進程的切換速度遠不如執行緒,方案二勝

# 多核情況下,分析結果:
  如果四個任務是計算密集型,多核意味著並行計算,在python中一個進程中同一時刻只有一個執行緒執行用不上多核,方案一勝
  如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

 
# 結論:現在的電腦基本上都是多核,python對於計算密集型的任務開多執行緒的效率並不能帶來多大性能上的提升,甚至不如串列(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

五 多執行緒性能測試

計算密集型:多進程效率高

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) # 本機為4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) # 耗時5s多
        p=Thread(target=work) # 耗時18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

I / O密集型:多執行緒效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) # 本機為4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗時12s多,大部分時間耗費在創建進程上
        p=Thread(target=work) # 耗時2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

應用:

多執行緒用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

八 同步鎖

三個需要注意的點:
# 1.執行緒搶的是GIL鎖,GIL鎖相當於執行許可權,拿到執行許可權後才能拿到互斥鎖Lock,其他執行緒也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行許可權GIL也要立刻交出來

# 2.join是等待所有,即整體串列,而鎖只是鎖住修改共享數據的部分,即部分串列,要想保證數據安全的根本原理在於讓並發變成串列,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串列效率要更高

# 3. 一定要看本小節最後的GIL與互斥鎖的經典分析

GIL VS Lock

機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個執行緒來執行了,為什麼這裡還需要lock?

首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個執行緒來修改共享的數據

然後,我們可以得出結論:保護不同的數據就應該加不同的鎖。

最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程式的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有執行緒搶的是GIL鎖,或者說所有執行緒搶的是執行許可權

執行緒1搶到GIL鎖,拿到執行許可權,開始執行,然後加了一把Lock,還沒有執行完畢,即執行緒1還未釋放Lock,有可能執行緒2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被執行緒1釋放,於是執行緒2進入阻塞,被奪走執行許可權,有可能執行緒1拿到GIL,然後正常執行到釋放Lock。。。這就導致了串列運行的效果

既然是串列,那我們執行

t1.start()

t1.join

t2.start()

t2.join()

這也是串列執行啊,為何還要加Lock呢,需知join是等待t1所有的程式碼執行完,相當於鎖住了t1的所有程式碼,而Lock只是鎖住一部分操作共享數據的程式碼。

# 因為Python解釋器幫你自動定期進行記憶體回收,你可以理解為python解釋器里有一個獨立的執行緒,每過一段時間它起wake up做一次全局輪詢看看哪些記憶體數據是可以被清空的,此時你自己的程式 里的執行緒和 py解釋器自己的執行緒是並發運行的,假設你的執行緒刪除了一個變數,py解釋器的垃圾回收執行緒在清空這個變數的過程中的clearing時刻,可能一個其它執行緒正好又重新給這個還沒來及得清空的記憶體空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個執行緒運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。 
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) # 結果可能為99

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它執行緒已經獲得了該鎖,則當前執行緒需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操作
'''
R.release()
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) # 結果肯定為0,由原來的並發執行變成串列,犧牲了執行效率保證了數據安全

GIL鎖與互斥鎖綜合分析(重點)

分析:
# 1.100個執行緒去搶GIL鎖,即搶執行許可權
# 2. 肯定有一個執行緒先搶到GIL(暫且稱為執行緒1),然後開始執行,一旦執行就會拿到lock.acquire()
# 3. 極有可能執行緒1還未運行完畢,就有另外一個執行緒2搶到GIL,然後開始運行,但執行緒2發現互斥鎖lock還未被執行緒1釋放,於是阻塞,被迫交出執行許可權,即釋放GIL
# 4.直到執行緒1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的執行緒再重複2 3 4的過程

互斥鎖與join的區別(重點)

# 不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


# 不加鎖:未加鎖部分並發執行,加鎖部分串列執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    # 未加鎖的程式碼並發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    # 加鎖的程式碼串列運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

# 有的同學可能有疑問:既然加鎖會讓運行變成串列,那麼我在start之後立即使用join,就不用加鎖了啊,也是串列的效果啊
# 沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成串列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
# start後立即join:任務內的所有程式碼都是串列執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串列的
# 單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0
'''

九 死鎖現象與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那裡忘記說了,放到這裡一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然後就卡住,死鎖了
'''

解決方法,遞歸鎖,在Python中為了支援在同一執行緒中多次請求同一資源,提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() # 一個執行緒拿到鎖,counter加1,該執行緒內又碰到加鎖的情況,則counter繼續加1,這期間所有其他執行緒都只能等待,等待該執行緒釋放所有鎖,即counter遞減到0為止

十 訊號量Semaphore

同進程的一樣

Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1,調用release() 時內置計數器+1,計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒直到其他執行緒調用release()。

實例:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連接數為5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而訊號量是產生一堆執行緒 / 進程

十一 Event

同進程的一樣

執行緒的一個關鍵特性是每個執行緒都是獨立運行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由執行緒設置的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event對象中的訊號標誌被設置為假。如果有執行緒等待一個Event對象, 而這個Event對象的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event對象的訊號標誌設置為真,它將喚醒所有等待這個Event對象的執行緒。如果一個執行緒等待一個已經被設置為真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():# 返回event的狀態值;

event.wait(): # 如果 event.isSet()==False將阻塞執行緒;

event.set():  # 設置event的狀態值為True,所有阻塞池的執行緒激活進入就緒狀態, 等待作業系統調度;

event.clear():# 恢復event的狀態值為False。

img

例如,有多個工作執行緒嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作執行緒去連接MySQL伺服器,如果連接不成功,都會去嘗試重新連接。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連接操作

例一

from threading import Event,Thread,current_thread

e=Event()

def check_mysql():
    print('正則檢測mysql',e.is_set())
    import time
    time.sleep(2)
    e.set()

def conn_mysql():
    count=0
    while count < 3:
        print('<%s>第%s次嘗試鏈接' % (current_thread().getName(), count))
        e.wait(0.5)
        if e.is_set():
            print('<%s> 鏈接成功' % current_thread().getName())
            break
        count+=1
    else:
        # raise TimeoutError("鏈接超時")
        print("<%s> 鏈接超時" % current_thread().getName())

if __name__ == '__main__':
    t1=Thread(target=check_mysql)
    t2=Thread(target=conn_mysql)
    t1.start()
    t2.start()

例二

import time
from threading import Event,Thread,current_thread

e=Event()

def f1():
    while True:
        e.clear()
        print("紅燈亮,請等待2秒")
        time.sleep(2)

        e.set()
        print('綠燈亮,持續2秒')
        time.sleep(2)



def f2():
    while True:
        if e.is_set():
            print('%s 過馬路' %current_thread().getName())
        else:
            print("%s 等待" % current_thread().getName())
            e.wait()

if __name__ == '__main__':
    t1=Thread(target=f1)
    t2=Thread(target=f2)
    t1.start()
    t2.start()

十二 條件Condition

使得執行緒等待,只有滿足某條件時,才釋放n個執行緒

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

十三 定時器

定時器,指定n秒後執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()

驗證碼定時器

from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('驗證成功',end='\n')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj=Code()
    obj.check()

十四 執行緒queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

classqueue.Queue(maxsize=0) 先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

classqueue.LifoQueue(maxsize=0) 後進先出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

classqueue.PriorityQueue(maxsize=0) 存儲數據時可設置優先順序的隊列

import queue

q=queue.PriorityQueue()
# put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

其他

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消費完畢

十五 Python標準模組–concurrent.futures

//docs.python.org/dev/library/concurrent.futures.html

#1 介紹
concurrent.futures模組提供了高度封裝的非同步調用介面
ThreadPoolExecutor:執行緒池,提供非同步調用
ProcessPoolExecutor: 進程池,提供非同步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
# submit(fn, *args, **kwargs)
非同步提交任務

# map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

# shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數

ProcessPoolExecutor

# 介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


# 用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

ThreadPoolExecutor

# 介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

# 用法
與ProcessPoolExecutor相同

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) # map取代了for+submit

回調函數

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        '//www.baidu.com',
        '//www.python.org',
        '//www.openstack.org',
        '//help.github.com/',
        '//www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) # parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果