python爬蟲之多執行緒、多進程+程式碼示例
python爬蟲之多執行緒、多進程
使用多進程、多執行緒編寫爬蟲的程式碼能有效的提高爬蟲爬取目標網站的效率。
一、什麼是進程和執行緒
引用廖雪峰的官方網站關於進程和執行緒的講解:
進程:對於作業系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開一個記事本就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。
執行緒:有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個「子任務」,我們把進程內的這些「子任務」稱為執行緒(Thread)。
每個進程至少要做一件事,所以,一個進程至少有一個執行緒。
二、多進程
實現多進程的四種方式
方式一:os.fork()
python 的 os 模組封裝了常見的系統調用,其中,多進程的調用就是 fork() 函數。具體示例程式碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1.只有在Unix系統中有效,Windows系統中無效
2.fork函數調用一次,返回兩次:在父進程中返回值為子進程id,在子進程中返回值為0
"""
import os
pid = os.fork()
if pid == 0:
print("執行子進程,子進程pid={pid},父進程ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
print("執行父進程,子進程pid={pid},父進程ppid={ppid}".format(pid=pid, ppid=os.getpid()))
# 執行父進程,子進程pid=611,父進程ppid=610
# 執行子進程,子進程pid=611,父進程ppid=610
方法二:Multiprocessing 模組 Process 類
通過 Multiprocessing 模組中的 Process 類,創建Process對象。
Process類的構造方法:
init(self, group=None, targent=None, name=None, args=(), kwargs={})
參數 | 說明 |
---|---|
group | 進程所屬組,基本不用。 |
targent | 表示調用對象,一般為函數。 |
args | 表示調用對象參數元祖。 |
name | 進程別名。 |
kwargs | 表示調用對象的字典。 |
具體示例程式碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process
def run_process(name):
print(name)
if __name__ == "__main__":
p = Process(target=run_process, args=("test",))
p.start()
p.join()
print("子進程結束")
# test
# 子進程結束
方法三:繼承 Process 類
通過繼承Process類,重寫 run 方法。使用 .start() 方法,會自動調用 run 方法。具體示例程式碼如下:
from multiprocessing import Process
class NewProcess(Process):
def __init__(self, n):
super(NewProcess, self).__init__()
self.n = n
def run(self):
print(self.n)
if __name__ == "__main__":
test = "test"
p = NewProcess(test)
p.start()
p.join()
print("子進程結束")
# test
# 子進程結束
方式四:進程池 Pool 類
Pool 類可以提供指定數量(一般為CPU的核數)的進程供用戶調用,當有新的請求提交的 Pool 中時,如果池中還沒有滿,就會創建一個新的進程來執行這些請求。如果池滿,請求就會告知先等待。直到池中有進程結束,才會創建新的進程來執行這些請求。
注意:進程池中的進程是不能共享隊列和數據的,而 Process 生成的子進程可以共享隊列。
Pool 類中的常用方法:
函數 | 函數原型 | 說明 |
---|---|---|
apply() | apply(func[, args=()[, kwds={}]]) | 該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以後不再出現)。 |
apply_async() | apply_async(func[, args()[, kwds{}[, callback=None]]]) | 與apply用法一樣,但它是非阻塞且支援結果返回進行回調。 |
map() | map(func, utterable[, chunksize=None]) | Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程式才會運行子進程。 |
close() | 關閉進程池(Pool),使其不能再添加新的Process。 | |
terminate() | 結束工作進程,不再處理未處理的任務。 | |
join() | 主進程阻塞等待子進程的退出,join方法必須在close或terminate之後使用。 |
具體程式碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool
def run(num):
time.sleep(1)
return num * num
if __name__ == "__main__":
testList = [1, 2, 3, 4, 5, 6, 7]
print('單進程執行') # 順序執行
t1 = time.time()
for i in testList:
run(i)
t2 = time.time()
print('順序執行的時間為:', int(t2 - t1))
print('多進程 map 執行') # 並行執行
p = Pool(4) # 創建擁有4個進程數量的進程池
result = p.map(run, testList)
p.close() # 關閉進程池,不再接受新的任務
p.join() # 主進程阻塞等待子進程的退出
t3 = time.time()
print('執行的時間為:', int(t3 - t2))
print(result)
# 單進程執行
# 順序執行的時間為: 7
# 多進程 map 執行
# 執行的時間為: 2
# [1, 4, 9, 16, 25, 36, 49]
進程通訊
Queue()
隊列:先進先出,按照順序
通訊原理:在記憶體中建立隊列數據結構模型。多個進程都可以通過隊列存入內容,取出內容的順序和存入內容的順序保存一致。
方法 | 功能 | 參數 |
---|---|---|
q = Queue(maxsize = 0) | 創建隊列消息,並返回隊列對象。 | 表示最多存儲多少消息。默認表示根據記憶體分配存儲。 |
q.put(data, [block, timeout]) | 向隊列存儲消息。 | Data:要存入的數據。block:默認隊列滿時會堵塞,設置False則非堵塞。timeout:超時時間。 |
data = q.get([block, timeout]) | 獲取隊列消息。 | block:默認隊列空時會堵塞,設置False則非堵塞。timeout:超時時間。 |
q.full() | 判斷隊列是否為滿。 | |
q.empty() | 判斷隊列是否為空。 | |
q.size() | 判斷隊列中的消息數量。 | |
q.close() | 關閉隊列。 |
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue
def foo(data):
s = data.get() # 管子的另一端放在子進程這裡,子進程接收到了數據
if s not in "":
print('子進程已收到數據...')
print(s) # 子進程列印出了數據內容...
if __name__ == '__main__': # 要加這行...
q = Queue() # 創建進程通訊的Queue,你可以理解為我拿了個管子來...
p = Process(target=foo, args=(q,)) # 創建子進程
print('主進程準備發送數據...')
q.put("數據接收成功") # 將管子的一端放在主進程這裡,主進程往管子里丟入數據↑
p.start() # 啟子子進程
p.join()
# 主進程準備發送數據...
# 子進程已收到數據...
# 數據接收成功
Pipe()
通訊原理:在記憶體中開闢管道空間,生成管道操作對象,多個進程使用「同一個」管道對象進行操作即可實現通訊。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello']) # 向管道中寫入內容
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 從管道讀取資訊
p.join()
# prints "[42, None, 'hello']"
manager()
進程的 manager 方法可以共享數據,比如共享列表,元祖,字典,鎖,字元。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
def f(m_list):
m_list.append("f")
if __name__ == '__main__':
manager = multiprocessing.Manager()
m_list = manager.list([1, 2, 3])
p = multiprocessing.Process(target=f, args=(m_list, ))
p.start()
p.join()
print(m_list)
# [1, 2, 3, 'f']
三、多執行緒
執行緒在程式中是獨立的、並非的執行流。與分隔的進程相比執行緒之間的隔離程度要小,它們共享記憶體,文件句柄和其它進程應有的狀態。多執行緒之間共享全局變數。
創建多執行緒多兩種方式
方法一:threading模組Thread類
具體程式碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time
def run(n):
print("task", n)
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
if __name__ == '__main__':
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()
t1.join()
t2.join()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
自定義執行緒
繼承threading.Thread類自定義執行緒類。其本質是重構Thread類中的run方法。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
def run(self):
print("task", self.n)
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
if __name__ == '__main__':
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
守護執行緒
setDaemon(True)把所有的子執行緒都變成了主執行緒的守護執行緒,因此當主進程結束後,子執行緒也會隨之結束。所以當主執行緒結束後,整個程式就退出了。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time
def run(n):
print("task", n)
time.sleep(1) # 此時子執行緒停1s
print('2')
time.sleep(1)
print('1')
if __name__ == '__main__':
t = threading.Thread(target=run, args=("t1",))
t.setDaemon(True) # 把子進程設置為守護執行緒,必須在start()之前設置
t.start()
print("end")
# task t1
# end
想要守護執行緒執行結束後,主進程再結束,可以使用 join 方法,讓主執行緒等待子執行緒執行完畢。
Lock
多執行緒和多進程最大的不同在於,多進程中,同一個變數,各自都有一份拷貝存與每個進程中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享數據最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
value = 0
lock = threading.Lock()
def change_it(n):
# 先存後取,結果應該為0:
global value
value = value + n
value = value - n
# 未加鎖(值不確定)
def run_thread(n):
for i in range(2000000):
change_it(n)
# 加鎖
# def run_thread(n):
# for i in range(2000000):
# lock.acquire()
# try:
# change_it(n)
# finally:
# lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)
# 29
由於鎖只有一個,無論多少執行緒,同一時刻最多只有一個執行緒持有該鎖,所以不會造成修改的衝突。當多個執行緒同時執行 lock.acquire() 時,只有一個執行緒能成功獲取鎖,然後繼續執行程式碼,其它執行緒就繼續等待直到獲得鎖為止。
獲得鎖的執行緒用完一定要釋放鎖,否則那些等待鎖的執行緒將會永遠的等待下去,成為死執行緒。所以用 try…finally 來確保鎖一定會被釋放。
鎖的好處就是確保某段關鍵程式碼只能由一個執行緒從頭到尾完整的執行,壞處當然也很多,首先是阻止了多執行緒並發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率大大的下降了。其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。
訊號量(BoundedSemaphore類)
Lock同時只允許一個執行緒更改數據,而Semaphore是同時允許一定數量的執行緒去更改數據。
import threading
import time
def run(n, semaphore):
semaphore.acquire() #加鎖
time.sleep(1)
print("run the thread:%s\n" % n)
semaphore.release() #釋放
if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允許5個執行緒同時運行
for i in range(22):
t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('-----all threads done-----')
GIL鎖
在非 python 環境中,單核情況下,同時只能有一個任務執行。多核可以同時支援多個執行緒同時執行。但是在 python 中,無論有多少核,同只能執行一個執行緒。究其原因,這就是GIL的存在導致的。
GIL全稱Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了數據安全所做的決定。某個執行緒想要執行,必須先拿到GIL,我們可以把GIL看作是「通行證」,並且在一個python進程中,GIL只有一個。拿不到通行證的執行緒,就不允許進入CPU執行。GIL只有在cpython中才有,因為cpython調用的是c語言的原生執行緒,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個執行緒拿到數據,而在pypy和jpython中是沒有GIL的。
python針對不同類型的程式碼執行效率也是不同的。
1、cpu密集型程式碼(各種循環處理、計數等),在這種情況下,由於電腦工作多,ticks計數很快就會達到閾值。然後觸發GIL的釋放與再競爭(多個執行緒來回切換是需要消耗資源的),所以python下的多執行緒對cpu密集型代並不友好。
2、IO密集型程式碼(文件處理,網路爬蟲等涉及文件讀寫的操作),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的浪費,而開啟多執行緒能在執行緒A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率)。所以python的多執行緒對IO密集型程式碼比較友好。
使用建議
python下想要充分利用多核CPU,就使用多進程。因為每個進程都有各子獨立的GIL,互不干擾,這樣就可以真正意義上的並行執行,在python中,多進程的執行效率優於多執行緒(僅僅針對多核CPU而言)。
四、爬取豆瓣電影TOP250
採取三種方式。爬取前250名電影。
(1)所爬取的網頁鏈接://movie.douban.com/top250?start=0&filter=
(2)通過分析網頁,發現第一頁的url start=0,第二頁的url start=25,第三頁的url start=50。
(3)主要爬取電影名跟評分,用來進行比對,所以數據方面就不過多的提取和保存,只簡單的列印出來。
多進程爬取
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue
import time
from lxml import etree
import requests
class DouBanSpider(Process):
def __init__(self, q, url_list, lock):
# 重寫寫父類的__init__方法
super(DouBanSpider, self).__init__()
self.url_list = url_list
self.q = q
self.lock = lock
self.headers = {
'Host': 'movie.douban.com',
'Referer': '//movie.douban.com/top250?start=225&filter=',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
}
def run(self):
self.parse_page()
def send_request(self, url):
'''
用來發送請求的方法
:return: 返回網頁源碼
'''
# 請求出錯時,重複請求3次,
i = 0
while i <= 3:
try:
print(u"[INFO]請求url:" + url)
return requests.get(url=url, headers=self.headers).content
except Exception as e:
print(u'[INFO] %s%s' % (e, url))
i += 1
def parse_page(self):
'''
解析網站源碼,並採用xpath提取 電影名稱和平分放到隊列中
:return:
'''
time.sleep(0.1)
while 1:
try:
url = self.url_list.pop()
except IndexError as e:
break
self.lock.acquire()
response = self.send_request(url)
html = etree.HTML(response)
# 獲取到一頁的電影數據
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 電影名稱
title = move.xpath('.//a/span/text()')[0]
# 評分
score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
# 將每一部電影的名稱跟評分加入到隊列
self.q.put(score + "\t" + title)
self.lock.release()
class AllUrlSpider(Process):
def __init__(self, url_lis):
super(AllUrlSpider, self).__init__()
self.url_list = url_lis
def run(self):
base_url = '//movie.douban.com/top250?start='
# 構造所有url
for num in range(225, -1, -25):
self.url_list.append(base_url + str(num))
print("獲得URL:{}".format(base_url + str(num)))
def main():
# 創建一個隊列用來保存進程獲取到的數據
q = Queue()
lock = multiprocessing.Lock()
manager = multiprocessing.Manager()
url_list = manager.list()
a = AllUrlSpider(url_list)
p = DouBanSpider(q, url_list, lock)
b = DouBanSpider(q, url_list, lock)
c = DouBanSpider(q, url_list, lock)
a.start()
p.start()
b.start()
c.start()
a.join()
p.join()
b.join()
c.join()
while not q.empty():
print(q.get())
if __name__ == "__main__":
start = time.time()
main()
print('[info]耗時:%s' % (time.time() - start))
多進程爬取耗時7.15秒,部分結果如下圖所示:
多執行緒爬取
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time
from lxml import etree
import requests
url_list = []
lock = threading.Lock()
class DouBanSpider(Thread):
def __init__(self, q) :
# 重寫寫父類的__init__方法
super(DouBanSpider, self).__init__()
self.q = q
self.headers = {
'Host': 'movie.douban.com',
'Referer': '//movie.douban.com/top250?start=225&filter=',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
}
def run(self):
self.parse_page()
def send_request(self, url):
'''
用來發送請求的方法
:return: 返回網頁源碼
'''
# 請求出錯時,重複請求3次,
i = 0
while i <= 3:
try:
print
u"[INFO]請求url:" + url
html = requests.get(url=url, headers=self.headers).content
except Exception as e:
print
u'[INFO] %s%s' % (e, url)
i += 1
else:
return html
def parse_page(self):
'''
解析網站源碼,並採用xpath提取 電影名稱和平分放到隊列中
:return:
'''
while 1:
try:
url = url_list.pop()
except IndexError as e:
break
lock.acquire()
response = self.send_request(url)
html = etree.HTML(response)
# 獲取到一頁的電影數據
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 電影名稱
title = move.xpath('.//a/span/text()')[0]
# 評分
score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
# 將每一部電影的名稱跟評分加入到隊列
self.q.put(score + "\t" + title)
lock.release()
class AllUrlSpider(Thread):
def run(self):
base_url = '//movie.douban.com/top250?start='
# 構造所有url
for num in range(225, -1, -25):
url_list.append(base_url + str(num))
print("獲得URL:{}".format(base_url + str(num)))
def main():
# 創建一個隊列用來保存進程獲取到的數據
q = Queue()
a = AllUrlSpider()
a.start()
# 保存執行緒
Thread_list = []
# 創建並啟動執行緒
for i in range(5):
p = DouBanSpider(q)
p.start()
Thread_list.append(p)
a.join()
# 讓主執行緒等待子執行緒執行完成
for i in Thread_list:
i.join()
while not q.empty():
print(q.get())
if __name__ == "__main__":
start = time.time()
main()
print('[info]耗時:%s' % (time.time() - start))
多進程爬取耗時5秒,部分結果如下圖所示:
耗時跟網路的好壞也是有一定的關係,每次測出的數據結果也不一樣。但理論上來講,執行緒在I/O密集的操作性是要高於進程的。