Python 多執行緒爬蟲實戰
- 2020 年 2 月 13 日
- 筆記
Queue執行緒安全隊列解釋:
在執行緒中,訪問一些全局變數,加鎖是一個經常的過程。如果你是想把一些數據存儲到某個隊列中,那麼Python內置了一個執行緒安全的模組叫做queue
模組。Python中的queue模組中提供了同步的、執行緒安全的隊列類,包括FIFO(先進先出)隊列Queue,LIFO(後入先出)隊列LifoQueue。這些隊列都實現了鎖原語(可以理解為原子操作,即要麼不做,要麼都做完),能夠在多執行緒中直接使用。可以使用隊列來實現執行緒間的同步。相關的函數如下:
- 初始化Queue(maxsize):創建一個先進先出的隊列。
- qsize():返回隊列的大小。
- empty():判斷隊列是否為空。
- full():判斷隊列是否滿了。
- get():從隊列中取最後一個數據。
- put():將一個數據放到隊列中。
使用生產者與消費者模式多執行緒下載表情包:
import threading import requests from lxml import etree from urllib import request import os import re from queue import Queue class Producer(threading.Thread): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36' } def __init__(self,page_queue,img_queue,*args,**kwargs): super(Producer, self).__init__(*args,**kwargs) self.page_queue = page_queue self.img_queue = img_queue def run(self): while True: if self.page_queue.empty(): break url = self.page_queue.get() self.parse_page(url) def parse_page(self,url): response = requests.get(url,headers=self.headers) text = response.text html = etree.HTML(text) imgs = html.xpath("//div[@class='page-content text-center']//a//img") for img in imgs: if img.get('class') == 'gif': continue img_url = img.xpath(".//@data-original")[0] suffix = os.path.splitext(img_url)[1] alt = img.xpath(".//@alt")[0] alt = re.sub(r'[,。??,/\·]','',alt) img_name = alt + suffix self.img_queue.put((img_url,img_name)) class Consumer(threading.Thread): def __init__(self,page_queue,img_queue,*args,**kwargs): super(Consumer, self).__init__(*args,**kwargs) self.page_queue = page_queue self.img_queue = img_queue def run(self): while True: if self.img_queue.empty(): if self.page_queue.empty(): return img = self.img_queue.get(block=True) url,filename = img request.urlretrieve(url,'images/'+filename) print(filename+' 下載完成!') def main(): page_queue = Queue(100) img_queue = Queue(500) for x in range(1,101): url = "http://www.doutula.com/photo/list/?page=%d" % x page_queue.put(url) for x in range(5): t = Producer(page_queue,img_queue) t.start() for x in range(5): t = Consumer(page_queue,img_queue) t.start() if __name__ == '__main__': main()
GIL全局解釋器鎖:
Python自帶的解釋器是CPython
。CPython
解釋器的多執行緒實際上是一個假的多執行緒(在多核CPU中,只能利用一核,不能利用多核)。同一時刻只有一個執行緒在執行,為了保證同一時刻只有一個執行緒在執行,在CPython
解釋器中有一個東西叫做GIL(Global Intepreter Lock)
,叫做全局解釋器鎖。這個解釋器鎖是有必要的。因為CPython
解釋器的記憶體管理不是執行緒安全的。當然除了CPython
解釋器,還有其他的解釋器,有些解釋器是沒有GIL
鎖的,見下面:
Jython
:用Java實現的Python解釋器。不存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/JythonIronPython
:用.net
實現的Python解釋器。不存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/IronPythonPyPy
:用Python
實現的Python解釋器。存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/PyPy GIL雖然是一個假的多執行緒。但是在處理一些IO操作(比如文件讀寫和網路請求)還是可以在很大程度上提高效率的。在IO操作上建議使用多執行緒提高效率。在一些CPU計算操作上不建議使用多執行緒,而建議使用多進程
多執行緒下載百思不得姐段子:
import requests from lxml import etree import threading from queue import Queue import csv class BSSpider(threading.Thread): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36' } def __init__(self,page_queue,joke_queue,*args,**kwargs): super(BSSpider, self).__init__(*args,**kwargs) self.base_domain = 'http://www.budejie.com' self.page_queue = page_queue self.joke_queue = joke_queue def run(self): while True: if self.page_queue.empty(): break url = self.page_queue.get() response = requests.get(url, headers=self.headers) text = response.text html = etree.HTML(text) descs = html.xpath("//div[@class='j-r-list-c-desc']") for desc in descs: jokes = desc.xpath(".//text()") joke = "n".join(jokes).strip() link = self.base_domain+desc.xpath(".//a/@href")[0] self.joke_queue.put((joke,link)) print('='*30+"第%s頁下載完成!"%url.split('/')[-1]+"="*30) class BSWriter(threading.Thread): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36' } def __init__(self, joke_queue, writer,gLock, *args, **kwargs): super(BSWriter, self).__init__(*args, **kwargs) self.joke_queue = joke_queue self.writer = writer self.lock = gLock def run(self): while True: try: joke_info = self.joke_queue.get(timeout=40) joke,link = joke_info self.lock.acquire() self.writer.writerow((joke,link)) self.lock.release() print('保存一條') except: break def main(): page_queue = Queue(10) joke_queue = Queue(500) gLock = threading.Lock() fp = open('bsbdj.csv', 'a',newline='', encoding='utf-8') writer = csv.writer(fp) writer.writerow(('content', 'link')) for x in range(1,11): url = 'http://www.budejie.com/text/%d' % x page_queue.put(url) for x in range(5): t = BSSpider(page_queue,joke_queue) t.start() for x in range(5): t = BSWriter(joke_queue,writer,gLock) t.start() if __name__ == '__main__': main()
END
想一起交流的夥伴可以加我微信哦(左),歡迎關注我的公眾號(右)