Python 多執行緒爬蟲實戰

  • 2020 年 2 月 13 日
  • 筆記

Queue執行緒安全隊列解釋:

在執行緒中,訪問一些全局變數,加鎖是一個經常的過程。如果你是想把一些數據存儲到某個隊列中,那麼Python內置了一個執行緒安全的模組叫做queue模組。Python中的queue模組中提供了同步的、執行緒安全的隊列類,包括FIFO(先進先出)隊列Queue,LIFO(後入先出)隊列LifoQueue。這些隊列都實現了鎖原語(可以理解為原子操作,即要麼不做,要麼都做完),能夠在多執行緒中直接使用。可以使用隊列來實現執行緒間的同步。相關的函數如下:

  1. 初始化Queue(maxsize):創建一個先進先出的隊列。
  2. qsize():返回隊列的大小。
  3. empty():判斷隊列是否為空。
  4. full():判斷隊列是否滿了。
  5. get():從隊列中取最後一個數據。
  6. put():將一個數據放到隊列中。

使用生產者與消費者模式多執行緒下載表情包:


import threading  import requests  from lxml import etree  from urllib import request  import os  import re  from queue import Queue    class Producer(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }      def __init__(self,page_queue,img_queue,*args,**kwargs):          super(Producer, self).__init__(*args,**kwargs)          self.page_queue = page_queue          self.img_queue = img_queue          def run(self):          while True:              if self.page_queue.empty():                  break              url = self.page_queue.get()              self.parse_page(url)        def parse_page(self,url):          response = requests.get(url,headers=self.headers)          text = response.text          html = etree.HTML(text)          imgs = html.xpath("//div[@class='page-content text-center']//a//img")          for img in imgs:              if img.get('class') == 'gif':                  continue              img_url = img.xpath(".//@data-original")[0]              suffix = os.path.splitext(img_url)[1]              alt = img.xpath(".//@alt")[0]              alt = re.sub(r'[,。??,/\·]','',alt)              img_name = alt + suffix              self.img_queue.put((img_url,img_name))    class Consumer(threading.Thread):      def __init__(self,page_queue,img_queue,*args,**kwargs):          super(Consumer, self).__init__(*args,**kwargs)          self.page_queue = page_queue          self.img_queue = img_queue        def run(self):          while True:              if self.img_queue.empty():                  if self.page_queue.empty():                      return              img = self.img_queue.get(block=True)              url,filename = img              request.urlretrieve(url,'images/'+filename)              print(filename+'  下載完成!')    def main():      page_queue = Queue(100)      img_queue = Queue(500)      for x in range(1,101):          url = "http://www.doutula.com/photo/list/?page=%d" % x          page_queue.put(url)        for x in range(5):          t = Producer(page_queue,img_queue)          t.start()        for x in range(5):          t = Consumer(page_queue,img_queue)          t.start()    if __name__ == '__main__':      main()      

GIL全局解釋器鎖:

Python自帶的解釋器是CPythonCPython解釋器的多執行緒實際上是一個假的多執行緒(在多核CPU中,只能利用一核,不能利用多核)。同一時刻只有一個執行緒在執行,為了保證同一時刻只有一個執行緒在執行,在CPython解釋器中有一個東西叫做GIL(Global Intepreter Lock),叫做全局解釋器鎖。這個解釋器鎖是有必要的。因為CPython解釋器的記憶體管理不是執行緒安全的。當然除了CPython解釋器,還有其他的解釋器,有些解釋器是沒有GIL鎖的,見下面:

  1. Jython:用Java實現的Python解釋器。不存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/Jython
  2. IronPython:用.net實現的Python解釋器。不存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/IronPython
  3. PyPy:用Python實現的Python解釋器。存在GIL鎖。更多詳情請見:https://zh.wikipedia.org/wiki/PyPy GIL雖然是一個假的多執行緒。但是在處理一些IO操作(比如文件讀寫和網路請求)還是可以在很大程度上提高效率的。在IO操作上建議使用多執行緒提高效率。在一些CPU計算操作上不建議使用多執行緒,而建議使用多進程

多執行緒下載百思不得姐段子:


import requests  from lxml import etree  import threading  from queue import Queue  import csv      class BSSpider(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }      def __init__(self,page_queue,joke_queue,*args,**kwargs):          super(BSSpider, self).__init__(*args,**kwargs)          self.base_domain = 'http://www.budejie.com'          self.page_queue = page_queue          self.joke_queue = joke_queue        def run(self):          while True:              if self.page_queue.empty():                  break              url = self.page_queue.get()              response = requests.get(url, headers=self.headers)              text = response.text              html = etree.HTML(text)              descs = html.xpath("//div[@class='j-r-list-c-desc']")              for desc in descs:                  jokes = desc.xpath(".//text()")                  joke = "n".join(jokes).strip()                  link = self.base_domain+desc.xpath(".//a/@href")[0]                  self.joke_queue.put((joke,link))              print('='*30+"第%s頁下載完成!"%url.split('/')[-1]+"="*30)    class BSWriter(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }        def __init__(self, joke_queue, writer,gLock, *args, **kwargs):          super(BSWriter, self).__init__(*args, **kwargs)          self.joke_queue = joke_queue          self.writer = writer          self.lock = gLock        def run(self):          while True:              try:                  joke_info = self.joke_queue.get(timeout=40)                  joke,link = joke_info                  self.lock.acquire()                  self.writer.writerow((joke,link))                  self.lock.release()                  print('保存一條')              except:                  break    def main():      page_queue = Queue(10)      joke_queue = Queue(500)      gLock = threading.Lock()      fp = open('bsbdj.csv', 'a',newline='', encoding='utf-8')      writer = csv.writer(fp)      writer.writerow(('content', 'link'))        for x in range(1,11):          url = 'http://www.budejie.com/text/%d' % x          page_queue.put(url)        for x in range(5):          t = BSSpider(page_queue,joke_queue)          t.start()        for x in range(5):          t = BSWriter(joke_queue,writer,gLock)          t.start()    if __name__ == '__main__':      main()  

END

想一起交流的夥伴可以加我微信哦(左),歡迎關注我的公眾號(右)