python之并发编程
- 2019 年 10 月 3 日
- 笔记
一、并发编程之多进程
1.multiprocessing模块介绍
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
2.Process类的介绍
创建进程的类
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
方法介绍
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
3.Process类的使用
注意:在windows中Process()必须放到# if name == ‘main‘:下
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
3.1 创建开启子进程的两种方式
方式一
from multiprocessing import Process import time def task(name): print(f"{name} is running") time.sleep(2) print(f"{name} is gone") if __name__ == '__main__': #在windos中,开启进程必须在__name__ == '__main__'下面 p = Process(target=task,args=("zbb",)) #创建一个进程对象 p.start() #只是向操作系统发出一个开辟子进程的信号,然后执行下一行 # 这个信号操作系统接收到之后,会从内存中开辟一个子进程空间, # 然后在将主进程所有数据copy加载到子进程,然后在调用cpu去执行. # 开辟子进程开销是很大的. print("==主开始") time.sleep(3) print("主结束") # ==主开始 # zbb is running # zbb is gone # 主结束
方式二(了解不常用)
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): #必须为run 否则会执行父类的,但是父类的为None print(f"{self.name} is running") time.sleep(2) print(f"{self.name} is gone ") if __name__ == '__main__': p = MyProcess("zbb") p.start() print("==主") # ==主 # zbb is running # zbb is gone
3.2 获取进程pid
pid是进程在内存中唯一的标识
列如,linux中 kill pid
代码获取
from multiprocessing import Process import os def task(name): print(f'子进程:{os.getpid()}') print(f'主进程:{os.getppid()}') if __name__ == '__main__': p = Process(target=task,args=('zbb',)) # 创建一个进程对象 p.start() print(f'====主{os.getpid()}') # ====主13548 # 子进程:1832 # 主进程:13548
win命令行获取pid
linux中获取
3.3验证进程之间的空间隔离
子进程和主进程在不同的空间
from multiprocessing import Process import time name = '追梦NAN' def task(): global name name = 'zbb' print(f'子进程{name}') if __name__ == '__main__': p = Process(target=task) # 创建一个进程对象 p.start() # print('==主开始') time.sleep(3) print(f'主:{name}') # 子进程zbb # 主:追梦NAN
3.4 进程对象的join方法
join让主进程等待子进程结束之后,在执行主进程.
from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') if __name__ == '__main__': p = Process(target=task,args=('zbb',)) # 创建一个进程对象 p.start() p.join() # print('==主开始')
多个子进程使用join
验证一
from multiprocessing import Process import time def task(name,sec): print(f'{name}is running') time.sleep(sec) print(f'{name} is gone') if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=('1',1)) p2 = Process(target=task,args=('2',2)) p3 = Process(target=task,args=('3',3)) p1.start() p2.start() p3.start() p1.join() # join只针对主进程,如果join下面多次join 他是不阻塞的. p2.join() p3.join() print(f'==主{time.time()-start_time}') # 1is running # 2is running # 3is running # 1 is gone # 2 is gone # 3 is gone # ==主3.186117172241211
验证2
# 多个子进程使用join from multiprocessing import Process import time def task(name,sec): print(f'{name}is running') time.sleep(sec) print(f'{name} is gone') if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=('1',3)) p2 = Process(target=task,args=('2',2)) p3 = Process(target=task,args=('3',1)) p1.start() p2.start() p3.start() p1.join() #p1就是阻塞 走完周后才走主 print(f'==主1-{time.time() - start_time}') p2.join() print(f'==主2-{time.time() - start_time}') p3.join() print(f'==主3-{time.time()-start_time}') # 1is running # 2is running # 3is running # 3 is gone # 2 is gone # 1 is gone # ==主1-3.152977705001831 # ==主2-3.152977705001831 # ==主3-3.152977705001831
优化上面代码
from multiprocessing import Process import time def task(name,sec): print(f'{name}is running') time.sleep(sec) print(f'{name} is gone') if __name__ == '__main__': start_time = time.time() l1 = [] for i in range(1,4): p=Process(target=task,args=("zbb",1)) l1.append(p) p.start() for i in l1: i.join() print(f'==主{time.time() - start_time}') print(l1) # zbbis running # zbbis running # zbbis running # zbb is gone # zbb is gone # zbb is gone # ==主1.1665570735931396 # [<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>]
join就是阻塞,主进程有join,主进程下面的代码一律不执行,直到进程执行完毕之后,在执行.
3.5进程对象的其他属性(了解)
from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') if __name__ == '__main__': p = Process(target=task,args=('cc',),name='aaa') # 创建一个进程对象 print(p.name) #给子进程起名字 p.start() # time.sleep(1) p.terminate() #杀死子进程 time.sleep(0.5) #先睡一会要不然判断还是活的 print(p.is_alive())#判断子进程是否存活 # p.name = 'sb' #改子进程的名字 print('==主开始')
3.6 僵尸进程和孤儿进程(了解)
基于unix环境(linux,macOS)
正常:主进程需要等待子进程结束之后,主进程才结束
主进程时刻监测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收.
为什么主进程不在子进程结束后马上对其回收呢?
1. 主进程与子进程是异步关系.主进程无法马上捕获子进程什么时候结束. 2. 如果子进程结束之后马上再内存中释放资源,主进程就没有办法监测子进程的状态了.
unix针对于上面的问题,提供了一个机制.
所有的子进程结束之后,立马会释放掉文件的操作链接,内存的大部分数据,但是会保留一些内容: 进程号,结束时间,运行状态,等待主进程监测,回收.
所有的子进程结束之后,在被主进程回收之前,都会进入僵尸进程状态.
一:僵尸进程(有害) 僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。
如果父进程不对僵尸进程进行回收(wait/waitpid),产生大量的僵尸进程,这样就会占用内存,占用进程pid号.
僵尸进程如何解决???
父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由init进行回收
二:孤儿进程(无害) 孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
4.守护进程
子进程守护着主进程,只要主进程结束,子进程跟着就结束
from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') if __name__ == '__main__': # 在windows环境下, 开启进程必须在 __name__ == '__main__' 下面 p = Process(target=task,args=('zb',)) # 创建一个进程对象 p.daemon = True # 将p子进程设置成守护进程,只要主进程结束,守护进程马上结束. p.start() time.sleep(1) print('===主')
5.互斥锁(进程同步控制)
多个用户抢占一个资源时,第一个用户先抢到了,加上锁,用完之后才给第二个用户使用
问题
三个同事 同时用一个打印机打印内容.
三个进程模拟三个同事, 输出平台模拟打印机.
#版本一: #并发是以效率优先的,但是目前我们的需求: 顺序优先. #多个进程共强一个资源时, 要保证顺序优先: 串行,一个一个来. from multiprocessing import Process import time import random import os def task1(p): print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') def task2(p): print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') if __name__ == '__main__': p1 = Process(target=task1,args=('p1',)) p2 = Process(target=task2,args=('p2',)) p2.start() p2.join() p1.start() p1.join() #我们利用join 解决串行的问题,保证了顺序优先,但是这个谁先谁后是固定的. #这样不合理. 你在争抢同一个资源的时候,应该是先到先得,保证公平.
from multiprocessing import Process from multiprocessing import Lock import time import random import os def task1(p,lock): ''' 一把锁不能连续锁两次 ''' lock.acquire() print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') lock.release() def task2(p,lock): lock.acquire() print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') lock.release() if __name__ == '__main__': mutex = Lock() p1 = Process(target=task1,args=('p1',mutex)) p2 = Process(target=task2,args=('p2',mutex)) p2.start() p1.start()
lock与join的区别.
共同点: 都可以把并发变成串行, 保证了顺序. 不同点: join人为设定顺序,lock让其争抢顺序,保证了公平性.
6.进程之间的通信
1.基于文件通信
# 抢票系统. # 先可以查票.查询余票数. 并发 # 进行购买,向服务端发送请求,服务端接收请求,在后端将票数-1,返回到前端. 串行. # 当多个进程共强一个数据时,如果要保证数据的安全,必须要串行. # 要想让购买环节进行串行,我们必须要加锁处理. from multiprocessing import Process from multiprocessing import Lock import json import time import os import random def search(): time.sleep(random.randint(1,3)) # 模拟网络延迟(查询环节) with open('ticket.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}') def paid(): with open('ticket.json', encoding='utf-8') as f1: dic = json.load(f1) if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.randint(1,3)) # 模拟网络延迟(购买环节) with open('ticket.json', encoding='utf-8',mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()} 购买成功') def task(lock): search() lock.acquire() paid() lock.release() if __name__ == '__main__': mutex = Lock() for i in range(6): p = Process(target=task,args=(mutex,)) p.start() # 当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行. # 互斥锁: 可以公平性的保证顺序以及数据的安全. # 基于文件的进程之间的通信: # 效率低. # 自己加锁麻烦而且很容易出现死锁.
2.基于队列通信
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
队列: 把队列理解成一个容器,这个容器可以承载一些数据,
队列的特性: 先进先出永远保持这个数据.
.
创建队列的类(底层就是以管道和锁定的方式实现):
1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
参数介绍:
1 maxsize是队列中允许最大项数,省略则无大小限制。
主要方法:
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常
q.get方法可以从队列读取并且删除一个元素。 同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)
q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
其他方法(了解):
q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
from multiprocessing import Queue q = Queue(3) # maxsize q.put(1) q.put('alex') q.put([1,2,3]) # q.put(5555,block=False) # 当队列满了时,在进程put数据就会阻塞. # print(q.get()) print(q.get()) print(q.get()) print(q.get(timeout=3)) # 阻塞3秒,3秒之后还阻塞直接报错. # print(q.get(block=False)) # 当数据取完时,在进程get数据也会出现阻塞,直到某一个进程put数据. # block=False 只要遇到阻塞就会报错.
3.基于管道
管道是有问题的,管道会造成数据的不安全,官方给予的解释是管道有可能会造成数据损坏。
7.生产者和消费者
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
1.为什么要使用生产者和消费者模式
线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
2.什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
起到缓冲的作用,平衡生产力与消费力,解耦.
from multiprocessing import Process from multiprocessing import Queue import time import random def producer(q,name): for i in range(1,6): time.sleep(random.randint(1,2)) res = f'{i}号包子' q.put(res) print(f'生产者{name} 生产了{res}') def consumer(q,name): while 1: try: food = q.get(timeout=3) time.sleep(random.randint(1, 3)) print(f'