队列、进程互斥锁、线程
- 2019 年 12 月 16 日
- 笔记
1.进程的并行和并发
并行: 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )
并发: 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。
2.并行和并发的区别
并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。
3.进程互斥锁
作用:让加锁的部分由并发变成串行,牺牲了执行效率,保证了数据安全。
应用:在程序使用同一份数据时,就会引发数据安全和数据混乱等问题,需要使用锁来维持数据的顺序取用。
下面的小程序模拟抢票软件,对票数进行修改
#查看余票 import json import time from multiprocessing import Process from multiprocessing import Lock #查看余票 def search(user): #打开data文件查看余票 with open('data.txt','r',encoding='utf-8') as f: dic = json.load(f) print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}') #抢票功能 def buy(user): with open('data.txt','r',encoding='utf-8') as f : dic = json.load(f) if dic.get("ticket_num")>0: dic["ticket_num"] -= 1 with open('data.txt','w',encoding='utf-8') as f: json.dump(dic,f) print(f'用户{user}抢票成功!') else: print(f'用户{user}抢票失败') #开始抢票 def run(user,mutex): search(user) mutex.acquire() buy(user) mutex.release() if __name__ == '__main__': #调用Lock类实例化一个所对象 mutex = Lock() # mutex.acquire()#加锁 # mutex.release()#释放锁 for i in range(10): #并发十个子进程 p = Process(target=run,args=(f'{i}',mutex)) p.start() 用户1查看余票,还剩6 用户1抢票成功! 用户0查看余票,还剩5 用户0抢票成功! 用户2查看余票,还剩4 用户2抢票成功! 用户3查看余票,还剩3 用户3抢票成功! 用户4查看余票,还剩2 用户4抢票成功! 用户6查看余票,还剩1 用户6抢票成功! 用户5查看余票,还剩0 用户5抢票失败 用户7查看余票,还剩0 用户7抢票失败 用户9查看余票,还剩0 用户9抢票失败 用户8查看余票,还剩0 用户8抢票失败 #这里如果不使用互斥锁就会导致票数和抢到的人数不符。
4.队列
原则:先进先出(堆栈,先进后出)
相当于内存中产生一个队列空间,可以存放多个数据,但是数据是先进去的先被取出来。
4.1multiprocess.Queue介绍
Queue是多进程的列队,可以实现多进程间的数据传递。
Queue([maxsize])
:创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
:返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
:如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
:如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()
方法)。
其他方法(了解)
q.close()
:关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()
操作上,关闭生产者中的队列不会导致get()
方法返回错误。
q.cancel_join_thread()
:不会再进程退出时自动连接后台线程。这可以防止join_thread()
方法阻塞。
q.join_thread()
:连接队列的后台线程。此方法用于在调用q.close()
方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()
方法可以禁止这种行为。
4.2 IPC进程间通信实例1
from multiprocessing import Process,Queue def test1(q): data = '数据hello' q.put(data)#向队列中添加数据,如果列队已经填满则会卡在这里不会往下执行,直到列队空出位置让其把数据放进去 print('进程1开始添加数据到列队中。。') def test2(q): data = q.get()#从队列中取出数据,如果列队中已经没有数据给它,也会卡住 #q.get_nowait()#如果获取不到数据就报错 print(f'进程2从队列中获取数据{data}') #q.empty()#判断列队是否为空,返回bool值 #q.full()#判断列队是否满了,返回bool值 if __name__ == '__main__': q = Queue(2)#括号内填队列中可以存放元素的个数,不填默认为无限大 p1 = Process(target=test1,args=(q,)) p2 = Process(target=test2,args=(q,)) p1.start() p2.start() p2.join() print('主程序')
4.3 ICP通信实例2:生产者与消费者模型
生产者:生产数据的
消费者:使用数据的
在程序中,生产者把数据添加到队列中,消费者从队列中获取数据。
from multiprocessing import Queue,Process import time def producer(name,food,q): for i in range(9): data = food,i msg = f'用户{name}开始制作{data}' print(msg) q.put(data) time.sleep(0.1)#由于cup执行速度太快,这里加个延时,让两个消费者都能抢到CPU的使用权 def consumer(name,q): while True: data = q.get() if not data: break print(f'用户{name}开始吃{data}') if __name__ == '__main__': q = Queue() #创造生产者 p1 = Process(target=producer,args=('tank','油条',q)) p2 = Process(target=producer,args=('小明','馒头',q)) #消费者 c1 = Process(target=consumer,args=('tom',q)) c2 = Process(target=consumer,args=('juery',q)) p1.start() p2.start() c1.daemon = True c2.daemon = True#为消费者添加守护进程,主程序完成就结束掉 c1.start() c2.start() p2.join()#这里的目的是当生产者p2等消费者吃完再结束,给主程序加延时也能达到同样的效果 #time.sleep(2) print('主程序')
5.线程
5.1什么是线程?
线程(thread)是操作系统能够进行运算调度的最小单位。它包含在进程之中,是进程的实际运行单位。一条线程指的是进程中一个单一顺序控制流,一个进程可以并发多个线程,每条线程并发执行不同的任务。同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述和信号处理等等。但是同一进程中的多个线程有各自的调用栈,自己的寄存器环境,自己的进程本地存储。
在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率,进一步提高系统的并发性。
进程与线程的区别:
进程是系统进行资源分配和调度的基本单位,线程是是操作系统能够进行运算调度的最小单位。线程包含在进程之中,是进程的实际运行单位。
为什么要使用线程?
线程进一步提高了CPU的使用效率。
注意:线程不能实现并行,只能实现并发。
注意:进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程。
5.2 使用线程的实际场景

开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。
5.3 内存中的线程

多个线程共享同一个进程的地址空间中的资源,是对一台计算机上多个进程的模拟,有时也称线程为轻量级的进程。
而对一台计算机上多个进程,则共享物理内存、磁盘、打印机等其他物理资源。多线程的运行也多进程的运行类似,是CPU在多个线程之间的快速切换。
不同的进程之间是充满敌意的,彼此是抢占、竞争CPU的关系,如果迅雷会和QQ抢资源。而同一个进程是由一个程序员的程序创建,所以同一进程内的线程是合作关系,一个线程可以访问另外一个线程的内存地址,大家都是共享的,一个线程干死了另外一个线程的内存,那纯属程序员脑子有问题。
类似于进程,每个线程也有自己的堆栈,不同于进程,线程库无法利用时钟中断强制线程让出CPU,可以调用thread_yield运行线程自动放弃CPU,让另外一个线程运行。
线程通常是有益的,但是带来了不小程序设计难度,线程的问题是:
- 父进程有多个线程,那么开启的子线程是否需要同样多的线程。
- 在同一个进程中,如果一个线程关闭了文件,而另外一个线程正准备往该文件内写内容呢?
因此,在多线程的代码中,需要更多的心思来设计程序的逻辑、保护程序的数据。
5.4用户级线程和内核级线程(了解)
线程的实现可以分为两类:用户级线程(User-Level Thread)和内核线线程(Kernel-Level Thread),后者又称为内核支持的线程或轻量级进程。在多线程操作系统中,各个系统的实现方式并不相同,在有的系统中实现了用户级线程,有的系统中实现了内核级线程。
5.4.1用户级线程
内核的切换由用户态程序自己控制内核切换,不需要内核干涉,少了进出内核态的消耗,但不能很好的利用多核CPU。

在用户空间模拟操作系统对进程的调度,来调用一个进程中的线程,每个进程中都会有一个运行时系统,用来调度线程。此时当该进程获取CPU时,进程内再调度出一个线程去执行,同一时刻只有一个线程执行。
5.4.2内核级线程
内核级线程:切换由内核控制,当线程进行切换的时候,由用户态转化为内核态。切换完毕要从内核态返回用户态;可以很好的利用smp,即利用多核CPU。windows线程就是这样的。

5.5 用户级与内核级线程的对比
5.5.1 用户级线程和内核级线程的区别
- 内核支持线程是OS内核可感知的,而用户级线程是OS内核不可感知的。
- 用户级线程的创建、撤消和调度不需要OS内核的支持,是在语言(如Java)这一级处理的;而内核支持线程的创建、撤消和调度都需OS内核提供支持,而且与进程的创建、撤消和调度大体是相同的。
- 用户级线程执行系统调用指令时将导致其所属进程被中断,而内核支持线程执行系统调用指令时,只导致该线程被中断。
- 在只有用户级线程的系统内,CPU调度还是以进程为单位,处于运行状态的进程中的多个线程,由用户程序控制线程的轮换运行;在有内核支持线程的系统内,CPU调度则以线程为单位,由OS的线程调度程序负责线程的调度。
- 用户级线程的程序实体是运行在用户态下的程序,而内核支持线程的程序实体则是可以运行在任何状态下的程序。
5.5.2内核线程的优缺点
优点:当有多个处理机时,一个进程的多个线程可以同时执行。
缺点:由内核进行调度。
5.5.3用户级线程的优缺点
- 优点:
- 线程的调度不需要内核直接参与,控制简单。
- 可以在不支持线程的操作系统中实现。
- 创建和销毁线程、线程切换代价等线程管理的代价比内核线程少得多。
- 允许每个进程定制自己的调度算法,线程管理比较灵活。
- 线程能够利用的表空间和堆栈空间比内核级线程多。
- 同一进程中只能同时有一个线程在运行,如果有一个线程使用了系统调用而阻塞,那么整个进程* 都会被挂起。另外,页面失效也会产生同样的问题。
- 缺点:
- 资源调度按照进程进行,多个处理机下,同一个进程中的线程只能在同一个处理机下分时复用
5.6 混合实现
用户级与内核级的多路复用,内核同一调度内核线程,每个内核线程对应n个用户线程。

5.6.1 linux操作系统的 NPTL
历史:在内核2.6以前的调度实体都是进程,内核并没有真正支持线程。它是能过一个系统调用clone()来实现的,这个调用创建了一份调用进程的拷贝,跟fork()不同的是,这份进程拷贝完全共享了调用进程的地址空间。LinuxThread就是通过这个系统调用来提供线程在内核级的支持的(许多以前的线程实现都完全是在用户态,内核根本不知道线程的存在)。非常不幸的是,这种方法有相当多的地方没有遵循POSIX标准,特别是在信号处理,调度,进程间通信原语等方面。
很显然,为了改进LinuxThread必须得到内核的支持,并且需要重写线程库。为了实现这个需求,开始有两个相互竞争的项目:IBM启动的NGTP(Next Generation POSIX Threads)项目,以及Redhat公司的NPTL。在2003年的年中,IBM放弃了NGTP,也就是大约那时,Redhat发布了最初的NPTL。
NPTL最开始在redhat linux 9里发布,现在从RHEL3起内核2.6起都支持NPTL,并且完全成了GNU C库的一部分。
设计:NPTL使用了跟LinuxThread相同的办法,在内核里面线程仍然被当作是一个进程,并且仍然使用了clone()系统调用(在NPTL库里调用)。但是,NPTL需要内核级的特殊支持来实现,比如需要挂起然后再唤醒线程的线程同步原语futex.
NPTL也是一个1*1的线程库,就是说,当你使用pthread_create()调用创建一个线程后,在内核里就相应创建了一个调度实体,在linux里就是一个新进程,这个方法最大可能的简化了线程的实现。
除NPTL的11模型外还有一个mn模型,通常这种模型的用户线程数会比内核的调度实体多。在这种实现里,线程库本身必须去处理可能存在的调度,这样在线程库内部的上下文切换通常都会相当的快,因为它避免了系统调用转到内核态。然而这种模型增加了线程实现的复杂性,并可能出现诸如优先级反转的问题,此外,用户态的调度如何跟内核态的调度进行协调也是很难让人满意。
5.7 GIL全局解释器锁
Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
- 设置 GIL;
- 切换到一个线程去运行;
- 运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
- 把线程设置为睡眠状态;
- 解锁 GIL;
- 再次重复以上所有步骤。
在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。
5.8 开启线程的两种方式
5.8.1 方式一:直接实例化Thread的对象
如果要创建多个线程可以使用for循环
from threading import Thread import time def task(): print('线程开启') time.sleep(1) print('线程结束') if __name__ == '__main__': t = Thread(target=task)#实例化线程对象,可以在主程序进行,也可以不再主程序进行 t.start()
5.8.2 方式二:继承Thread类
from threading import Thread import time class MyThresd(Thread): def run(self): print('线程开启') time.sleep(1) print('线程结束') t = MyThresd() t.start()
5.9 线程对象的属性
线程的属性和进程的属性有些相似,功能也相似。
Thread实例对象的方法:
- join()子线程结束后主线程再结束
- start()开启线程
- is_alive()查看线程是否存活返回bool值
- isAlive()查看线程是否存活返回bool值
- daemon = True守护进程
getName()
:返回线程名。setName()
:设置线程名。
threading模块提供的一些方法:
threading.currentThread()
:返回当前的线程变量。threading.enumerate()
:返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread from threading import current_thread import time def task(): print(f'线程开启{current_thread().name}') time.sleep(3) print(f'线程结束{current_thread().name}') if __name__ == '__main__': for i in range(3): t = Thread(target=task) t.start() print(t.isAlive()) print(t.is_alive()) t1 = Thread(target=task) t1.daemon = True t1.start()#这里t1能否正常结束就看t1能否快速的抢到CPU执行自己的代码了,能抢到则可以正常打印“线程结束”,否则就被主程序结束掉了 print('主线程') 线程开启Thread-1 True True 线程开启Thread-2 True True 线程开启Thread-3 True True 线程开启Thread-4 主线程 线程结束Thread-1 线程结束Thread-3 线程结束Thread-2
5.10 线程互斥锁
线程互斥锁和进程互斥锁的作用是一样的,用法也很相似,在需要保护数据的地方加锁就可以了。
from threading import Thread,Lock import time mutex = Lock() n = 100 def task(i): print(f'线程{i}启动。。') global n mutex.acquire()#获取,加锁 temp = n time.sleep(0.1) n = temp - 1 print(n) mutex.release()#释放 #如果不加锁,么个线程获取到的值都是100,所有程序都在执行100-1的操作,加锁之后,每个线程获取到的数据是前一个线程计算完成的结果 if __name__ == '__main__': t_l = [] for i in range(100): t = Thread(target=task,args=(i,)) t_l.append(t) t.start() for t in t_l: t.join() print(n)