Event事件、进程池与线程池、协程

  • 2019 年 12 月 16 日
  • 筆記

Event事件

Event事件的作用:

  • 用来控制线程的执行。
  • 由一些线程去控制另一些线程。
from threading import Event  from threading import Thread  import time  e = Event()  # e.wait()  # e.set()  def light():      print('红灯亮了...')      time.sleep(5)      e.set()#set方法的作用是将wait方法的False状态改为True,      #当e中的wait状态为False时程序会暂停,当为True状态时程序会继续运行        print('绿灯亮了。。。')    def car(name):      print(f'{name}正在等红灯。。。')      e.wait()      print(f'{name}正在加速漂移。。。')    t = Thread(target=light)  t.start()    for line in range(10):      t1 = Thread(target=car,args=(f'{line}号car',))      t1.start()

进程池与线程池

1)什么是进程池和线程池?

​ 进程池与线程池是用来控制当前程序允许创建(进程/线程)的数量。

2)进程池与线程池的作用:

​ 保证在硬件允许的范围内创建(进程/线程)的数量。

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor  import time  #pool = ProcessPoolExecutor()#默认以CPU的个数限制进程数  #pool = ThreadPoolExecutor()默认以CPU的个数*5限制线程数    pool = ThreadPoolExecutor(5)#代表处于并发状态的最多有5个线程  #也就是在池子里面的只能有5个,但是如果池子里面有进程或线程出来了就可以有新的进程或线程进入池子。  def task():      print('线程任务开始了。。。')      time.sleep(1)      print('线程任务结束了。。。')    for line in range(5):#这里对线程的提交是异步提交      pool.submit(task)    #打印结果:  线程任务开始了。。。  线程任务开始了。。。  线程任务开始了。。。  线程任务开始了。。。  线程任务开始了。。。  线程任务结束了。。。  线程任务结束了。。。  线程任务结束了。。。  线程任务结束了。。。  线程任务结束了。。。

回调函数

可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor  import time  #pool = ProcessPoolExecutor()#默认以CPU的个数限制进程数  #pool = ThreadPoolExecutor()默认以CPU的个数*5限制线程数    pool = ThreadPoolExecutor(5)#代表处于并发状态的最多有5个线程  #也就是在池子里面的只能有5个,但是如果池子里面有进程或线程出来了就可以有新的进程或线程进入池子。  def task(res):      print(res)      print('线程任务开始了。。。')      time.sleep(1)      print('线程任务结束了。。。')      return 123    def call_back(res):      print(type(res))      res2 = res.result()#接收到的是task return的值      print(res2)  # 异步提交任务  # pool.submit('传函数地址').add_done_callback('回调函数地址')    for line in range(5):      pool.submit(task,1).add_done_callback(call_back)      #这里的1是传入task的参数    pool.shutdown()#让线程池内的所有任务执行完成后,才执行下面的代码  print('所有线程运行结束')    #执行结果  1  线程任务开始了。。。  1  线程任务开始了。。。  1  线程任务开始了。。。  1  线程任务开始了。。。  1  线程任务开始了。。。  线程任务结束了。。。  线程任务结束了。。。  线程任务结束了。。。  线程任务结束了。。。  <class 'concurrent.futures._base.Future'>  <class 'concurrent.futures._base.Future'>  <class 'concurrent.futures._base.Future'>  <class 'concurrent.futures._base.Future'>  123  123  线程任务结束了。。。  123  123  <class 'concurrent.futures._base.Future'>  123  所有线程运行结束

使用线程池和回调函数高性能爬取梨视频

爬取步骤:

  1. 从主页中获取所有视频的ID号,拼接视频详情页URL
  2. 在视频详情页中获取真实的视频URL
  3. 往真实视频URL地址发送请求,获取视频二进制数据
  4. 把视频二进制数据保存到本地
from concurrent.futures import ThreadPoolExecutor  import requests  import re  import uuid    pool = ThreadPoolExecutor(100)    #1.发送请求函数  def get_page(url):      response = requests.get(url)      print("获取电影信息标记",response)      return response    #2.解析主页获取视频ID号  def parse_index(response):      id_list = re.findall('<a href="video_(.*?)".*?>',                           response.text,re.S)      return id_list    #3.解析视频详情页获取真实视频链接  def parse_detail(res):      response = res.result()      movie_detail_url = re.findall('srcUrl="(.*?)"',response.text,re.S)[0]      print(f'往视频链接:{movie_detail_url}发送请求。。。')      #异步向视频详情页发送请求,      print("电影链接标记",movie_detail_url)      pool.submit(get_page,movie_detail_url).add_done_callback(save_movie)      return movie_detail_url  #保存视频  def save_movie(res):      print('biaoji')      movie_response = res.result()      print('biaoji',movie_response)      name = str(uuid.uuid4())      print(f'{name}视频开始保存。。。')      with open(f'{name}.mp4','wb') as f:          f.write(movie_response.content)      print('视频下载完毕')    if __name__ == '__main__':      #1.访问视频主页获取数据      index_response = get_page('https://www.pearvideo.com/')      #2.解析主页获取所有视频ID号      id_list = parse_index(index_response)      #3.循环对每个视频详情页链接进行拼接      for id in id_list:          print(id)          detail_url = 'https://www.pearvideo.com/video_' + id          #异步提交爬取视频详情页,把返回的数据,交给parse_detail(回调函数)          pool.submit(get_page,detail_url).add_done_callback(parse_detail)

协程

对比一下:

​ 进程:资源单位

​ 线程:执行单位

​ 协程:在单线程下实现并发

注意:协程不是操作系统资源,它是程序员起的名字,目的是为让单线程能实现并发。

协程的目的:通过手动模拟操作系统“多道技术”,实现切换+保存状态。

​ 1)手动实现遇到IO切换,欺骗操作系统误以为没有IO操作,特点:

​ 1.单线程,遇到IO切换+保存状态

​ 2.单线程,计算密集型,来回切换+保存状态,反而效率更低。

优点:在IO密集型的情况下,会提高效率。

缺点:若在计算密集型的情况下,来回切换,反而效率更低。

如何实现协程?

方法:切换+保存状态

​ yield:保存状态

​ 并发:切换

gevent模块

​ Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

import gevent  def eat(name):      print(f'{name} eat 1')      gevent.sleep(2)      print(f'{name} eat 2')    def play(name):      print('%s play 1'%name)      gevent.sleep(1)      print('%s play 2'%name)  #创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,  # 后面可以有多个参数,可以是位置实参或关键字实参,  # 都是传给函数eat的  g1 = gevent.spawn(eat,'egon')  g2 = gevent.spawn(play,name='egon')  g1.join()  g2.join()  print('主')    egon eat 1  egon play 1  egon play 2  egon eat 2  主

上面代码的gevent.sleep(2)模拟的是gevent可以识别的IO阻塞,而time.sleep()或者其他的阻塞gevent是不能识别的,需要用下面一行代码打补丁。

from gevent import monkey;monkey.patch_all()

必须放在被打补丁者的前面,如放在time,socket模块之前,只要用gevent,就将上面的代码放在第一行就可以了。

from gevent import monkey;monkey.patch_all()  import gevent  import time      def eat(name):      print(f'{name} eat 1')      time.sleep(2)      print(f'{name} eat 2')    def play(name):      print('%s play 1'%name)      time.sleep(1)      print('%s play 2'%name)    g1 = gevent.spawn(eat,'egon')  g2 = gevent.spawn(play,name='egon')  g1.join()  g2.join()  print('主')    egon eat 1  egon play 1  egon play 2  egon eat 2  主

通过gevent实现单线程下的socket并发

#服务端  from gevent import monkey;monkey.patch_all()  import gevent  import socket  import threading  #如果不想用money.patch_all()打补丁,可以用gevent自带的socket  # from gevent import socket  # s=socket.socket()    def server(server_ip,port):      s = socket.socket()      s.bind((server_ip,port))      s.listen(5)      while True:          print(threading.current_thread().name)          conn,addr = s.accept()          gevent.spawn(talk,conn,addr)    def talk(conn,addr):      try:          while True:              res = conn.recv(1024).decode('utf-8')              print(f'client{addr[0]}:{addr[1]} msg:{res}')              conn.send(res.upper().encode('utf-8'))      except Exception as e:          print(e)      finally:          conn.close()    if __name__ == '__main__':      server('127.0.0.1',6666)
#客户端  import threading  import socket      def client(server_ip,port):      c = socket.socket()      #套接字对象一定要加到函数内,即局部名称空间内,      # 放在函数外则被所有线程共享,则大家公用一个套接字对象,      # 那么客户端端口永远一样了      c.connect((server_ip,port))      count = 0      while True:          c.send(f'{threading.current_thread().getName()} say hello {count}'.encode('utf-8'))          msg = c.recv(1024)          print(msg.decode('utf-8'))          count += 1    if __name__ == '__main__':      for i in range(10):          t = threading.Thread(target=client,args=('127.0.0.1',6666))          t.start()