Chapter 10: Advanced Python Programming - Multithreading, Multiprocessing, and Thread Pools
- May 10, 2020
- Notes
- Advanced Python programming study notes
10.1 The GIL in Python
"""
gil global interpreter lock (cpython)
Python中一個執行緒對應於C語言中的一個執行緒
gil是的同一時刻只有一個執行緒在一個cpu上執行位元組碼
"""
# GIL會根據執行的位元組碼行數以及時間片釋放, GIL遇到IO操作的時候會主動釋放
import dis
def add(a):
    a = a + 1
    return a
print(dis.dis(add))
# ================ demo start =====================
total = 0
def add():
    global total
    for i in range(1000000):
        total += 1
def desc():
    global total
    for i in range(1000000):
        total -= 1
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)  # the result varies from run to run: the GIL is released mid-loop, so the += / -= updates race
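Note: in CPython 3.2+ the GIL hand-off is driven by a time interval rather than a bytecode count; a minimal sketch (assuming CPython 3.2 or later) for inspecting and tuning that interval with the standard sys module:
import sys
# the running thread is asked to drop the GIL roughly every switch-interval seconds (default 0.005)
print(sys.getswitchinterval())
# shortening the interval makes the race in the demo above show up sooner;
# it does not make shared state safe, a Lock is still needed (see 10.4)
sys.setswitchinterval(0.001)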
10.2 Multithreaded programming with threading
# For IO-bound work there is little performance difference between multithreading and multiprocessing
# 1. Create threads by instantiating the Thread class
import time
import threading
def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")
def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")
# 2. Create threads by subclassing Thread
class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()
    # thread1.setDaemon(True)  # mark as a daemon thread: it is killed as soon as the main thread exits
    # thread2.setDaemon(True)  # (daemon status must be set before start())
    thread1.join()
    thread2.join()
    # when the main thread exits, daemon child threads are killed with it
    print("last time: {}".format(time.time()-start_time))
10.3 Inter-thread communication: shared variables and Queue
"""
1. 執行緒通訊方式-共享變數
"""
import threading
DETAIL_URL_LIST = []
def get_detail_html():
# 爬取文章詳情頁
global DETAIL_URL_LIST
print("get detail html started")
url = DEATIL_URL_LIST.pop()
time.sleep(2)
print("get detail html end")
def get_detail_url():
# 爬取文章列表頁
global DETAIL_URL_LIST
print("get detail url started")
time.sleep(4)
for i in range(20):
DETAIL_URL_LIST.append("//projectsedu.com/{id}".format(id=i))
print("get detail url end")
if __name__ == "__main__":
thread_detail_url = threading.Thread(target=get_detail_url)
for i in range(10):
html_thread = threading.Thread(target=get_detail_html)
html_thread.start()
# =====================================================================
# 2. Inter-thread synchronization through a Queue
from queue import Queue
import time
import threading
def get_detail_html(queue):
    # crawl the article detail pages
    while True:
        url = queue.get()  # thread-safe; blocks while the queue is empty
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
        queue.task_done()  # mark this item as processed (pairs with queue.join() below)
def get_detail_url(queue):
    # crawl the article list page
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("//projectsedu.com/{id}".format(id=i))
        print("get detail url end")
if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)
    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    start_time = time.time()
    # queue.join() blocks until every item that was put has had task_done() called for it
    detail_url_queue.join()
    # when the main thread exits, child threads are killed
    print("last time: {}".format(time.time()-start_time))
10.4 Thread synchronization: Lock and RLock
from threading import Lock, RLock, Condition  # RLock is a reentrant lock
# Within the same thread an RLock can be acquired multiple times in a row,
# but the number of acquire calls must equal the number of release calls.
total = 0
lock = RLock()
def add():
    # 1. do something
    # 2. an IO operation
    # 3. do something else
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()  # reentrant: a second acquire in the same thread does not block
        total += 1
        lock.release()
        lock.release()
def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
# 1. Locks hurt performance
# 2. Locks can cause deadlocks
# A deadlock scenario: two threads acquire the same two locks in opposite order
"""
A(a, b):
    acquire(a)
    acquire(b)
B(a, b):
    acquire(b)
    acquire(a)
"""
10.5 Thread synchronization: Condition usage and source code analysis
import threading
# Condition: a condition variable, used for more complex coordination between threads
# First attempt using a plain Lock; the two speakers cannot be made to alternate reliably:
# class XiaoAi(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="小愛")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 在 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 好啊 ".format(self.name))
#         self.lock.release()
#
# class TianMao(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="天貓精靈")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 小愛同學 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 我們來對古詩吧 ".format(self.name))
#         self.lock.release()
# Recite the poem cooperatively using a Condition
class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小愛")
        self.cond = cond
    def run(self):
        with self.cond:
            self.cond.wait()
            print("{} : 在 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 君住長江尾 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 共飲長江水 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 此恨何時已 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 定不負相思意 ".format(self.name))
            self.cond.notify()
class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天貓精靈")
        self.cond = cond
    def run(self):
        with self.cond:
            print("{} : 小愛同學 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 我們來對古詩吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 我住長江頭 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 日日思君不見君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 此水幾時休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
            print("{} : 只願君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()
if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    # The start order matters: XiaoAi must start first so that it is already waiting
    # when TianMao's first notify arrives; otherwise that notify is lost and both threads hang.
    # wait and notify may only be called after acquiring the condition (inside `with cond`).
    # A Condition holds two layers of locks: the underlying lock is released when a thread calls
    # wait, and each wait allocates a fresh lock that is put on the condition's waiting queue
    # until a notify call wakes it up.
    xiaoai.start()
    tianmao.start()
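If the start-order dependency is undesirable, Condition.wait_for (available since Python 3.2) waits on an explicit predicate instead of a bare notify; a minimal sketch with illustrative names:
import threading
cond = threading.Condition()
ready = False
def waiter():
    with cond:
        cond.wait_for(lambda: ready)  # re-checks the predicate each time it wakes up
        print("predicate became true, proceeding")
def setter():
    global ready
    with cond:
        ready = True
        cond.notify()
threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()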
10.6 Thread synchronization: Semaphore usage and source code analysis
# A Semaphore is a lock that limits how many threads may enter at the same time
# e.g. file access: usually only one thread is allowed to write, while several may read
# Example: throttling a crawler
import threading
import time
class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem
    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()  # free one slot once the page has been fetched
class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            self.sem.acquire()  # blocks while 3 spiders are already running
            html_thread = HtmlSpider("//baidu.com/{}".format(i), self.sem)
            html_thread.start()
if __name__ == "__main__":
    sem = threading.Semaphore(3)  # at most 3 concurrent spiders
    url_producer = UrlProducer(sem)
    url_producer.start()
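As the "source code analysis" in the title suggests, threading.Semaphore is itself built from a Condition plus a counter; a simplified sketch of that idea (not the exact CPython source):
import threading
class SimpleSemaphore:
    # simplified illustration of how threading.Semaphore works internally
    def __init__(self, value=1):
        self._cond = threading.Condition(threading.Lock())
        self._value = value
    def acquire(self):
        with self._cond:
            while self._value == 0:  # no free slot: wait until release() notifies
                self._cond.wait()
            self._value -= 1
    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()  # wake one waiting acquire()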
10.7 The ThreadPoolExecutor thread pool
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool
# Future: the "future object", a container for a task's eventual result
# Why use a thread pool?
# - the main thread can query the state (or return value) of any thread/task
# - the main thread can be told as soon as a worker finishes
# - futures give multithreading and multiprocessing the same coding interface
import time
def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times
executor = ThreadPoolExecutor(max_workers=2)
# submit schedules a function on the pool and returns a Future immediately (it does not block)
# task1 = executor.submit(get_html, 3)
# task2 = executor.submit(get_html, 2)
# Fetch the results of tasks that have already completed
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)  # block until the first task finishes
print("main")
# as_completed yields futures as they finish
# for future in as_completed(all_task):
#     data = future.result()
#     print("get {} page".format(data))
# executor.map returns the completed tasks' values in input order
# for data in executor.map(get_html, urls):
#     print("get {} page".format(data))
# done() tells whether a task has finished
# print(task1.done())
# print(task2.cancel())  # cancel only succeeds if the task has not started running yet
# time.sleep(3)
# print(task1.done())
# result() blocks until the task finishes and returns its value
# print(task1.result())
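Pulling the commented pieces together, a minimal runnable sketch of the usual pattern: a context-managed pool plus as_completed, which yields futures in completion order rather than submission order:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_html(times):
    time.sleep(times)
    return times
urls = [3, 2, 4]
with ThreadPoolExecutor(max_workers=2) as executor:  # the with-block shuts the pool down for us
    tasks = [executor.submit(get_html, url) for url in urls]
    for future in as_completed(tasks):  # yields each future as soon as it finishes
        print("get {} page".format(future.result()))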
10.8 Multiprocessing vs multithreading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
# Multiprocess programming
# Use multiprocessing for CPU-bound work and multithreading for IO-bound work;
# switching between processes is more expensive than switching between threads.
# 1. For CPU-bound work, multiprocessing beats multithreading
# def fib(n):
#     if n <= 2:
#         return 1
#     return fib(n-1) + fib(n-2)
#
# if __name__ == "__main__":
#     with ThreadPoolExecutor(3) as executor:
#         all_task = [executor.submit(fib, num) for num in range(25, 40)]
#         start_time = time.time()
#         for future in as_completed(all_task):
#             data = future.result()
#             print("exe result: {}".format(data))
#
#         print("last time is: {}".format(time.time()-start_time))
# 2. For IO-bound work, multithreading is preferable to multiprocessing
def random_sleep(n):
    time.sleep(n)
    return n
if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time()-start_time))
10.9 Multiprocess programming with multiprocessing
# import os
# # fork is only available on Linux/Unix
# pid = os.fork()
# print("bobby")
# if pid == 0:
#     print('child process {}, parent process: {}.'.format(os.getpid(), os.getppid()))
# else:
#     print('I am the parent process: {}.'.format(pid))
import multiprocessing
# Multiprocess programming
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n
if __name__ == "__main__":
    # progress = multiprocessing.Process(target=get_html, args=(2,))
    # print(progress.pid)  # None before start()
    # progress.start()
    # print(progress.pid)
    # progress.join()
    # print("main progress end")
    # Using a process pool
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # result = pool.apply_async(get_html, args=(3,))
    #
    # # wait for all tasks to finish
    # pool.close()  # no new tasks may be submitted after close()
    # pool.join()
    #
    # print(result.get())
    # imap yields results in the order the arguments were submitted
    # for result in pool.imap(get_html, [1, 5, 3]):
    #     print("{} sleep success".format(result))
    # imap_unordered yields results in completion order
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))
10.10 Inter-process communication: Queue, Pipe, and Manager
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe
# Communicating through a multiprocessing.Queue (works between plain Process objects)
# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Queue(10)
#     my_producer = Process(target=producer, args=(queue,))
#     my_consumer = Process(target=consumer, args=(queue,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()
# Communicating through a shared global variable
# Shared global variables do not work across processes (each child gets its own copy);
# they can only be used between threads.
# def producer(a):
#     a += 100
#     time.sleep(2)
#
# def consumer(a):
#     time.sleep(2)
#     print(a)  # still prints 1: the child received a copy of a
#
# if __name__ == "__main__":
#     a = 1
#     my_producer = Process(target=producer, args=(a,))
#     my_consumer = Process(target=consumer, args=(a,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()
# multiprocessing.Queue cannot be used with a Pool;
# tasks running in a Pool must communicate through Manager().Queue instead
# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Manager().Queue(10)
#     pool = Pool(2)
#
#     pool.apply_async(producer, args=(queue,))
#     pool.apply_async(consumer, args=(queue,))
#
#     pool.close()
#     pool.join()
# Communicating between processes through a Pipe
# A Pipe is faster than a Queue, but it only connects exactly two processes
# def producer(pipe):
#     pipe.send("bobby")
#
# def consumer(pipe):
#     print(pipe.recv())
#
# if __name__ == "__main__":
#     recevie_pipe, send_pipe = Pipe()
#     # a Pipe can only be shared between two processes
#     my_producer = Process(target=producer, args=(send_pipe,))
#     my_consumer = Process(target=consumer, args=(recevie_pipe,))
#
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()
# Sharing state through Manager (Manager().dict here; list, Lock, Queue, etc. are also available)
def add_data(p_dict, key, value):
    p_dict[key] = value
if __name__ == "__main__":
    progress_dict = Manager().dict()
    first_progress = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "bobby2", 23))
    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()
    print(progress_dict)  # {'bobby1': 22, 'bobby2': 23}
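Manager offers more shared types than dict; note that read-modify-write operations on a shared value still race between processes, so here is a minimal sketch (illustrative) that guards a shared counter with a Manager lock:
from multiprocessing import Process, Manager
def increment(shared, lock):
    for _ in range(1000):
        with lock:  # without the lock some updates would be lost
            shared["count"] = shared["count"] + 1
if __name__ == "__main__":
    manager = Manager()
    shared = manager.dict(count=0)
    lock = manager.Lock()
    workers = [Process(target=increment, args=(shared, lock)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(shared["count"])  # 4000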