多任務爬蟲

  • 2020 年 3 月 31 日
  • 筆記

一、多任務簡介

1、為什麼要使用多任務爬蟲?
  • 在大量的url需要請求時,單執行緒/單進程去爬取,速度太慢,此時cpu不工作,浪費cpu資源。
  • 爬取與寫入文件分離,可以規避io操作,增加爬取速度,充分利用cpu。
2、多任務分類
  • 進程:進程是操作資源分配的最小單位,一個運行的程式,至少包括一個進程,進程之間數據不能共享。(利用多核)
  • 執行緒:執行緒是cpu調度的最小單位,一個進程中至少含有一個執行緒,執行緒中數據是共享的,如果多個執行緒操作同一個對象時,需要考慮數據安全問題。(爬蟲中最常用)
  • 協程:協程位於執行緒內部,如果一個執行緒中運行的程式碼,遇到IO操作時,切換到執行緒其他程式碼執行(最大程度的規避IO操作)
2、如何提高程式的運行速度
1、提高CPU的利用率

假如我們的程式有隻有一個執行緒,CPU就只處理這一個執行緒。如果在程式中遇到IO操作。此時CPU就不工作了。休息的這段時間,就浪費了CPU的資源。

若我們的程式是多執行緒的,CPU會在這多個任務之間切換,如果其中一個執行緒阻塞了,CPU不會休息,會處理其他執行緒。

2、增加CPU數量

一個CPU同一時間只能護理一個任務,若我們增加CPU數量,那麼多個CPU處理多個任務,也會提升程式的運行速度,例如使用多進程。

二、python中的threading模組(開啟多執行緒)

cpython解釋器下的 python中沒有真正的多執行緒(因為多個執行緒不能同時在多核上執行,只能在一個CPU上進行多個執行緒的切換輪流執行,在視覺效果上看起來同時在執行),造成這個情況的原因是因為GIL(全局性解釋器鎖),在一個進程中,多個執行緒是數據共享的,如果不設置全局解釋性鎖,多個執行緒可能在同一時間對同一個變數進行操作,造成變數的引用計數不正確,影響其進行垃圾回收,所以需要加全局性解釋器鎖。

2.1、多執行緒開啟方法
from threading import Thread  1、使用函數  t = Thread(  					target=執行緒執行的任務(方法)名字,  					args = 執行方法的參數,是一個元組  				)---創建執行緒  t.start()---啟動執行緒    2、使用類  class Mythread(Thread)  	def __init__(self,參數)  		self.參數=參數  		super(Mythread,self).__init__()    	def run(self):  		將需要多任務執行的程式碼,添加到此處    if __name__ == '__main__':      my =  Mythread(參數)      my.start()  
2.2、執行緒中常用的幾個方法
from threading import Thread, current_thread, enumerate, active_count  import time  import random      class MyThread(Thread):      def run(self):          time.sleep(random.random())          msg = "I'm" + self.name + "@" + str(i)  #self.name 當前執行緒名          print(msg)          print(current_thread().ident)  #當前執行緒的id號          print(current_thread().is_alive()) #當前執行緒是否存活      if __name__ == '__main__':      t_list=[]      for i in range(5):          t = MyThread()          t.start()          t_list.append(t)      while active_count() > 1:  #active_count() 當前存活執行緒數,包括主執行緒          print(enumerate()) #enumerate() 當前存活執行緒列表,包括主執行緒       for i  in t_list:          i.join() #join方法,會使非同步執行的多執行緒,變為同步執行,主執行緒會等i執行緒執行完,才會往下執行。  
2.3、守護執行緒

守護執行緒,當一個子執行緒設置為守護執行緒時,該子執行緒會等待其他非守護子執行緒和主執行緒執行完成後,結束執行緒。

from threading import Thread, current_thread  import time      def bar():      while True:          time.sleep(1)          print(current_thread().name)      def foo():      print(f'{current_thread().name}開始了...')      time.sleep(2)      print(f'{current_thread().name}結束了...')      if __name__ == '__main__':      t1 = Thread(target=bar)      t1.daemon = True #將t1設置為守護執行緒,      t1.start()      t2 = Thread(target=foo)      t2.start()    #執行結果  Thread-2開始了...  Thread-1  Thread-1  Thread-2結束了...  
2.4、鎖

在使用多執行緒爬蟲的時候,有時候多個執行緒會對同一個文件進行讀寫。造成數據不安全,下面是一個Tencent招聘的例子,在寫入excel文件中的時候,由於多個執行緒對同一個文件進行寫入操作,造成數據不安全。

import requests  from jsonpath import jsonpath  from excle_wirte import ExcelUtils  from threading import Thread  import os  from multiprocessing import Lock  import threading    def get_content(url):      headers = {          'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36',          'referer': 'https://careers.tencent.com/search.html'      }      print(url)      res = requests.get(url, headers=headers).json()      jp = jsonpath(res, '$.*.Posts.*')      return jp      def write_excel(filename, item_list, sheetname):      if not os.path.exists(filename):          ExcelUtils.write_to_excel(filename, item_list, sheetname)      else:          ExcelUtils.append_to_excel(filename, item_list)      def main(i, lock):      base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1585401795646&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=20&language=zh-cn&area=cn'      content = get_content(base_url.format(i))      with lock:   #加鎖          write_excel('tencent.xls', content, 'hr')      if __name__ == '__main__':      lock = Lock()  #創建鎖      for i in range(1, 11):          t = Thread(target=main, args=(i, lock))          t.start()  
2.5、生產者與消費者模型

生產者和消費者問題是執行緒模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。

例子:Tencent招聘生產者與消費者版本,我這裡是用函數寫的,當然也可以用類來寫,會更加方便。
import requests  from jsonpath import jsonpath  from excle_wirte import ExcelUtils  from threading import Thread  import os  from multiprocessing import Lock  from queue import Queue    flag = False      def ger_url_list(num, url_queue):      base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1585401795646&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=20&language=zh-cn&area=cn'      for i in range(1, num + 1):          url_queue.put(base_url.format(i))      def producer(url_queue, content_queue):      headers = {          'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36',          'referer': 'https://careers.tencent.com/search.html'      }      while True:          try:              url = url_queue.get_nowait()              res = requests.get(url, headers=headers).json()              jp = jsonpath(res, '$.*.Posts.*')              content_queue.put(jp)          except Exception as e:              break      def consumer(content_queue, lock, filename, sheetname):      while True:          if content_queue.empty() and flag:              break          try:              item_list = content_queue.get_nowait()              with lock:                  if not os.path.exists(filename):                      ExcelUtils.write_to_excel(filename, item_list, sheetname)                  else:                      ExcelUtils.append_to_excel(filename, item_list)          except Exception as e:              pass      if __name__ == '__main__':      p_t_list = []      url_queue = Queue()   #存放url的隊列      content_queue = Queue()  #網頁內容隊列      ger_url_list(10, url_queue)  #往url隊列添加url      lock = Lock() #創建鎖對象      for i in range(4): # 開啟四個執行緒來抓取網頁內容          p_t = Thread(target=producer, args=(url_queue, content_queue))          p_t.start()          p_t_list.append(p_t)      for i in range(4): #四個執行緒來解析內容和寫入文件          t = Thread(target=consumer, args=(content_queue, lock, 'tencent.xls', 'hr'))          t.start()      for i in p_t_list:          i.join()      flag=True #判斷標誌,用來判斷生產者是否生產完畢。    
2.6、多進程

多進程一般用於處理計算密集型任務,在爬蟲方面用的較少,因為多進程開啟數量依賴於CPU核心數,且多進程開啟作業系統需要為每個進程分配資源,效率不高。這裡只簡單說明python中使用的庫和使用方法,注意進程間不能之間進行數據交換,需要依賴於IPC(Inter-Process Communication)進程間通訊,提供了各種進程間通訊的方法進行數據交換),常用方法為 隊列和管道和Socket。當然還有第三方工具,例如RabbitMQredis

from multiprocessing import Process  1、使用函數  t = Thread(  					target=進程執行的任務(方法)名字,  					args = 執行方法的參數,是一個元組  				)---創建進程  t.start()---啟動進程    2、使用類  class MyProcess(Process)  	def __init__(self,參數)  		self.參數=參數  		super(Mythread,self).__init__()    	def run(self):  		將需要多任務執行的程式碼,添加到此處    if __name__ == '__main__':      my =  MyProcess(參數)      my.start()  

multiprocessing這個庫中有很多於多進程相關對象

from multiprocessing import Queue, Pipe, Pool,等  Queue:隊列  Pipe:管道  Pool:池(有另外的模組,統一了進程池,執行緒池的介面,使用更加方便)  

三、池

3.1、什麼是池

池,包括執行緒池與進程池,一個池內,可以含有指定的執行緒數,或者是進程數,多個任務,從中拿取執行緒/進程執行任務,執行完成後,下一個任務再從池中拿取執行緒/進程。直到所有任務都執行完畢。

3.2、為什麼使用池
  • 可以比較好的控制開啟執行緒/執行緒的數量,在提升效率的同時又控制住資源開銷。
  • 可以指定回調函數,很方便的處理返回數據
3.2、池的簡單使用,以進程池為例,執行緒池一樣的操作。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor      def fun(i):      return i ** 2      def pr(con):      p = con.result()      print(p)      if __name__ == '__main__':      p_pool = ProcessPoolExecutor(max_workers=4)  #創建一個含有四個進程的池      for i in range(10): #10個任務          p = p_pool.submit(fun, i)  #任務提交          p.add_done_callback(pr)  #指定回調函數      p_pool.shutdown()#關閉池  #執行結果  0  1  4  9  16  25  36  49  64  81  
3.3、池map方法使用,適合於簡單參數
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor      def fun(i):      return i ** 2    if __name__ == '__main__':      p_pool = ProcessPoolExecutor(max_workers=4)      p = p_pool.map(fun, range(10))      print(list(p)) #map方法返回的是一個生成器,可通過強轉或者循環取值。    #執行結果  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]