Python多執行緒、執行緒池及實際運用
我們在寫python爬蟲的過程中,對於大量數據的抓取總是希望能獲得更高的速度和效率,但由於網路請求的延遲、IO的限制,單執行緒的運行總是不能讓人滿意。因此有了多執行緒、非同步協程等技術。
下面介紹一下python中的多執行緒及執行緒池技術,並通過一個具體的爬蟲案例實現具體運用。
多執行緒
先來分析單執行緒。寫兩個測試函數
def func1():
for i in range(500000):
print("func1", i)
def func2():
for i in range(500000):
print("func2", i)
在主函數中調用
if __name__ == "__main__":
func1()
func2()
當程式執行時,按照主程式中的執行順序,func1
全部運行完畢後才會運行func2
,這就是單執行緒的效果。
接下來測試多執行緒。
先導包
from threading import Thread
改造主函數
thread1 = Thread(target=func1)
thread1.start()
thread2 = Thread(target=func2)
thread2.start()
thread1.join()
thread2.join()
這裡的thread.join()是阻塞進程,因為這裡主函數中沒有
執行效果如下:
可以看到func1
和func2
函數分為兩個不同的執行緒同時工作、互不干擾。
執行緒池
以此類推,如果同時開著20個這樣的執行緒,是否可以同時執行呢?但手動分配這麼多執行緒顯然是不可能的,因此引入執行緒池這一概念,一次開闢一些進程,我們用戶直接給執行緒池提交任務,執行緒任務的調度交給執行緒池來完成。這樣一來,就能十分方便的分配執行緒的任務了。
首先導包
from concurrent.futures import ThreadPoolExecutor
改造一下子函數
def func(url):
for i in range(1000):
print(url)
主函數
if __name__ == "__main__":
# 創建執行緒池
with ThreadPoolExecutor(50) as t:
for i in range(100):
t.submit(func, url=f"執行緒{i}")
print("over")
我們建立一個執行緒池,分配50個執行緒,提交100個任務,讓他們去自由分配。現有的50個執行緒先去拿到了1-50這些任務,當誰先完成就去拿到51個任務,以此類推。相當於50個工人一起幹活,互不干涉,顯然效率較單人更高一些。
再來看運行結果
執行緒鎖
了解了執行緒池的基本概念之後就可以去改造我們的爬蟲了。但是在此之前該需要了解一個執行緒鎖的概念。先看下面這個例子
from threading import Thread
num = 0
def add():
global num
for i in range(100000):
num += 1
def minus():
global num
for i in range(100000):
num -= 1
if __name__=="__main__":
thread1 = Thread(target=add)
thread2 = Thread(target=minus)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(num)
開闢兩個執行緒,一個做自增一個做自減,他們兩個同時運行,按常理num最終的值應為0,但實際運行結果是不穩定的。
由於每個執行緒運行速度極快,因此在他們的臨界點都想對全局變數num
操作時會出現競爭狀態,有可能出現數值丟失、自增失敗的情況,因此需要加入執行緒鎖
來控制每次只允許有一個執行緒對全局變數num
進行操作。
import threading
lock = threading.Lock()
lock.acquire()
num += 1
lock.release()
在執行緒中的關鍵操作加上執行緒鎖,再跑起來就不會出現競爭狀態了。
爬蟲實戰
要在爬蟲中運用到執行緒池,基本的思路很簡單,
1.如何抓取到單個頁面的數據
2.上執行緒池批量抓取
目標://www.dydytt.net/html/gndy/dyzz/list_23_1.html
這裡僅做執行緒池的基本實驗,具體案例移步這裡
先隨便寫個爬蟲拿到第一頁的所有電影標題數據
import requests
from lxml import etree
filmNameList = []
def download(url):
global filmNameList
resp = requests.get(url)
resp.encoding="gb2312"
html = etree.HTML(resp.text)
filmName = html.xpath('//table[@class="tbspan"]/tr[2]/td[2]/b/a/text()')
for each in filmName:
filmNameList.append(each)
pass
if __name__=="__main__":
url = "//www.dydytt.net/html/gndy/dyzz/list_23_1.html"
download(url)
for i in filmNameList:
print(i)
非常輕鬆的拿到了第一頁的數據
接下來上執行緒池
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
filmNameList = []
lock = threading.Lock()
def download(url):
global filmNameList
resp = requests.get(url)
resp.encoding="gb2312"
html = etree.HTML(resp.text)
filmName = html.xpath('//table[@class="tbspan"]/tr[2]/td[2]/b/a/text()')
for each in filmName:
lock.acquire()
filmNameList.append(each)
lock.release()
resp.close()
if __name__=="__main__":
with ThreadPoolExecutor(5) as t:
for i in range(1, 11):
url = f"//www.dydytt.net/html/gndy/dyzz/list_23_{i}.html"
t.submit(download, url=url)
for i in filmNameList:
print(i)
print(f"total_len is {len(filmNameList)}")
我們給執行緒池分配了5個執行緒,抓了前10頁共250條數據。
****