Python多任務之協程
- 2019 年 10 月 15 日
- 筆記
前言
協程的核心點在於協程的使用,即只需要了解怎麼使用協程即可;但如果你想了解協程是怎麼實現的,就需要了解依次了解可迭代,迭代器,生成器了;
如果你只想看協程的使用,那麼只需要看第一部分內容就行了;如果如果想理解協程,可以按照順序依次閱讀本博文,或者按照 迭代器-生成器-協程的順序閱讀。
協程
- yield生成器是特殊的迭代器;
- greenlet 對 yield 進行了封裝;
- 而 gevent 對 greenlet 進行了封裝;
- gevent 遇見延時操作就換任務執行,這裡的延時操作可以是等待服務器資源或者sleep等等;
上面的概念會在後面的知識點進行講解;
greenlet實現多任務
要使用greenlet,首先要安裝greenlet
pip3 install greenlet
greenlet實現多任務代碼

from greenlet import greenlet import time def task1(): while 1: print("---1---") gr2.switch() time.sleep(1) def task2(): while 1: print("---2---") gr1.switch() time.sleep(1) gr1 = greenlet(task1) gr2 = greenlet(task2) # 切換到gr1中執行 gr1.switch()
greenlet實現多任務
但注意,這裡其實是一個單線程;並且經過測試,這裡最後幾句不能使用 __main__ ,否則會報錯;
gevent實現多任務
可以看到,greenlet已經可以實現協程了,但需要我們手動進行任務切換,這樣會很麻煩,因此我們要學習gevent,在greenlet的基礎上進行了封裝,可以幫助我們實現自動切換任務;
要使用gevent,使用要進行安裝
pip3 install gevent
gevent實現多任務代碼

import time import gevent def test1(n): for i in range(n): print("---test1---", gevent.getcurrent(), i) # time.sleep(0.5) # 這裡使用time的sleep並不會因為耗時導致切換任務 gevent.sleep(0.5) def test2(n): for i in range(n): print("---test2---", gevent.getcurrent(), i) # time.sleep(0.5) # 這裡使用time的sleep並不會因為耗時導致切換任務 gevent.sleep(0.5) def test3(n): for i in range(n): print("---test3---", gevent.getcurrent(), i) # time.sleep(0.5) # 這裡使用time的sleep並不會因為耗時導致切換任務 gevent.sleep(0.5) g1 = gevent.spawn(test1, 5) g2 = gevent.spawn(test2, 5) g3 = gevent.spawn(test3, 5) g1.join() g2.join() g3.join()
gevent實現多任務.py
運行結果:

---test1--- <Greenlet at 0x1e9e64c2598: test1(5)> 0 ---test2--- <Greenlet at 0x1e9e64c26a8: test2(5)> 0 ---test3--- <Greenlet at 0x1e9e64c27b8: test3(5)> 0 ---test1--- <Greenlet at 0x1e9e64c2598: test1(5)> 1 ---test2--- <Greenlet at 0x1e9e64c26a8: test2(5)> 1 ---test3--- <Greenlet at 0x1e9e64c27b8: test3(5)> 1 ---test1--- <Greenlet at 0x1e9e64c2598: test1(5)> 2 ---test2--- <Greenlet at 0x1e9e64c26a8: test2(5)> 2 ---test3--- <Greenlet at 0x1e9e64c27b8: test3(5)> 2 ---test1--- <Greenlet at 0x1e9e64c2598: test1(5)> 3 ---test2--- <Greenlet at 0x1e9e64c26a8: test2(5)> 3 ---test3--- <Greenlet at 0x1e9e64c27b8: test3(5)> 3 ---test1--- <Greenlet at 0x1e9e64c2598: test1(5)> 4 ---test2--- <Greenlet at 0x1e9e64c26a8: test2(5)> 4 ---test3--- <Greenlet at 0x1e9e64c27b8: test3(5)> 4
運行結果
注意,在gevent中如果要使用sleep(),必須要使用 gevent.sleep();
存在一個問題當我們創建g1,g2,g3時,如果不小心全部創建了g1,結果和沒寫錯幾乎是一樣的;
問題版運行結果

g1 = gevent.spawn(test1, 5) g2 = gevent.spawn(test2, 5) g3 = gevent.spawn(test3, 5) g1.join() g1.join() g1.join() ---test1--- <Greenlet at 0x17d8ef12598: test1(5)> 0 ---test2--- <Greenlet at 0x17d8ef126a8: test2(5)> 0 ---test3--- <Greenlet at 0x17d8ef127b8: test3(5)> 0 ---test1--- <Greenlet at 0x17d8ef12598: test1(5)> 1 ---test2--- <Greenlet at 0x17d8ef126a8: test2(5)> 1 ---test3--- <Greenlet at 0x17d8ef127b8: test3(5)> 1 ---test1--- <Greenlet at 0x17d8ef12598: test1(5)> 2 ---test2--- <Greenlet at 0x17d8ef126a8: test2(5)> 2 ---test3--- <Greenlet at 0x17d8ef127b8: test3(5)> 2 ---test1--- <Greenlet at 0x17d8ef12598: test1(5)> 3 ---test2--- <Greenlet at 0x17d8ef126a8: test2(5)> 3 ---test3--- <Greenlet at 0x17d8ef127b8: test3(5)> 3 ---test1--- <Greenlet at 0x17d8ef12598: test1(5)> 4 ---test2--- <Greenlet at 0x17d8ef126a8: test2(5)> 4 ---test3--- <Greenlet at 0x17d8ef127b8: test3(5)> 4
問題版運行結果
協程的核心在於利用延時操作去做其他的任務;
給gevent打補丁
當我們使用gevent的時候,如果要延時操作,比如等待網絡資源或者time.sleep(),必須要使用 gevent.sleep(),即每處延時操作都需要改成gevent的延時;如果我們想,還是按照原來的寫法,並且使用gevent,怎麼實現呢?這個實收,我們解疑使用打補丁的方法。只需要給使用gevent的代碼添加如下一行代碼即可完成打補丁
from gevent import monkey monkey.patch_all()
使用打補丁的方式完成協程的使用

import time import gevent from gevent import monkey monkey.patch_all() def test1(n): for i in range(n): print("---test1---", gevent.getcurrent(), i) time.sleep(0.5) # 在打補丁的情況下等效於 gevent.sleep(0.5) def test2(n): for i in range(n): print("---test2---", gevent.getcurrent(), i) time.sleep(0.5) def test3(n): for i in range(n): print("---test3---", gevent.getcurrent(), i) time.sleep(0.5) g1 = gevent.spawn(test1, 5) g2 = gevent.spawn(test2, 5) g3 = gevent.spawn(test3, 5) g1.join() g2.join() g3.join()
給gevent打補丁.py
給gevent打補丁,使time.sleep(1)之類的耗時操作等效於gevent.sleep(1);
gevent.joinall()的使用
如果我們有很多函數要調用,那麼豈不是得每次都先創建,在join(),gevent提供了一種簡便方式;

import time import gevent from gevent import monkey monkey.patch_all() def test1(n): for i in range(n): print("---test1---", gevent.getcurrent(), i) time.sleep(0.5) # 在打補丁的情況下等效於 gevent.sleep(0.5) def test2(n): for i in range(n): print("---test2---", gevent.getcurrent(), i) time.sleep(0.5) def test3(n): for i in range(n): print("---test3---", gevent.getcurrent(), i) time.sleep(0.5) gevent.joinall([ gevent.spawn(test1, 5), # 括號內前面的是函數名,後面的是傳參 gevent.spawn(test2, 5), gevent.spawn(test3, 5), ])
gevent.joinall()的使用.py
協程使用小案例-圖片下載器

import urllib.request import gevent from gevent import monkey monkey.patch_all() def img_download(img_name, img_url): req = urllib.request.urlopen(img_url) data = req.read() with open("images/"+img_name, "wb") as f: f.write(data) def main(): gevent.joinall([ gevent.spawn(img_download, "1.jpg", "https://rpic.douyucdn.cn/live-cover/appCovers/2019/05/13/6940298_20190513113912_small.jpg"), gevent.spawn(img_download, "2.jpg", "https://rpic.douyucdn.cn/asrpic/190513/2077143_6233919_0d516_2_1818.jpg"), gevent.spawn(img_download, "3.jpg", "https://rpic.douyucdn.cn/live-cover/appCovers/2018/11/24/1771605_20181124143723_small.jpg") ]) if __name__ == "__main__": main()
協程的使用-圖片下載器.py
進程,線程,線程對比
區別
- 進程是資源分配的單位
- 線程是操作系統調度的單位
- 進程切換需要的資源很最大,效率很低
- 線程切換需要的資源一般,效率一般(當然了在不考慮GIL的情況下)
- 協程切換任務資源很小,效率高
- 多進程、多線程根據cpu核數不一樣可能是並行的,但是協程是在一個線程中 所以是並發。
- 多進程耗費的資源最多;
- 當我們python3運行一個py文件時,就是運行一個進程,進程中有一個默認的線程就是主線程,主線程拿着代碼去執行;即進程是資源分配的單位,而線程才是真正拿着資源去執行,操作系統真正調度的就是線程;
- 一個進程裏面有兩個線程就是我們說的多線程的多任務方式,第二種多任務方式是多進程中有多線程;
- 線程的一大特點是可以利用某個線程在等待某個資源到來的時間去執行其他的任務;
- 在不考慮GIL的情況下,優先考慮協程,再考慮線程,再考慮進程;
- 進程是最穩定的,一個進程出問題了不會影響其他的進程,但耗費的資源較大;線程在切換任務時耗費的資源較線程少;協程可以利用線程在等待的時間做其他的事;
迭代器
迭代是訪問集合元素的一種方式。迭代器是一個可以記住遍歷的位置的對象。迭代器對象從集合的第一個元素開始訪問,直到所有的元素被訪問完結束。迭代器只能往前不會後退。
- 要理解協程的使用,首先要了解生成器;
- 要了解生成器,首先要理解迭代器;
推薦原來看過的一篇博客:一文徹底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念 ,不過和本文關係不大,哈哈~
在了解迭代器之前,我們來認識兩個單詞
Iterable 可迭代的/可迭代/可迭代對象 Iterator 迭代器
可迭代
迭代器引入-for循環
In [1]: for i in [11,22,33]: ...: print(i) 11 22 33 In [2]: for i in "hhh": ...: print(i) h h h In [3]: for i in 10: ...: print(i) ...: --------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-3-309758a01ba4> in <module>() ----> 1 for i in 10: 2 print(i) 3 TypeError: 'int' object is not iterable # “int”對象不可迭代
使用for循環時,in後面的數據類型是可迭代的 才可以使用for循環,例如元組,列表,字符串等;不可迭代的,例如數字,小數點的;
判斷是否可迭代
- 判斷某個東西是否可迭代的,可以通過判斷該數據類型是否為 Iterable 的子類,如果是則為可迭代;
- isinstance 可以用來判斷某對象是否是某類創建出來的;
- 比如我們要判斷 a是否為A類創建出來的,可以使用 isinstance(a, A)進行判斷;返回值為True,代表可迭代;
判斷列表是否是可迭代的:
from collections import Iterable isinstance([11,22,33], Iterable) True
isinstance判斷數據類型是否可迭代
In [6]: from collections import Iterable In [7]: isinstance([11,22], Iterable) Out[7]: True In [8]: isinstance((11,22), Iterable) Out[8]: True In [9]: isinstance(10, Iterable) Out[9]: False
元組,列表,字符串都是可迭代的;數字,小數不可迭代;
我們把可以通過for…in…這類語句迭代讀取一條數據供我們使用的對象稱之為可迭代對象(Iterable)。
自己定義的一個類,判斷能不能用for?
自己創建一個類,滿足能用for循環遍歷的需求
不可迭代
class Classmate(object): """docstring for Classmate""" def __init__(self): self.names = list() def add(self, name): self.names.append(name) classmate = Classmate() classmate.add("張三") classmate.add("李四") classmate.add("王五") for name in classmate: print(name) # TypeError: 'Classmate' object is not iterable
可迭代對象本質
我們分析對可迭代對象進行迭代使用的過程,發現每迭代一次(即在for…in…中每循環一次)都會返回對象中的下一條數據,一直向後讀取數據直到迭代了所有數據後結束。那麼,在這個過程中就應該有一個“人”去記錄每次訪問到了第幾條數據,以便每次迭代都可以返回下一條數據。我們把這個能幫助我們進行數據迭代的“人”稱為迭代器(Iterator)。
可迭代對象的本質就是可以向我們提供一個這樣的中間“人”即迭代器幫助我們對其進行迭代遍歷使用。
可迭代對象通過__iter__方法向我們提供一個迭代器,我們在迭代一個可迭代對象的時候,實際上就是先獲取該對象提供的一個迭代器,然後通過這個迭代器來依次獲取對象中的每一個數據.
那麼也就是說,一個具備了__iter__方法的對象,就是一個可迭代對象。
如果你不理解上面的話,沒關係,你只需要知道 “如果想要將自己定義的一個類變為可迭代的,那麼只需要在這個類中定義一個 __iter__ 方法即可”。
添加__iter__方法
class Classmate(object): """docstring for Classmate""" def __init__(self): self.names = list() def add(self, name): self.names.append(name) def __iter__(self): pass classmate = Classmate() classmate.add("張三") classmate.add("李四") classmate.add("王五") for name in classmate: print(name) # TypeError: iter() returned non-iterator of type 'NoneType' # iter()返回“NoneType”類型的非迭代器
注意,這個時候的classmate已經是可迭代對象了,可以用isinstance(classmate, Iterable)驗證;
但如果將__iter__()方法注釋掉,就不是可迭代對象了,所以可以驗證,要成為可迭代對象的第一步是添加__iter__()方法;
可迭代與迭代器
可迭代與迭代器
- 一個對象中有 __iter__ 方法,叫做 可以迭代;
- 如果一個對象中有 __iter__ 方法,並且 __iter__ 方法返回一個另一個對象的引用,而返回的對象中又包含 __iter__ 和 __next__ 方法,那麼這個返回的對象叫做 迭代器;
- 只要有了迭代器,那麼for方法就會通過迭代器中的 __next__ 方法來取值,每 for 循環一次,就調用一次 __next__ 方法;
- 使用 iter(xxxobj) 會自動調用 xxxobj 中的 __iter__ 方法,__iter__ 方法返回一個迭代器;
- next(可迭代實例對象即 __iter__ 方法返回一個迭代器),會自動去迭代器中調用 __next__ 方法;
- 一個可迭代的不一定是個迭代器;
- 一個迭代器一定可迭代;
- (可迭代–裏面有__iter__方法,迭代器–裏面有__iter__和__next__方法);
判斷是否可迭代
以下列代碼為例
for i in classmate
流程:
- 1.判斷 classmate是否為可迭代的,即是否包含 __iter__ 方法;
- 2.如果第一步是可迭代的,那麼就調用 iter(classmate) 即去調用 classmate 類中的 __iter__ 方法,返回一個迭代器,取返回值;
- 3.每 for 循環一次就去調用返回值中的 __next__ 方法一次,__next__ 返回什麼,就給i什麼;
自定義使用for循環步驟
- 1.在類中添加 __iter__ 方法;
- 2.__iter__ 方法返回一個對象的引用,這個對象必須包含 __iter__ 和 __next__ 方法;
- 3.在包含 __iter__ 和 __next__ 方法的類中,編寫 __next__ 方法返回值;
for…in…循環的本質
for item in Iterable
循環的本質就是先通過iter()函數獲取可迭代對象Iterable的迭代器,然後對獲取到的迭代器不斷調用next()方法來獲取下一個值並將其賦值給item,當遇到StopIteration的異常後循環結束。
完善自定義迭代器
一個實現了__iter__方法和__next__方法的對象,就是迭代器。
讓迭代器可以完整返回所有的數據;

import time from collections.abc import Iterable, Iterator class Classmate(object): def __init__(self): self.names = list() def add(self, name): self.names.append(name) def __iter__(self): return ClassmateIterable(self) class ClassmateIterable(object): def __init__(self, obj): self.obj = obj self.num = 0 def __iter__(self): pass def __next__(self): # return self.obj.names[0] try: ret = self.obj.names[self.num] self.num += 1 return ret except IndexError as e: raise StopIteration def main(): classmate = Classmate() classmate.add("張三") classmate.add("李四") classmate.add("王五") print("判斷classmate是否為可迭代的:", isinstance(classmate, Iterable)) classmate_iterator = iter(classmate) print("判斷classmate_iterator是否為迭代器:", isinstance(classmate_iterator, Iterator)) # 調用一次 __next__ print("classmate_iterator's next:", next(classmate_iterator)) for i in classmate: print(i) time.sleep(1) if __name__ == '__main__': main()
自己實現一個可迭代的對象
可以看到,現在已經可以實現for循環使用自定義的類了;但在這個代碼里我們看到為了實現返回迭代器我們要再定義一個額外的類,這樣是比較麻煩的。在這裡我們可以進行簡化一下,不返回另一個類,而是返回自己這個類,並且在自己類中定義一個 __next__ 方法。簡化如下
改進簡化迭代器

import time from collections.abc import Iterable, Iterator class Classmate(object): def __init__(self): self.names = list() self.num = 0 def add(self, name): self.names.append(name) def __iter__(self): return self def __next__(self): # return self.obj.names[0] try: ret = self.names[self.num] self.num += 1 return ret except IndexError as e: raise StopIteration def main(): classmate = Classmate() classmate.add("張三") classmate.add("李四") classmate.add("王五") for i in classmate: print(i) time.sleep(1) if __name__ == '__main__': main()
改進簡化迭代器.py
迭代器的應用
迭代器的作用
- 不用迭代器,是當要做某事之前,就生成並存儲數據,存儲數據時可能會佔用大量的空間;
- 用迭代器,是掌握數據的生成方法,什麼時候使用,什麼時候生成;
- 比如range(10),即時生成10個數據,那麼range(1000000000)呢?
- range:生成10個值的列表;xrange:存儲生成10個值的方式;
- python2 中 range(10) 存儲的是一個列表,xrange(10) 存儲的是生成10個值的方式,是一個迭代器;
- python3 中 range() 已經相當於python2中的 xrange()了,並且py3中已經沒有xrange()了;
- 迭代器是存儲生成數據的方式,而不是存儲數據結果;
python3中使用range:
>>> range(10) range(0, 10) >>> ret = range(10) >>> next(ret) Traceback (most recent call last): File "<pyshell#3>", line 1, in <module> next(ret) TypeError: 'range' object is not an iterator >>> for i in range(10): print(i) 0 1 2 3 ...
正常實現斐波那契數列
nums = [] a = 0 b = 1 i = 0 while i < 10: nums.append(a) a, b = b, a+b i += 1 for i in nums: print(i)
使用迭代器實現斐波那契數列

class Fibonacci(object): def __init__(self, times): self.times = times self.a = 0 self.b = 1 self.current_num = 0 def __iter__(self): return self def __next__(self): if self.current_num < self.times: ret = self.a self.a, self.b = self.b, self.a+self.b self.current_num += 1 return ret else: raise StopIteration fibo = Fibonacci(10) for i in fibo: print(i)
使用迭代器實現斐波那契數列
什麼時候調,什麼時候生成。
迭代器使用的其他方式-列表元組等類型轉換
當我們使用 list() 或者 tuple() 進行類型轉換時,使用的其實也是迭代器;
a = (11,22,33) b = list(a)
當我們使用list()將元組轉換成列表時,是使用了迭代器的原理,先定義一個空列表,用迭代器 通過 __next__ 從元組中取第一個值,添加到空列表中,再依次從元組取值,添加入列表,直到元組中沒有值了,主動拋出迭代停止異常;
同理,將列錶轉換成元組也是如此;
生成器
迭代器:用來節省內存空間而且還知道將來怎麼生成數據的方式;
生成器:一種特殊的迭代器;
生成器方式:
- 1.將列表推導式的小括號換成中括號;
- 2.函數中使用yield
實現生成器方式1
In [15]: L = [ x*2 for x in range(5)] In [16]: L Out[16]: [0, 2, 4, 6, 8] In [17]: G = ( x*2 for x in range(5)) In [18]: G Out[18]: <generator object <genexpr> at 0x7f626c132db0> In [19]: next(G) Out[19]: 0 In [20]: next(G) Out[20]: 2
實現生成器方式2
使用yield的生成器

def Fibonacci(n): a, b = 0, 1 count_num = 0 while count_num < n: # 如果函數中有一個yield語句,那麼這個就不再是函數,而是一個生成器的模板 yield a a, b = b, a+b count_num += 1 # 如果在調用時發現這個函數中有yield,那麼此時,不是調用函數,而是創建一個生成器對象 fb = Fibonacci(5) print("使用for循環遍歷生成器中的所有數字".center(40, "-")) for i in fb: print(i)
使用yield完成斐波那契數列
生成器執行流程:當第一次調用for/next執行時,會從生成器的第一行開始依次向下執行,直到在循環中碰見yield,就會返回yield後面的變量/字符;然後第二次調用for/next時,就會從上次的yield後面的代碼繼續執行,直到在循環中再次碰到yield,返回;依次往下,直到沒有了數據。
可以使用 for i in 生成器對象 來遍歷生成器中的數據,也可以用 next(生成器對象) 來一個一個獲取生成器中的值;
使用next獲取生成器中的值

def Fibonacci(n): a, b = 0, 1 count_num = 0 while count_num < n: # 如果函數中有一個yield語句,那麼這個就不再是函數,而是一個生成器的模板 yield a a, b = b, a+b count_num += 1 # 如果在調用時發現這個函數中有yield,那麼此時,不是調用函數,而是創建一個生成器對象 fb = Fibonacci(5) print("使用next依次生成三次數字".center(40, "-")) print(next(fb)) print(next(fb)) print(next(fb)) print("使用for循環遍歷剩餘的數字".center(40, "-")) for i in fb: print(i)
使用next獲取生成器中的值
生成器-send方式
可以重複創建多個生成器,多個生成器之間互不干擾;
如果在生成器中有return值,可以在生成器結束後用 出錯的結果.value 來進行接收;

def Fibonacci(n): a, b = 0, 1 count_num = 0 while count_num < n: # 如果函數中有一個yield語句,那麼這個就不再是函數,而是一個生成器的模板 yield a a, b = b, a+b count_num += 1 return "okhaha" # 如果在調用時發現這個函數中有yield,那麼此時,不是調用函數,而是創建一個生成器對象 fb = Fibonacci(5) while 1: try: result = next(fb) print(result) except Exception as e: print(e.value) break
生成器使用send
除了使用next來啟動生成器之外,還可以使用send來啟動生成器;

def Fibonacci(n): a, b = 0, 1 count_num = 0 while count_num < n: ret = yield a print("ret:", ret) a, b = b, a+b count_num += 1 fb = Fibonacci(5) print(next(fb)) print(fb.send("haha")) print(next(fb)) # 0 # ret: haha # 1 # ret: None # 1
使用send來啟動生成器
我們可以理解為,第一次使用next,先執行等號右邊的代碼,就將yield a返回給了next(fb);然後下次調用send時,執行等號左邊的,將send的傳值賦值給ret,再執行後續代碼;
或者我們可以理解 ret = yield a 為兩步 ===>1.yield a; 2.ret = arg;其中的arg表示send的傳值,如果不傳值,默認為None,所以當next在send後面調用時,就默認傳了None;
注意,一般不將send用作第一次喚醒生成器,如果一定要使用send第一次喚醒,要send(None);
生成器-小總結
生成器特點:
- 一個沒有__iter__和__next__方法的特殊迭代器;
- 函數只執行一部分就返回;
- 可以讓一個函數暫停執行,並且保存上次的值,根據上次的值恢復到原來的樣子,再做接下來的操作;
- 迭代器節省空間,實現循環;
- 生成器可以讓一個看起來像函數的代碼暫停執行,並根據自己的想法調用next/send繼續執行;
使用yield完成多任務
- 在python2中,while1 的執行時間大概是while True的2/3,這是因為True在2中不是關鍵字,可以隨意賦值,因此用while 1;
- 在python3中,True已經是關鍵字了,解釋器不用判斷True的值,所以while True和 while 1的區別不大,但可能還是1更快;
進程之間切換任務,佔用的資源很大,創建進程,釋放進程需要浪費大量的時間,進程的效率沒有線程高,比線程佔用資源更少的是協程;
使用yield完成多任務

import time def task_1(): while 1: print("---1---") time.sleep(0.5) yield def task_2(): while 1: print("---2---") time.sleep(0.5) yield def main(): t1 = task_1() t2 = task_2() while 1: next(t1) next(t2) if __name__ == "__main__": main()
使用yield完成多任務
是假的多任務,屬於並發;