Python学习—pyhton中的进程
- 2020 年 1 月 6 日
- 笔记
1.进程定义
进程: 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据、进程控制块(pcb)三部分组成。 (1)我们编写的程序用来描述进程要完成哪些功能以及如何完成; (2)数据则是程序在执行过程中所需要使用的资源; (3)进程控制块用来记录进程的所有信息。系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
2.创建进程
新创建的进程在内存独立开辟一块空间,不与其他进程共享空间、数据。 同一个进程中,新创建的线程与此进程里其他线程共享空间、数据。
1.os.fork()函数
os模块的三个方法: os.fork()创建一个当前进程的子进程 os.getpid()获取当前进程pid os.getppid()获取当前进程的父进程的Pid 关于fork(): 它用来创建一个进程,即为当前进程的子进程,复制父进程的所有代码并从fork语句处开始运行。运行父进程还是子进程的取决于当前os调度策略。 在父进程中返回子进程的pid,在子进程中返回0。即返回0表示在子进程中运行,返回大与0的数表示在父进程中运行。
例子:
import os print('当前进程:',os.getpid()) print('当前进程的父进程:',os.getppid()) pid = os.fork() if pid == 0: print('此时为子进程:',os.getpid(),'n其父进程:',os.getppid()) else: print('父进程:',os.getpid(),'nos.fork的返回值pid:',pid)
运行结果:
当前进程: 16839 当前进程的父进程: 2912 父进程: 16839 os.fork的返回值pid: 16842 此时为子进程: 16842 其父进程: 16839
从运行结果中看,在linux中fork产生子进程后是先运行父进程,当父进程结束后再进入子进程运行。
2.实例化进程类
直接通过实例化进程类multiprocessing.Process创建新进程。 和线程类一样,进程类也有start()方法,join()方法。调用对象的start()方法实例上也是调用的类中的run()方法。
# 导入进程模块 import multiprocessing import os def job(ss): print(ss,'当前子进程:%s' %os.getpid()) #实例化进程类,并提交任务,传入任务所需要的参数 p1 = multiprocessing.Process(target=job,args=('abc',)) p1.start() p2 = multiprocessing.Process(target=job,args=('123',)) p2.start() # 和线程一样,进程也有join方法。 p1.join() p2.join() print('完成......')
运行结果:
abc 当前子进程:17234 123 当前子进程:17235 完成......
3.继承进程类来自定义进程类
继承python提供的进程类,重写方法,创建自己所需要的进程类,再实例化自定义的进程类。
import multiprocessing class Job(multiprocessing.Process): #重写构造方法 def __init__(self,cc): super(Job, self).__init__() self.cc = cc #重写run方法,和线程一样 def run(self): print(self.cc) #实例化对象 if __name__ == "__main__": pp = [] for i in range(10): p = Job(str(i)+':123456') pp.append(p) p.start() for p in pp: p.join() print('hahhahaha')
运行结果:
0:123456 1:123456 2:123456 3:123456 4:123456 5:123456 6:123456 7:123456 8:123456 9:123456 hahhahaha
3.多进程与多线程的对比
import threading import multiprocessing from timeit import timeit class Jobthread(threading.Thread): def __init__(self,li): super(Jobthread,self).__init__() self.li = li def run(self): sum(self.li) class Jobprocess(multiprocessing.Process): def __init__(self,li): super(Jobprocess, self).__init__() self.li = li def run(self): for i in self.li: sum(i) # 这个装饰器是自己写的,用来计算某个函数执行时间 @timeit def use_Pro(list): for i in range(0,len(list), 1000): p = Jobprocess(list[i:i+1000]) p.start() @timeit def use_Thr(list): for li in list: t = Jobthread(li) t.start if __name__ == "__main__": list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000 use_Pro(list) use_Thr(list)
运行结果:
use_Pro运行时间0.0041866302490234375 use_Thr运行时间0.02240157127380371
正如看到的结果一样,多进程适合计算密集型任务,多线程适合i/o密集型任务。
3.守护进程与终止进程
1.守护进程-daemon属性
和线程类似,进程类也有一个daemon属性,默认值为False。 当改变他的值为True时,当主进程结束,就会强行终止其他的所以进程。 实例: (1)第一个程序
import multiprocessing import time def job(): print('开始子进程') time.sleep(3) print('子进程结束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.start() print("程序结束......")
运行结果:
程序结束...... 开始子进程 子进程结束
主进程结束,其他进程还在继续执行。 (2)第二个程序
import multiprocessing import time def job(): print('开始子进程') time.sleep(3) print('子进程结束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.daemon = True p.start() print("程序结束......")
运行结果:
程序结束......
当主进程结束,其他进程将会被强制终止结束。
2.终止进程
import multiprocessing import time def job(): print('开始子进程') time.sleep(3) print('子进程结束') if __name__ == "__main__": p = multiprocessing.Process(target=job) p.daemon = True print(p.is_alive()) #启动进程之前查看进程状态 p.start() print(p.is_alive()) #启动进程之后查看进程状态 p.terminate() #终止进程 print(p.is_alive()) #终止进程命令一发出后,查看进程状态。此时进程在释放过程中,还没有被完全释放。 p.join() #先让进程完全释放 print(p.is_alive()) #最后查看进程状态 print("程序结束......")
运行结果:
False True True False 程序结束......
4.进程间通信
""" 通过队列实现进程间通信,队列充当消息管道的作用(类似生产者消费者模型) 这里通信一直存在,也就是这两个进程会一直存在,没有销毁释放。 """ import multiprocessing from multiprocessing import Queue import time class Put_news(multiprocessing.Process): def __init__(self,queue): super(Put_news, self).__init__() self.queue = queue def run(self): for i in range(100): self.queue.put(i) print("传递消息:%s" %i) time.sleep(0.1) class Get_news(multiprocessing.Process): def __init__(self,queue): super(Get_news, self).__init__() self.queue = queue def run(self): while True: time.sleep(0.11) print("接收消息++++++++++++:%s" %(self.queue.get())) if __name__ == "__main__": q = Queue() p = Put_news(q) g = Get_news(q) p.start() g.start() if not p.is_alive(): g.terminate()
运行结果:


5.分布式进程
任务需要处理的数据特别大, 希望多台主机共同处理任务。multiprocessing.managers子模块里面可以实现将进程分布到多台机器上 (管理端主机要运算一些列任务,通过与其他主机建立“连接“,将任务分配给其他主机执行,并将执行结果返回给管理端主机。) 管理端主机代码:
import random from queue import Queue from multiprocessing.managers import BaseManager # 1.创建队列(发送任务的队列,收取结果的队列) task_queue = Queue() result_queue = Queue() # 第二三步骤可以互换顺序 # 2.将队列注册到网络(这样其他主机可以通过网络接收任务,发送结果) # 注册的队列(任务队列,结果队列)的唯一标识码分别为'put_task_queue','get_result_queue' BaseManager.register('put_task_queue',callable=lambda :task_queue) BaseManager.register('get_result_queue',callable=lambda : result_queue) # 3.绑定端口(3333),设定密码(hahahaha) manager = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha') # 4.启动manager,开始共享队列 manager.start() # 5.通过网络访问共享的队列 task = manager.put_task_queue() result = manager.get_result_queue() # 6.向任务队列中放入执行任务的数据,这里放入100个任务 for i in range(100): n = random.randint(10,500) task.put(n) print('任务列表加入数据:'+str(n)) # 7.从结果队列中读取各个主机的任务执行结果 for j in range(100): res = result.get() print('执行结果:'+str(res)) # 8.任务执行结束,关闭共享队列 manager.shutdown()
运算主机代码:
""" 在各个工作主机上执行的代码相同 """ from multiprocessing.managers import BaseManager # 1. 连接manager端,获取共享的队列 import time worker = BaseManager(address=('172.25.254.158',3333),authkey=b'hahahaha') # 2.注册队列,去获取网络上共享的队列中的内容 BaseManager.register('put_task_queue') BaseManager.register('get_result_queue') # 3.连接网络 worker.connect() # 4.通过网络访问共享的队列 task = worker.put_task_queue() result = worker.get_result_queue() # 5.读取任务,处理任务,这里读取了50个任务进行处理 # 每台运算主机上的处理任务数量可以不同,不过为了避免修改代码,一般都相同。 for i in range(50): n = task.get() print('执行任务 %d**2 = '%(n)) res = '%d**2=%d' %(n,n**2) #这里设置执行的任务是求平方 result.put(res) #将结果放入结果队列 time.sleep(1) #休息1秒 print('工作主机执行任务结束.....')
6.进程池
和线程一样,进程也有进程池。 1.第一种方法
import multiprocessing import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(3) # 创建含有8个进程的进程池 pool = multiprocessing.Pool(8) # 给进城池的进程分配任务 for i in range(12): pool.apply_async(job,args=(i,)) # 关闭进程池,使进程池不再工作运行 pool.close() # 等待所有子进程结束之后再开始主进程 pool.join() print('all works completed!')
运行结果:
start id ---> 0 end id ----> 0 start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4 start id ---> 5 end id ----> 5 start id ---> 6 end id ----> 6 start id ---> 7 end id ----> 7 start id ---> 8 end id ----> 8 start id ---> 9 end id ----> 9 start id ---> 10 end id ----> 10 start id ---> 11 end id ----> 11 all works completed!
2.第二种方法
from concurrent.futures import ProcessPoolExecutor import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(3) # 创建含有2个进程的进程池 pool = ProcessPoolExecutor(max_workers=2) # 给进程池的进程分配任务,submit方法返回一个_base.Future对象 f1 = pool.submit(job,1) f2 = pool.submit(job,2) f3 = pool.submit(job,3) f4 = pool.submit(job,4) # 执行f1对象的各种方法 f1.done() f1.result()
运行结果:
start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4
3.第三种方法
from concurrent.futures import ProcessPoolExecutor import time def job(id): print('start id ---> %d' %id) print('end id ----> %d' %id) time.sleep(1) pool = ProcessPoolExecutor(max_workers=3) pool.map(job,range(1,10))
运行结果:
start id ---> 1 end id ----> 1 start id ---> 2 end id ----> 2 start id ---> 3 end id ----> 3 start id ---> 4 end id ----> 4 start id ---> 5 end id ----> 5 start id ---> 6 end id ----> 6 start id ---> 7 end id ----> 7 start id ---> 8 end id ----> 8 start id ---> 9 end id ----> 9