Python Automation Development Study Notes - Web Scraping 3
- January 6, 2020
- Notes
Scraping multiple web pages
The instructor's blog post: https://www.cnblogs.com/wupeiqi/p/6229292.html
When writing a crawler, most of the cost is in IO requests. With a single process and a single thread, requesting a URL inevitably means waiting, which slows the whole job down. Fetch, say, 10 resources hosted overseas and the slowness becomes very noticeable.
The serial approach
Simply string all the requests together with a for loop and run them one at a time. The efficiency is very low:
import requests
from bs4 import BeautifulSoup

url_list = [
    'https://github.com/explore',
    'https://www.djangoproject.com/',
    'http://www.python-requests.org/en/master/',
    'https://jquery.com/',
    'https://getbootstrap.com/',
    'https://www.solarwinds.com/',
    'https://www.zabbix.com/',
    'http://open-falcon.org/',
    'https://www.python.org/',
    'http://www.jetbrains.com/',
]

if __name__ == '__main__':
    for url in url_list:
        r = requests.get(url)
        r.encoding = 'utf-8'
        soup = BeautifulSoup(r.text, features='html.parser')
        title = soup.find('title')
        print(title)
Multithreading (multiprocessing)
Below is an implementation using a thread pool (or process pool). Multiprocessing and multithreading give the same result here, but threads use fewer resources.
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
# from concurrent.futures import ProcessPoolExecutor  # process pool

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

def fetch_request(url):
    r = requests.get(url)
    r.encoding = 'utf-8'
    soup = BeautifulSoup(r.text, features='html.parser')
    title = soup.find('title')
    print(title)

if __name__ == '__main__':
    pool = ThreadPoolExecutor(10)
    # pool = ProcessPoolExecutor(10)  # process pool
    for url in url_list:
        pool.submit(fetch_request, url)
    pool.shutdown(True)
Multithreading + callback functions
The module used in the example above also supports callback functions. With the code changed slightly:
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

def fetch_request(url):
    response = requests.get(url)
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, features='html.parser')
    title = soup.find('title')
    return str(title)  # this return value is what the callback below receives (via the future); without the str() conversion it raises an error

def callback(result):
    print(result.result())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(10)
    for url in url_list:
        v = pool.submit(fetch_request, url)
        v.add_done_callback(callback)
    pool.shutdown(True)
Callbacks are used in exactly the same way with a process pool and a thread pool. For a simple task like this a callback isn't really needed, but it's one more pool usage pattern worth keeping as an example (a thread-pool version is sketched below).
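For completeness, here is a minimal sketch of the same callback pattern with a thread pool (my own variant, not from the lecture): only the executor class changes, and since threads share memory the str() conversion is no longer strictly needed.

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def fetch_request(url):
    response = requests.get(url)
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, features='html.parser')
    return soup.find('title')   # no pickling between threads, so returning the Tag itself is fine

def callback(future):
    print(future.result())      # the future's result is whatever fetch_request returned

if __name__ == '__main__':
    pool = ThreadPoolExecutor(10)
    f = pool.submit(fetch_request, 'https://www.python.org/')
    f.add_done_callback(callback)
    pool.shutdown(True)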
Asynchronous IO
The drawback of multithreading and multiprocessing is that threads and processes are wasted while blocked on IO, so asynchronous IO is a better approach. The essence of an asynchronous IO request is a non-blocking socket plus IO multiplexing: only one thread is needed, and each request is a coroutine (a bare-bones illustration of that idea is sketched right below). The rest of this note goes through various asynchronous IO request modules, both built into Python and third-party. They are easy to use and greatly improve efficiency.
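To make "non-blocking socket + IO multiplexing" concrete, here is a bare-bones sketch of the idea (my own illustration, not from the lecture; the hosts are taken from the url_list above and assumed reachable over plain HTTP on port 80). One thread starts several connections, and select tells us which sockets are ready to be written to or read from:

import select
import socket

hosts = ['www.jetbrains.com', 'open-falcon.org']

pending_send = {}   # sockets we still have to write the request to -> host
pending_recv = {}   # sockets we are waiting to read a response from -> host

for host in hosts:
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect((host, 80))      # returns immediately instead of blocking
    except BlockingIOError:
        pass                       # the connection keeps being set up in the background
    pending_send[s] = host

while pending_send or pending_recv:
    rlist, wlist, _ = select.select(list(pending_recv), list(pending_send), [], 5)
    for s in wlist:                # connection established: send the raw HTTP request
        host = pending_send.pop(s)
        s.send(("GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % host).encode('utf-8'))
        pending_recv[s] = host
    for s in rlist:                # a response (or part of one) is ready to read
        host = pending_recv.pop(s)
        print(host, s.recv(4096)[:60])
        s.close()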
The asyncio module
This is a built-in module. First, let's look at how the module is called. The usage shown here is the Python 3.4 style; Python 3.5 added the new async/await keywords, but since it is backward compatible, the older decorator-based asyncio/yield from style can still be used. Usage example:
import asyncio

@asyncio.coroutine
def func(n):
    print('before func %s...' % n)
    yield from asyncio.sleep(3)
    print('end func %s...' % n)

if __name__ == '__main__':
    tasks = []
    for i in range(5):
        tasks.append(func(i))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
Note the decorator and the function it decorates. In tasks.append(), what gets appended is the call func(i), with parentheses, not func itself. Normally writing it that way would execute the function on the spot, but because the function is decorated as a coroutine, the call only creates a coroutine object; the body does not run until the event loop drives it later (a quick check of this is sketched below).
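A quick check of this (my own snippet, not from the lecture): calling the decorated function only creates the coroutine/generator object, and nothing inside it runs until the event loop drives it.

import asyncio

@asyncio.coroutine
def func(n):
    print('before func %s...' % n)
    yield from asyncio.sleep(1)
    print('end func %s...' % n)

c = func(1)
print(type(c))               # a generator object; 'before func 1...' has not been printed yet
loop = asyncio.get_event_loop()
loop.run_until_complete(c)   # only now does the body actually run
loop.close()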
Sending an HTTP request over TCP
The asyncio module only provides TCP; it cannot send HTTP requests directly. But once you understand what a web service really is, HTTP is still TCP underneath: an HTTP request is just a string sent over TCP, only the string has a specific format. The string is split into a request header and a request body; the header and the body are separated by "\r\n\r\n", and individual header lines are separated by "\r\n". Below is the basic format of a GET request:
""" GET /index HTTP/1.0rn HOST: 1.1.1.1 rnrn """
So as long as the string is built in the format above and sent over TCP, that is HTTP. The following example hand-crafts the HTTP request header and sends it with asyncio:
import asyncio
from bs4 import BeautifulSoup

url_list = [
    ('www.python-requests.org', '/en/master/'),
    ('open-falcon.org', '/'),
    ('www.jetbrains.com', '/'),
    ('www.nga.cn', '/'),
    ('edu.51cto.com', '/'),
]

@asyncio.coroutine
def fetch_async(host, url):
    reader, writer = yield from asyncio.open_connection(host, 80)  # open the TCP connection
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)  # a plain GET request
    request_header_content = request_header_content.encode('utf-8')  # what gets sent must be bytes
    writer.write(request_header_content)  # send the request
    yield from writer.drain()
    text = yield from reader.read()  # what comes back is bytes as well
    text = text.decode('utf-8')
    soup = BeautifulSoup(text, features='html.parser')
    title = soup.find('title')
    print(title)
    writer.close()

if __name__ == '__main__':
    tasks = []
    for host, url in url_list:
        tasks.append(fetch_async(host, url))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
Sending HTTPS over TCP
The code above can only send plain HTTP. HTTPS differs in two main ways: the default port is 443, and SSL is required. Fortunately asyncio.open_connection supports SSL; just pass ssl=True (the parameter defaults to False, which is why it wasn't specified above). Below is a version that supports HTTPS:
import asyncio
from bs4 import BeautifulSoup

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

@asyncio.coroutine
def fetch_async(host, url='/', port=80, ssl=False):
    reader, writer = yield from asyncio.open_connection(host, port, ssl=ssl)  # open the TCP connection
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)  # a plain GET request
    request_header_content = request_header_content.encode('utf-8')  # what gets sent must be bytes
    writer.write(request_header_content)  # send the request
    yield from writer.drain()
    text = yield from reader.read()  # what comes back is bytes as well
    text = text.decode('utf-8')
    soup = BeautifulSoup(text, features='html.parser')
    title = soup.find('title')
    print(title)
    writer.close()

if __name__ == '__main__':
    from urllib.parse import urlparse
    tasks = []
    for url in url_list:
        url_parse = urlparse(url)
        if url_parse.scheme == "https":
            tasks.append(fetch_async(url_parse.netloc, url_parse.path, 443, True))
        else:
            tasks.append(fetch_async(url_parse.netloc, url_parse.path))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
asyncio + aiohttp
The code on the instructor's blog has a version problem: it no longer runs and raises an error, because async/await was introduced in Python 3.5. A rough note of the reason:
Since Python 3.5, a native coroutine cannot be iterated over, and an undecorated generator cannot yield from a native coroutine (the short snippet below demonstrates this).
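A short demonstration of that rule (my own snippet, not from the lecture):

import asyncio

async def native():         # a native coroutine, Python 3.5+ syntax
    await asyncio.sleep(0)

def plain_generator():      # an ordinary generator, no @asyncio.coroutine decorator
    yield from native()     # this is the forbidden combination

try:
    list(plain_generator())
except TypeError as e:
    print(e)                # cannot 'yield from' a coroutine object in a non-coroutine generator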
What is a native coroutine? A coroutine defined with the async keyword is a native coroutine. asyncio is a standard-library module introduced in Python 3.4, where coroutines are defined with a decorator (as in the examples above). Python 3.5 introduced the async keyword for defining coroutines, and since it is backward compatible, the older decorator style still works. Now look at the aiohttp module. Skimming the source, the older versions (2.x and earlier) use asyncio/yield from, while the 3.x versions switched to async/await. The old yield from cannot drive a native coroutine defined with the new async keyword, hence the error. The earlier examples used asyncio/yield from, but aiohttp here uses async/await, so yield from can no longer be used with it. Below is an example using async/await:
import aiohttp
import asyncio
from bs4 import BeautifulSoup

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

async def fetch_async(url):
    async with aiohttp.request('GET', url) as r:
        text = await r.text('utf-8')
        soup = BeautifulSoup(text, features='html.parser')
        title = soup.find('title')
        print(title)

if __name__ == '__main__':
    tasks = []
    for url in url_list:
        tasks.append(fetch_async(url))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
The examples later on will keep using asyncio/yield from, and this async/await example was not easy to find either. Still, async/await is the recommended style, and fortunately converting the code is not hard; there are also plenty of examples online. As an illustration, the earlier raw-TCP HTTPS example is rewritten with async/await below.
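Here is my own conversion of that example, to show how small the change is: drop the decorator, use async def, and replace yield from with await.

import asyncio
from bs4 import BeautifulSoup

async def fetch_async(host, url='/', port=80, ssl=False):
    reader, writer = await asyncio.open_connection(host, port, ssl=ssl)
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)
    writer.write(request_header_content.encode('utf-8'))
    await writer.drain()
    text = await reader.read()
    soup = BeautifulSoup(text.decode('utf-8'), features='html.parser')
    print(soup.find('title'))
    writer.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(
        fetch_async('github.com', '/explore', 443, True),
        fetch_async('www.jetbrains.com', '/'),
    ))
    loop.close()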
asyncio + requests
import asyncio
import requests
from bs4 import BeautifulSoup

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)  # run the blocking requests call in the default executor
    response = yield from future
    response.encoding = 'utf-8'
    soup = BeautifulSoup(response.text, features='html.parser')
    title = soup.find('title')
    print(title)

if __name__ == '__main__':
    tasks = []
    for url in url_list:
        tasks.append(fetch_async(requests.get, url))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
gevent + requests
from bs4 import BeautifulSoup
import gevent
from gevent import monkey

monkey.patch_all()  # must run before the requests module is imported

import requests

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

def fetch_request(url):
    r = requests.get(url)
    r.encoding = 'utf-8'
    soup = BeautifulSoup(r.text, features='html.parser')
    title = soup.find('title')
    print(title)

if __name__ == '__main__':
    g_list = []
    for url in url_list:
        g_list.append(gevent.spawn(fetch_request, url=url))
    gevent.joinall(g_list)
grequests
The grequests module is simply gevent + requests: someone wrapped the two modules together in another layer. Just an example:
import grequests
from bs4 import BeautifulSoup

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

def exception_handler(request, exception):
    print(request, exception)
    print("Request failed")

def callback(r, *args, **kwargs):
    r.encoding = 'utf-8'
    soup = BeautifulSoup(r.text, features='html.parser')
    title = soup.find('title')
    print(title)

if __name__ == '__main__':
    request_list = [grequests.get(url, timeout=10, callback=callback) for url in url_list]
    response_list = grequests.map(request_list, exception_handler=exception_handler, gtimeout=10)
    print(response_list)
Building the list with a for loop as before is clumsy, so a list comprehension is used here. The timeout in grequests.get is the timeout for a single request, while gtimeout in grequests.map is the overall timeout for the whole batch. exception_handler is called for requests that raise an exception: if a single request times out, an exception is raised, whereas if the whole batch times out, any unfinished requests simply return None without raising.
Twisted
Installing the module directly failed with an error, so I dug around the official site, http://twistedmatrix.com, and found the pip installation instructions:
The recommended way is to run pip install Twisted, preferably inside a virtualenv. On Linux, and BSDs, you will need a C compiler (such as GCC). On macOS you will need to run xcode-select --install. If you are installing on Windows, pip install Twisted[windows_platform] will install the Windows-specific requirements.
So the following command should install the Windows version:
pip install -i https://mirrors.163.com/pypi/simple Twisted[windows_platform]
It still failed, with the following error:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
Installing the Twisted module
I eventually found a workaround online: a local install. First download the .whl file for the matching Python version from https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted, then install the local file with pip:
pip install E:/Downloads/Twisted-18.9.0-cp36-cp36m-win_amd64.whl
Sending a GET request
from bs4 import BeautifulSoup
from twisted.web.client import getPage, defer
from twisted.internet import reactor

url_list = [
    'https://github.com/explore',
    # more urls omitted
    'http://www.jetbrains.com/',
]

def all_done(arg):
    reactor.stop()

def callback(contents):
    soup = BeautifulSoup(contents, features='html.parser')
    title = soup.find('title')
    print(title)

if __name__ == '__main__':
    deferred_list = []
    for url in url_list:
        deferred = getPage(url.encode('utf-8'))  # send the request
        deferred.addCallback(callback)  # callback invoked when the response comes back
        deferred_list.append(deferred)  # collect all the requests so they can be watched below
    dlist = defer.DeferredList(deferred_list)  # watch all of the requests
    dlist.addBoth(all_done)  # called once every request has finished
    reactor.run()  # start an endless loop that keeps running; the stop() call in all_done ends it
Sending a POST request
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse

def one_done(arg):
    print(arg)
    print(arg.decode())
    reactor.stop()

post_data = urllib.parse.urlencode({'check_data': 'TEST'})
post_data = post_data.encode('utf-8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(b'http://dig.chouti.com/login',
                   method=b'POST',
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)
reactor.run()
tornado
There is only an example here; I may need to study this more later:
from bs4 import BeautifulSoup
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

url_list = [
    'https://github.com/explore',
    'https://www.djangoproject.com/',
    'http://www.python-requests.org/en/master/',
    'https://jquery.com/',
    'https://getbootstrap.com/',
    'https://www.solarwinds.com/',
    'https://www.zabbix.com/',
    'http://open-falcon.org/',
    'https://www.python.org/',
    'http://www.jetbrains.com/',
]

def asynchronous_fetch():
    http_client = AsyncHTTPClient()

    # define an inner function to handle the returned response
    def handle_response(response):
        """
        Handle the response content. (A counter of some sort has to be maintained
        so that ioloop.IOLoop.current().stop() can be called to stop the IO loop.)
        :param response:
        :return:
        """
        if response.error:
            print("Error:", response.error)
        else:
            # print(response.headers)
            # print(response.body)
            soup = BeautifulSoup(response.body, features='html.parser')
            title = soup.find('title')
            print(title)
        # my own way of stopping the loop; probably not the canonical approach
        # print(response.effective_url)
        curr_url = response.effective_url
        if curr_url in url_list:
            url_list.remove(curr_url)
        if not url_list:
            ioloop.IOLoop.current().stop()

    for url in url_list:
        # the specified callback is invoked when the asynchronous request finishes
        http_client.fetch(HTTPRequest(url), callback=handle_response)
        # the line below has the same effect; internally the module checks whether the
        # argument isinstance of HTTPRequest, and if not it wraps it with HTTPRequest(url, **kwargs),
        # so any extra request options can be passed as keyword arguments
        # http_client.fetch(url, callback=handle_response)

if __name__ == '__main__':
    ioloop.IOLoop.current().add_callback(asynchronous_fetch)
    ioloop.IOLoop.current().start()