Python實用技巧大任務切分
- 2019 年 12 月 2 日
- 筆記

今天來說說,Python 中的任務切分。以爬蟲為例,從一個存 url 的 txt 文件中,讀取其內容,我們會獲取一個 url 列表。我們把這一個 url 列表稱為大任務。
列表切分
在不考慮內存佔用的情況下,我們對上面的大任務進行一個切分。比如我們將大任務切分成的小任務是每秒最多只訪問5個URL。
import os import time CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file(): file_path = os.path.join(CURRENT_DIR, "url_list.txt") with open(file_path, "r", encoding="utf-8") as fs: result = [i.strip() for i in fs.readlines()] return result def fetch(url): print(url) def run(): max_count = 5 url_list = read_file() for index in range(0, len(url_list), max_count): start = time.time() fetch(url_list[index:index + max_count]) end = time.time() - start if end < 1: time.sleep(1 - end) if __name__ == '__main__': run()
關鍵代碼都在for循環里,首先我們通過聲明range的第三個參數,該參數指定迭代的步長為5,這樣每次index增加都是以5為基數,即0,5,10。。。 然後我們對url_list做切片,每次取其五個元素,這五個元素會隨着index的增加不斷的在改變,如果最後不夠五個了,按照切片的特性這個時候就會有多少取多少了,不會造成索引超下標的問題。
隨着url列表的增加,我們會發現內存的佔用也在提高了。這個時候我們就需要對代碼進行修改了,我們知道生成器是比較節省內存的空間的,修改之後代碼變成,下面的這樣。
生成器切分
# -*- coding: utf-8 -*- # @時間 : 2019-11-23 23:47 # @作者 : 陳祥安 # @文件名 : g.py # @公眾號: Python學習開發 import os import time from itertools import islice CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) def read_file(): file_path = os.path.join(CURRENT_DIR, "url_list.txt") with open(file_path, "r", encoding="utf-8") as fs: for i in fs: yield i.strip() def fetch(url): print(url) def run(): max_count = 5 url_gen = read_file() while True: url_list = list(islice(url_gen, 0, max_count)) if not url_list: break start = time.time() fetch(url_list) end = time.time() - start if end < 1: time.sleep(1 - end) if __name__ == '__main__': run()
首先,我們修改了文件讀取的方式,把原來讀列表的形式,改為了生成器的形式。這樣我們在調用該文件讀取方法的時候大大節省了內存。
然後就是對上面for循環進行改造,因為生成器的特性,這裡不適合使用for進行迭代,因為每一次的迭代都會消耗生成器的元素,通過使用itertools的islice對url_gen進行切分,islice是生成器的切片,這裡我們每次切分出含有5個元素的生成器,因為生成器沒有__len__方法所以,我們將其轉為列表,然後判斷列表是否為空,就可以知道迭代是否該結束了。
修改之後的代碼,不管是性能還是節省內存上都大大的提高。讀取千萬級的文件不是問題。 除此之外,在使用異步爬蟲的時候,也許會用到異步生成器切片。下面就和大家討論,異步生成器切分的問題
異步生成器切分
首先先來看一個簡單的異步生成器。 我們知道調用下面的代碼會得到一個生成器
def foo(): for i in range(20): yield i
如果在def前面加一個async,那麼在調用的時候它就是個異步生成器了。 完整示例代碼如下
import asyncio async def foo(): for i in range(20): yield i async def run(): async_gen = foo() async for i in async_gen: print(i) if __name__ == '__main__': asyncio.run(run())
關於async for的切分有點複雜,這裡推薦使用aiostream模塊,使用之後代碼改為下面這樣
import asyncio from aiostream import stream async def foo(): for i in range(22): yield i async def run(): index = 0 limit = 5 while True: xs = stream.iterate(foo()) ys = xs[index:index + limit] t = await stream.list(ys) if not t: break print(t) index += limit if __name__ == '__main__': asyncio.run(run())