Python 多线程爬虫实战

  • 2020 年 2 月 13 日
  • 笔记

Queue线程安全队列解释:

在线程中,访问一些全局变量,加锁是一个经常的过程。如果你是想把一些数据存储到某个队列中,那么Python内置了一个线程安全的模块叫做queue模块。Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。相关的函数如下:

  1. 初始化Queue(maxsize):创建一个先进先出的队列。
  2. qsize():返回队列的大小。
  3. empty():判断队列是否为空。
  4. full():判断队列是否满了。
  5. get():从队列中取最后一个数据。
  6. put():将一个数据放到队列中。

使用生产者与消费者模式多线程下载表情包:


import threading  import requests  from lxml import etree  from urllib import request  import os  import re  from queue import Queue    class Producer(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }      def __init__(self,page_queue,img_queue,*args,**kwargs):          super(Producer, self).__init__(*args,**kwargs)          self.page_queue = page_queue          self.img_queue = img_queue          def run(self):          while True:              if self.page_queue.empty():                  break              url = self.page_queue.get()              self.parse_page(url)        def parse_page(self,url):          response = requests.get(url,headers=self.headers)          text = response.text          html = etree.HTML(text)          imgs = html.xpath("//div[@class='page-content text-center']//a//img")          for img in imgs:              if img.get('class') == 'gif':                  continue              img_url = img.xpath(".//@data-original")[0]              suffix = os.path.splitext(img_url)[1]              alt = img.xpath(".//@alt")[0]              alt = re.sub(r'[,。??,/\·]','',alt)              img_name = alt + suffix              self.img_queue.put((img_url,img_name))    class Consumer(threading.Thread):      def __init__(self,page_queue,img_queue,*args,**kwargs):          super(Consumer, self).__init__(*args,**kwargs)          self.page_queue = page_queue          self.img_queue = img_queue        def run(self):          while True:              if self.img_queue.empty():                  if self.page_queue.empty():                      return              img = self.img_queue.get(block=True)              url,filename = img              request.urlretrieve(url,'images/'+filename)              print(filename+'  下载完成!')    def main():      page_queue = Queue(100)      img_queue = Queue(500)      for x in range(1,101):          url = "http://www.doutula.com/photo/list/?page=%d" % x          page_queue.put(url)        for x in range(5):          t = Producer(page_queue,img_queue)          t.start()        for x in range(5):          t = Consumer(page_queue,img_queue)          t.start()    if __name__ == '__main__':      main()      

GIL全局解释器锁:

Python自带的解释器是CPythonCPython解释器的多线程实际上是一个假的多线程(在多核CPU中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中有一个东西叫做GIL(Global Intepreter Lock),叫做全局解释器锁。这个解释器锁是有必要的。因为CPython解释器的内存管理不是线程安全的。当然除了CPython解释器,还有其他的解释器,有些解释器是没有GIL锁的,见下面:

  1. Jython:用Java实现的Python解释器。不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/Jython
  2. IronPython:用.net实现的Python解释器。不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/IronPython
  3. PyPy:用Python实现的Python解释器。存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/PyPy GIL虽然是一个假的多线程。但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程

多线程下载百思不得姐段子:


import requests  from lxml import etree  import threading  from queue import Queue  import csv      class BSSpider(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }      def __init__(self,page_queue,joke_queue,*args,**kwargs):          super(BSSpider, self).__init__(*args,**kwargs)          self.base_domain = 'http://www.budejie.com'          self.page_queue = page_queue          self.joke_queue = joke_queue        def run(self):          while True:              if self.page_queue.empty():                  break              url = self.page_queue.get()              response = requests.get(url, headers=self.headers)              text = response.text              html = etree.HTML(text)              descs = html.xpath("//div[@class='j-r-list-c-desc']")              for desc in descs:                  jokes = desc.xpath(".//text()")                  joke = "n".join(jokes).strip()                  link = self.base_domain+desc.xpath(".//a/@href")[0]                  self.joke_queue.put((joke,link))              print('='*30+"第%s页下载完成!"%url.split('/')[-1]+"="*30)    class BSWriter(threading.Thread):      headers = {          'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'      }        def __init__(self, joke_queue, writer,gLock, *args, **kwargs):          super(BSWriter, self).__init__(*args, **kwargs)          self.joke_queue = joke_queue          self.writer = writer          self.lock = gLock        def run(self):          while True:              try:                  joke_info = self.joke_queue.get(timeout=40)                  joke,link = joke_info                  self.lock.acquire()                  self.writer.writerow((joke,link))                  self.lock.release()                  print('保存一条')              except:                  break    def main():      page_queue = Queue(10)      joke_queue = Queue(500)      gLock = threading.Lock()      fp = open('bsbdj.csv', 'a',newline='', encoding='utf-8')      writer = csv.writer(fp)      writer.writerow(('content', 'link'))        for x in range(1,11):          url = 'http://www.budejie.com/text/%d' % x          page_queue.put(url)        for x in range(5):          t = BSSpider(page_queue,joke_queue)          t.start()        for x in range(5):          t = BSWriter(joke_queue,writer,gLock)          t.start()    if __name__ == '__main__':      main()  

END

想一起交流的伙伴可以加我微信哦(左),欢迎关注我的公众号(右)