Chapter 10: Advanced Python Programming - Multithreading, Multiprocessing, and Thread Pools

Notes for Python3高級核心技術97講 (Python 3 Advanced Core Technologies, 97 Lectures)

10.1 The GIL in Python

"""
GIL: global interpreter lock (CPython)
A Python thread corresponds to one C-level (OS) thread.
The GIL ensures that at any given moment only one thread executes bytecode on one CPU.
"""

# The GIL is released after a certain number of bytecode instructions / a time slice, and it is released proactively when a thread performs an IO operation.
import dis


def add(a):
    a = a + 1
    return a


dis.dis(add)  # print the bytecode of add; the GIL may be released between any two instructions


# ================ demo start =====================
total = 0


def add():
    global total
    for i in range(1000000):
        total += 1
        

def desc():
    global total
    for i in range(1000000):
        total -= 1
        
        
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(total)  # the result varies between runs: the GIL is released mid-loop, so the two threads' updates interleave
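
The "time slice" mentioned above is CPython's thread switch interval; as a quick check (a small sketch using the standard sys functions), it can be inspected and tuned:

import sys

print(sys.getswitchinterval())   # 0.005 seconds by default in CPython
sys.setswitchinterval(0.01)      # let each thread hold the GIL a bit longer before switching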

10.2 Multithreaded Programming with threading

# For IO-bound work, the performance difference between multithreading and multiprocessing is small.
# 1. Create threads by instantiating the Thread class (a usage sketch follows the two functions below)


import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


#2. Create threads by subclassing Thread and overriding run()


class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()

    # thread1.daemon = True  # daemon threads are killed as soon as the main thread exits
    # thread2.daemon = True  # (setDaemon() still works but is deprecated in favour of the daemon attribute)
    
    thread1.join()
    thread2.join()

    # if the threads were daemons, they would be killed when the main thread exits
    print ("last time: {}".format(time.time()-start_time))
    

10.3 Inter-thread Communication: Shared Variables and Queue

"""
1. Thread communication via a shared variable
"""


import time
import threading

DETAIL_URL_LIST = []


def get_detail_html():
    # crawl the article detail page
    global DETAIL_URL_LIST
    print("get detail html started")
    url = DETAIL_URL_LIST.pop()
    time.sleep(2)
    print("get detail html end")


def get_detail_url():
    # crawl the article list page
    global DETAIL_URL_LIST
    print("get detail url started")
    time.sleep(4)
    for i in range(20):
        DETAIL_URL_LIST.append("//projectsedu.com/{id}".format(id=i))
    print("get detail url end")
    
    
if __name__ == "__main__":
    thread_detail_url = threading.Thread(target=get_detail_url)
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html)
        html_thread.start()
    # note: a consumer may pop() from an empty list and raise IndexError;
    # this fragility is why the Queue version below is preferred

    
# =====================================================================
# 2. Inter-thread synchronization via queue.Queue
from queue import Queue


import time
import threading


def get_detail_html(queue):
    # crawl the article detail page
    while True:
        url = queue.get()  # thread-safe; blocks when the queue is empty
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
        queue.task_done()  # mark this item as processed so queue.join() can return


def get_detail_url(queue):
    # crawl the article list page
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("//projectsedu.com/{id}".format(id=i))
        print("get detail url end")



if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)


    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()

    start_time = time.time()
    # task_done() is called by the consumers for every item they finish;
    # join() blocks until every item that was put() has been marked done
    detail_url_queue.join()

    # daemon threads would be killed when the main thread exits
    print("elapsed time: {}".format(time.time() - start_time))
    

10.4 Thread Synchronization: Lock and RLock

from threading import Lock, RLock, Condition  # RLock is a reentrant lock

# Within one thread an RLock can be acquired several times in a row, but the number of acquire calls must equal the number of release calls.
total = 0
lock = RLock()
def add():
    # 1. do something
    # 2. an IO operation
    # 3. do something else
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()  # a second acquire in the same thread only works because this is an RLock
        total += 1
        lock.release()
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()


thread1.join()
thread2.join()
print(total)

# 1. Locks hurt performance.
# 2. Locks can cause deadlock.
# Deadlock scenario (see the sketch after the docstring below):
"""
A(a、b)
acquire (a)
acquire (b)

B(a、b)
acquire (a)
acquire (b)
"""
    

10.5 Thread Synchronization: Condition Usage and Source Code Analysis

import threading

# Condition is a condition variable, used for more complex coordination between threads
# class XiaoAi(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="小愛")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 在 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 好啊 ".format(self.name))
#         self.lock.release()
#
# class TianMao(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="天貓精靈")
#         self.lock = lock
#
#     def run(self):
#
#         self.lock.acquire()
#         print("{} : 小愛同學 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 我們來對古詩吧 ".format(self.name))
#         self.lock.release()

# Reciting a poem in alternating turns, coordinated with a Condition

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小愛")
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print("{} : 在 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住長江尾 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 共飲長江水 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 此恨何時已 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 定不負相思意 ".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天貓精靈")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小愛同學 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我們來對古詩吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住長江頭 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不見君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水幾時休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只願君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()



if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # The start order matters: the waiting side (xiaoai) must be started first.
    # wait() and notify() may only be called after entering "with cond" (i.e. after acquiring the condition's lock).
    # Condition uses two layers of locks: the outer lock is released when a thread calls wait(),
    # and each wait() call allocates a new inner lock that is appended to the condition's waiter
    # queue until notify() releases it (see the simplified sketch after this block).
    xiaoai.start()
    tianmao.start()
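
A simplified sketch of that two-layer-lock mechanism (an assumed illustration of the idea; the real CPython Condition also saves RLock state and supports timeouts and notify_all):

from collections import deque
from threading import Lock, RLock


class SimpleCondition:
    def __init__(self):
        self._lock = RLock()        # outer lock, taken by "with cond"
        self._waiters = deque()     # inner locks, one per waiting thread

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()

    def wait(self):
        waiter = Lock()
        waiter.acquire()            # the first acquire always succeeds
        self._waiters.append(waiter)
        self._lock.release()        # release the outer lock so other threads can enter
        waiter.acquire()            # blocks here until notify() releases this inner lock
        self._lock.acquire()        # re-acquire the outer lock before returning

    def notify(self):
        if self._waiters:
            self._waiters.popleft().release()   # wake the oldest waiting thread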

10.6 Thread Synchronization: Semaphore Usage and Source Code Analysis

# Semaphore is a lock that limits how many threads may enter a section at the same time.
# Example: file access - typically only one thread writes, while several threads may read.

# Crawler example: allow at most 3 HtmlSpider threads at a time
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("//baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()
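
A simplified sketch of how a semaphore can be built on top of a Condition and a counter (an assumed illustration; the real threading.Semaphore also supports timeouts and releasing several slots at once):

import threading


class SimpleSemaphore:
    def __init__(self, value=1):
        self._cond = threading.Condition()
        self._value = value          # how many acquires may still succeed

    def acquire(self):
        with self._cond:
            while self._value == 0:  # no free slot: wait until someone calls release()
                self._cond.wait()
            self._value -= 1

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()      # wake one thread blocked in acquire()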

10.7 Thread Pools with ThreadPoolExecutor

 
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool

# Future: the object that will eventually hold a task's result


# Why use a thread pool?
# - The main thread can query the state of a worker thread or task and get its return value.
# - The main thread can be told as soon as a task completes.
# - The futures interface is the same for multithreading and multiprocessing.
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times



executor = ThreadPoolExecutor(max_workers=2)
# submit() schedules the function on the pool and returns a Future immediately (it does not block)
# task1 = executor.submit(get_html, 3)
# task2 = executor.submit(get_html, 2)


# Collect the return values of tasks as they finish
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)  # block only until the first task completes
print("main")
# for future in as_completed(all_task):
#     data = future.result()
#     print("get {} page".format(data))
# executor.map yields the completed tasks' values, in the order of the input urls
# for data in executor.map(get_html, urls):
#     print("get {} page".format(data))


# # done() tells whether a task has finished; cancel() only succeeds if the task has not started yet
# print(task1.done())
# print(task2.cancel())
# time.sleep(3)
# print(task1.done())
#
# # result() blocks until the task finishes and then returns its value
# print(task1.result())
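
Another way for the main thread to be told "immediately" that a task finished (a small sketch reusing the executor and get_html defined above, not part of the original notes) is to attach a callback to the Future:

def when_done(future):
    # called as soon as the task completes (in the worker thread, or immediately if it is already done)
    print("callback got page {}".format(future.result()))

task3 = executor.submit(get_html, 1)
task3.add_done_callback(when_done)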

10.8 Multiprocessing vs. Multithreading

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
# Multiprocess programming
# Use multiprocessing for CPU-bound work and multithreading for IO-bound work;
# switching between processes is more expensive than switching between threads.

# 1. For CPU-bound work, multiprocessing beats multithreading
# def fib(n):
#     if n<=2:
#         return 1
#     return fib(n-1)+fib(n-2)
#
# if __name__ == "__main__":
#     with ThreadPoolExecutor(3) as executor:
#         all_task = [executor.submit(fib, (num)) for num in range(25,40)]
#         start_time = time.time()
#         for future in as_completed(all_task):
#             data = future.result()
#             print("exe result: {}".format(data))
#
#         print("last time is: {}".format(time.time()-start_time))

# 2. For IO-bound work, multithreading beats multiprocessing
def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

10.9 Multiprocess Programming with multiprocessing

# import os
# # fork() is only available on Linux/Unix
# pid = os.fork()
# print("bobby")
# if pid == 0:
#   print('child process {}, parent process: {}.'.format(os.getpid(), os.getppid()))
# else:
#   print('I am the parent process; the child pid is {}.'.format(pid))


import multiprocessing

# Multiprocess programming
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    # progress = multiprocessing.Process(target=get_html, args=(2,))
    # print(progress.pid)
    # progress.start()
    # print(progress.pid)
    # progress.join()
    # print("main progress end")

    # Use a process pool (multiprocessing.Pool), sized to the number of CPUs
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # result = pool.apply_async(get_html, args=(3,))
    #
    # # wait for all tasks to finish
    # pool.close()
    # pool.join()
    #
    # print(result.get())

    # imap yields results in the order of the inputs
    # for result in pool.imap(get_html, [1,5,3]):
    #     print("{} sleep success".format(result))

    # imap_unordered yields results in completion order, not submission order
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))

10.10 Inter-process Communication: Queue, Pipe, and Manager

import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe


# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Queue(10)
#     my_producer = Process(target=producer, args=(queue,))
#     my_consumer = Process(target=consumer, args=(queue,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

# Communication via shared global variables
# Shared global variables do NOT work across processes (each child gets its own copy);
# they only work between threads.


# def producer(a):
#     a += 100
#     time.sleep(2)
#
# def consumer(a):
#     time.sleep(2)
#     print(a)
#
# if __name__ == "__main__":
#     a = 1
#     my_producer = Process(target=producer, args=(a,))
#     my_consumer = Process(target=consumer, args=(a,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

# multiprocessing.Queue cannot be used with a Pool of worker processes;
# for communication between Pool workers, use the Queue from Manager() instead

# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Manager().Queue(10)
#     pool = Pool(2)
#
#     pool.apply_async(producer, args=(queue,))
#     pool.apply_async(consumer, args=(queue,))
#
#     pool.close()
#     pool.join()

# Inter-process communication via Pipe
# Pipe is faster than Queue, but it only connects two processes

# def producer(pipe):
#     pipe.send("bobby")
#
# def consumer(pipe):
#     print(pipe.recv())
#
# if __name__ == "__main__":
#     receive_pipe, send_pipe = Pipe()
#     # a Pipe connects exactly two processes
#     my_producer = Process(target=producer, args=(send_pipe,))
#     my_consumer = Process(target=consumer, args=(receive_pipe,))
#
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

def add_data(p_dict, key, value):
    # runs in a child process; the write goes into the shared Manager dict
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "bobby2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)
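
Manager() offers more shared types than dict - for example list, Queue, and Lock. A small sketch (an assumed illustration, not from the course code): when several processes update the same key, the read-modify-write is not atomic, so a Manager().Lock() is needed to avoid lost updates:

from multiprocessing import Process, Manager


def increment(shared_dict, lock):
    for _ in range(1000):
        with lock:                       # without the lock, concurrent += can lose updates
            shared_dict["count"] += 1


if __name__ == "__main__":
    manager = Manager()
    shared = manager.dict({"count": 0})
    lock = manager.Lock()
    workers = [Process(target=increment, args=(shared, lock)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(shared["count"])               # 4000 with the lock; possibly less without it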