python的进程与线程

  • 2019 年 10 月 3 日
  • 筆記

 

进程、线程的含义?

 

1.什么是进程?

  进程是指运行中的应用程序,每个进程都有自己独立的地址空间(内存空间)。比如用户点击桌面的IE浏览器,就启动了一个进程,操作系统就会为该进程分配独立的地址空间。当用户再次点击IE浏览器,又启动了一个进程,操作系统将为新的进程分配新的独立的地址空间。多进程就是“多任务”,就像使用电脑时同时打开浏览器上网、打开播放器听歌、后台还默默运行着杀毒软件一样。现代操作系统如Mac OS X,UNIX,Linux,Windows等都支持多进程,每启动一个进程,操作系统便为该进程分配一个独立的内存空间。

2.什么是线程?

  线程是进程中的一个实体,是被系统独立调度和分派的基本单位。一个进程可以有一个线程,也可以有多个线程。
  线程自己不拥有独立的系统资源,只拥有一点在运行中必不可少的资源,它可与同属一个进程的其它线程共享当前进程所拥有的全部资源。
  一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。
  线程有就绪(runnable)、阻塞(blocked)和运行(running)三种基本状态以及新建(new)和死亡(dead)状态。

为什么要有多进程和多线程?

  每个进程至少要干一件事,比如一个编辑器既要打字输入同时又要检测打错的拼写有时候还要区分一些关键字高亮显示,它们同属于编辑器这个进程,我们把编辑器作为一个进程,而以上这些工作就是它的子任务,如何实现他们同时工作呢?就是让每个子任务即线程短暂运行交替执行,由于它们彼此之间交替太快了,看起来就像同时运行一样。(真正的多线程需要多核CPU才能实现)
当我们要让一个python程序执行多个任务时,我们可以用多个进程或多个线程来完成我们的任务,他们之间彼此同时交替进行甚至一个任务依赖于另一个任务执行的结果,他们需要相互通信和协调,所以我们就需要用到多进程和多线程编程了。

实现多进程和多线程

1.多进程

  linux下可使用os模块的fork()。
  Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

import os    print('Process (%s) start...' % os.getpid())  # Only works on Unix/Linux/Mac:  pid = os.fork()  if pid == 0:    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))  else:    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

 

  windows下可以使用multiprocessing模块
  multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process  import os    # 子进程要执行的代码  def run_proc(name):    print('Run child process %s (%s)...' % (name, os.getpid()))    if __name__=='__main__':    print('Parent process %s.' % os.getpid())    p = Process(target=run_proc, args=('test',))    print('Child process will start.')    p.start()    p.join()    print('Child process end.')

 

  创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。
  join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

 

  Pool
  如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool  import os, time, random    def long_time_task(name):    print('Run task %s (%s)...' % (name, os.getpid()))    start = time.time()    time.sleep(random.random() * 3)    end = time.time()    print('Task %s runs %0.2f seconds.' % (name, (end - start)))    if __name__=='__main__':    print('Parent process %s.' % os.getpid())    p = Pool(4)    for i in range(5):    p.apply_async(long_time_task, args=(i,))    print('Waiting for all subprocesses done...')    p.close()    p.join()    print('All subprocesses done.')

 

  对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

 

  子进程
  很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

  subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

  下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess    print('$ nslookup www.python.org')  r = subprocess.call(['nslookup', 'www.python.org'])  print('Exit code:', r)

 

2.多线程

  使用threading模块实现多线程,Python的线程是真正的Posix Thread,而不是模拟出来的线程。

import time, threading    def loop():    print('线程 %s 在运行' % threading.current_thread().name)    n = 0    while n < 5:      n = n + 1      print('线程 %s >>> %s' % (threading.current_thread().name, n))      time.sleep(1)    print('线程 %s 结束.' % threading.current_thread().name)    print('线程 %s 在运行' % threading.current_thread().name)  t = threading.Thread(target=loop, name='子线程1')  t2 = threading.Thread(target=loop, name='子线程2')  t.start()  t2.start()  t.join()  t2.join()  print('线程 %s 结束.' % threading.current_thread().name)

 

或者

import time, threading
num
=0 lock = threading.Lock() def action_one():   global num   for i in range(3):     lock.acquire()     try:       print("线程1 %d"%num)       num+=1       time.sleep(1)     finally:       lock.release() def action_two():   global num   for i in range(3):     lock.acquire()     try:       print("线程2 %d"%num)       num+=1       time.sleep(1)     finally:       lock.release() t1 = threading.Thread(target=action_one, name='子线程1') t2 = threading.Thread(target=action_two, name='子线程2') t1.start() t2.start() t1.join() t2.join()

 

进程之间和线程之间的相互协调

1.进程间的通信:

  Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

  以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue  import os, time, random    # 写数据进程执行的代码:  def write(q):    print('Process to write: %s' % os.getpid())    for value in ['A', 'B', 'C']:      print('Put %s to queue...' % value)      q.put(value)      time.sleep(random.random())    # 读数据进程执行的代码:  def read(q):    print('Process to read: %s' % os.getpid())    while True:      value = q.get(True)      print('Get %s from queue.' % value)    if __name__=='__main__':    # 父进程创建Queue,并传给各个子进程:    q = Queue()    pw = Process(target=write, args=(q,))    pr = Process(target=read, args=(q,))    # 启动子进程pw,写入:    pw.start()    # 启动子进程pr,读取:    pr.start()    # 等待pw结束:    pw.join()    # pr进程里是死循环,无法等待其结束,只能强行终止:    pr.terminate()

 

  在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

2.线程间通信

  1.Queue

  使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。

  Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者”“消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 get() 和 put() 方法都支持非阻塞方式和设定超时。

import queue
q
= queue.Queue() try:   data = q.get(block=False) except queue.Empty:   ... try:   q.put(item, block=False) except queue.Full:   ... try:   data = q.get(timeout=5.0) except queue.Empty:   ...

def producer(q):   ...   try:     q.put(item, block=False)   except queue.Full:     log.warning('queued item %r discarded!', item) _running = True

def consumer(q):   while _running:     try:       item = q.get(timeout=5.0)       # Process item       ...     except queue.Empty:       pass

 

  最后,有 q.qsize() , q.full() , q.empty() 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用empty() 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。

  为了避免出现死锁的情况,使用锁机制的程序应该设定为每个线程一次只允许获取一个锁。如果不能这样做的话,你就需要更高级的死锁避免机制。在 threading 库中还提供了其他的同步原语,比如 RLock 和 Semaphore 对象。

 

  Queue提供的方法:

task_done()

  意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

  如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()

  阻塞调用线程,直到队列中的所有任务被处理掉。

  只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

put(item[, block[, timeout]])

  将item放入队列中。
    1.如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
    2.如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
    3.如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常

  其非阻塞版本为put_nowait等同于put(item, False)

get([block[, timeout]])

  从队列中移除并返回一个数据。block跟timeout参数同put方法

  其非阻塞方法为get_nowait()相当与get(False)

empty()

  如果队列为空,返回True,反之返回False

 

  2.同步机制Event

  线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading 库中的 Event 对象。

  Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行。

from threading import Thread, Event  import time    def countdown(n, start_evt):    print('countdown is starting...')    start_evt.set()    while n > 0:      print('T-minus', n)      n -= 1      time.sleep(5)    start_evt = Event() # 可通过Event 判断线程的是否已运行  t = Thread(target=countdown, args=(10, start_evt))  t.start()    print('launching countdown...')  start_evt.wait() # 等待countdown执行    # event 对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程    print('countdown is running...')

 

  Semaphore(信号量)
  在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是1)的限制。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。

from threading import Semaphore, Lock, RLock, Condition, Event, Thread  import time    # 信号量  sema = Semaphore(3) #限制同时能访问资源的数量为3    def foo(tid):    with sema:      print('{} acquire sema'.format(tid))      time.sleep(1)    print('{} release sema'.format(tid))      threads = []    for i in range(5):    t = Thread(target=foo, args=(i,))    threads.append(t)    t.start()    for t in threads:    t.join()

 

  Lock(锁)
  互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。

  互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

#创建锁  mutex = threading.Lock()  #锁定  mutex.acquire([timeout])  #释放  mutex.release()

  RLock(可重入锁)
  为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

import threading  import time    class MyThread(threading.Thread):    def run(self):      global num      time.sleep(1)        if mutex.acquire(1):        num = num+1        msg = self.name+' set num to '+str(num)        print msg        mutex.acquire()        mutex.release()        mutex.release()  
num
= 0 mutex = threading.RLock()

def test():   for i in range(5):     t = MyThread()     t.start()
if __name__ == '__main__':   test()

 

  Condition(条件变量)
  Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

  可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。

  Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

  除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

import threading  import time    class Producer:    def run(self):      global count      while True:        if con.acquire():          if count > 1000:            con.wait()          else:            count += 100            msg = threading.current_thread().name + ' produce 100, count=' + str(count)            print(msg)            con.notify() # 通知 waiting线程池中的线程          con.release()          time.sleep(1)    count = 0  con = threading.Condition()    class Consumer:    def run(self):      global count      while True:      if con.acquire():        if count < 100:          con.wait()        else:          count -= 3          msg = threading.current_thread().name + ' consumer 3, count=' + str(count)          print(msg)          con.notify()        con.release()        time.sleep(3)    producer = Producer()

 

进程和线程的比较

1.稳定性

  多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了它拥有自己独立的内存空间,不会影响主进程和其他子进程(主进程崩掉,子进程也难逃厄运)。多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。
  多线程模式通常比多进程快,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。

2.切换开销

  首先上下文切换就是从当前执行任务切换到另一个任务执行的过程。但是,为了确保下次能从正确的位置继续执行,在切换之前,会保存上一个任务的状态。
  操作系统在切换进程或者线程时需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。
  但是线程的切换虚拟空间内存是相同的,但是进程切换的虚拟空间内存则是不同的。所以线程上下文切换比进程上下文切换快的多。同时,这两种上下文切换的处理都是通过操作系统内核来完成的。内核的这种切换过程伴随的最显著的性能损耗是将寄存器中的内容切换出。

3.计算密集型和IO密集型下的选择

  我们可以把任务分为计算密集型和IO密集型。
  计算密集型任务的特点是要进行大量的计算,消耗CPU资源。IO密集型任务的特点是涉及到网络、磁盘IO,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成

 

对比维度

多进程

多线程

总结

数据共享、同步

数据共享复杂,需要用IPC;数据是分开的,同步简单

因为共享进程数据,数据共享简单,但也是因为这个原因导致同步复杂

各有优势

内存、CPU

占用内存多,切换复杂,CPU利用率低

占用内存少,切换简单,CPU利用率高

线程占优

创建销毁、切换

创建销毁、切换复杂,速度慢

创建销毁、切换简单,速度很快

线程占优

编程、调试

编程简单,调试简单

编程复杂,调试复杂

进程占优

可靠性

进程间不会互相影响

一个线程挂掉将导致整个进程挂掉

进程占优

分布式

适应于多核、多机分布式;如果一台机器不够,扩展到多台机器比较简单

适应于多核分布式

进程占优

 

(1)需要频繁创建销毁的优先用线程

  原因请看上面的对比。

  这种原则最常见的应用就是Web服务器了,来一个连接建立一个线程,断了就销毁线程,要是用进程,创建和销毁的代价是很难承受的

(2)需要进行大量计算的优先使用线程

  所谓大量计算,当然就是要耗费很多CPU,切换频繁了,这种情况下线程是最合适的。

  这种原则最常见的是图像处理、算法处理。

(3)强相关的处理用线程,弱相关的处理用进程

  什么叫强相关、弱相关?理论上很难定义,给个简单的例子就明白了。

  一般的Server需要完成如下任务:消息收发、消息处理。“消息收发”和“消息处理”就是弱相关的任务,而“消息处理”里面可能又分为“消息解码”、“业务处理”,这两个任务相对来说相关性就要强多了。因此“消息收发”和“消息处理”可以分进程设计,“消息解码”、“业务处理”可以分线程设计。

  当然这种划分方式不是一成不变的,也可以根据实际情况进行调整。

(4)可能要扩展到多机分布的用进程,多核分布的用线程

  原因请看上面对比。

 

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2qkn8bupvco4o