python的多线程和多进程(一)
- 2019 年 11 月 3 日
- 筆記
在进入主题之前,我们先学习一下并发和并行的概念:
–并发:在操作系统中,并发是指一个时间段中有几个程序都处于启动到运行完毕之间,且这几个程序都是在同一个处理机上运行。但任一时刻点上只有一个程序在处理机上运行。形象的点描述:一个人做很多事情,但同一时刻只能做一件事情。
–并行:当系统有一个CPU时,则程序的操作有可能非并发。当一个CPU执行一个程序时,另一个CPU可以执行另一个程序,两个程序互不抢占CPU资源,可以同时进行。形象的描述:多人同时做多件事情
可能有小伙伴又会问串行又是什么鬼呢?再来唠叨一下:
串行和并行是数据传输方式的区别。其实和并发和并行的概念是两个领域:
1、数据传输方式不同,串行口传输方式为数据排成一行,一位一位送出,接受也是一样。并行口传输8位数据一次送出。
2、针脚不同,串行口针脚少,并行口针脚多
3、用途不同,串行口现在只用作控制接口,并行口多用作打印机、扫描仪等接口。
来吧,进入正题:
进程的概念:计算机程序其实只是存储在磁盘上可执行的二进制(或其他可执行的类型)文件。只有把它们加载到内存中并被操作系统调用,才算是被执行,并用拥有生命周期。所以说进程是一个执行中的程序。
每个进程都拥有自己的地址空间,内存,数据栈以及其他用于跟踪执行的辅助数据。
此间,操作系统管理其上的所有进程的执行,并为这些进程合理分配时间。
多进程:那就是在一个多核的电脑上同时运行多个程呗,其实是实现并行。
但是呢,我们先从多线程学习起。多线程搞定了,多进程也就是小意思了。
在此之前先介绍一个概念,之后会用到。GIL锁,有可能大家会听到别人说python运行很慢啥的,其实就是这个GIL锁的原因,那它到底是个啥呢:
GIL全局解释性锁,python在设计的时候,还没有多核处理器的概念。因此,为了设计的方便和线程的安全,python之父就设计了一个锁。这个锁要求,任何一个进程中,同时只能有一个线程在执行。因此并不能多个线程分配多个CPU资源。所以python中的线程只能实现并发,而不能实现真正的并行,这就是python所线程的局限性。在python3中GIL锁进行了该井,在遇到任何IO阻塞(不是耗时)的时候,会自动切换线程,这就使得在进行多IO操作的时候python的锁多线程有很大的优势。同时一个线程在运行时间或者运行步骤达到一定的阈值时,也会自动的切换线程。
对比于多进程,多线程其实是实现并发的操作,在在python中使用multiprocessing包实现多进程:
首先来看下多线程的实现,这里使用的是threading包。用sleep模拟耗时操作:
1 import time 2 import threading 3 4 def get_detail_html(url): 5 print("starting get detail html") 6 time.sleep(4) 7 print("ending get detail html") 8 9 def get_detail_url(url): 10 print("starting get detail url") 11 time.sleep(2) 12 print("ending get detail url") 13 14 if __name__ == '__main__': 15 thread1 = threading.Thread(target=get_detail_html, args=("",)) 16 thread2 = threading.Thread(target=get_detail_url, args=("",)) 17 start_time = time.time() 18 thread1.setDaemon(True) 19 thread2.setDaemon(True) 20 thread1.start() 21 thread2.start() 22 print(123456) 23 thread1.join() 24 thread2.join()
25 print("using time: {}".format(time.time()-start_time)
做个注释吧,可能有点乱:
1、在子线程sleep时候,主线程继续往下执行,主线程执行完,程序此刻并没有退出。子线程的程序还是会执行完。
2、为了在主线程执行完之后kill掉子线程。需要给子线程.setdeDaemon(True)。将子线程设置为守护线程。意思为:主线程结束后设置为守护线程的子线程也会义无反顾的殉情而亡当有多个子线程,其中部分设置守护线程。主线程会等到没有设置为守护线程的子线程运行完之后推出而设置为守护线程的子线程,如果在未设置为守护线程的子线程结束前运行完毕,那万事大吉,如果没有运行完成还是跟着主线程殉情
3、如果想让主线程等待子线程执行完成再往下面执行的话,用thread1.join()。将主线程阻塞住,也就是说.join()之后的主线程代码不会执行,直到子线程执行完毕再执行这个时候的using time 就是4秒钟。如果只有部分子线程阻塞,那么阻塞时间就是设置了.join()的子线程运行的时间。然后主线程结束,但是还是会等到所有子线程运行结束后主线程才会退出
上边是使用函数的方法实现多线程,那用类呢?可定也能实现咯:
import time
import threading
# 用类实现多线程 class GetDetailHtml(threading.Thread): """ 继承threading.Thread类 """ def __init__(self, name): # 给调用父类init方法线程命名 super().__init__(name=name) def run(self): """ 重写run方法 :return: """ print("starting get detail html") time.sleep(4) print("ending get detail html") class GetDetailUrl(threading.Thread): def __init__(self, name): # 给调用父类init方法线程命名 super().__init__(name=name) def run(self): print("starting get detail url") time.sleep(2) print("ending get detail url") if __name__ == '__main__': thread1 = GetDetailHtml('get_detail_html') thread2 = GetDetailUrl('get_detail_url') start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() print('used time: {}'.format(time.time() - start_time))
只需要重写类的run方法即可,其他和函数方法使用一样。
如果多个线程需要用到相同的数据咋办呢?这就涉及到线程间的通信问题了,接下来我们来详细的了解一下。
方法一:利用共享变量的方式,嗯,也就是全局变量global。
import time import threading # 1、共享全局变量 使用global。 detail_url_list = [] def get_detail_html(): global detail_url_list # 使用for循环,这样子做得话并发不高 # for url in detail_url_list: # print("starting get detail html") # time.sleep(4) # print("ending get detail html") # 使用pop操作,并开启多个get_detail_html的子线程进行处理。但是线程是不安全的 url = detail_url_list.pop() print("starting get detail html") time.sleep(4) print("ending get detail html") def get_detail_url(): global detail_url_list print("starting get detail url") time.sleep(2) for i in range(20): detail_url_list.append(i) print("ending get detail url") if __name__ == '__main__': thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start()thread1.join() thread2.join() print("using time: {}".format(time.time() - start_time))
这个很容易理解,用起来也很方便。可以帮我们快速的解决比较简单的问题。
方法二:利用队列queue。。比价高级点的用法。先看实现方式。
import time import threading from queue import Queue, PriorityQueue #可以设置优先级的queue,调整执行顺序 def get_detail_html(queue): while True: url = queue.get() print("starting get detail html") time.sleep(4) print("ending get detail html") def get_detail_url(queue): while True: print("starting get detail url") time.sleep(2) for i in range(20): queue.put(i) print("ending get detail url") if __name__ == '__main__': detail_url_queue = Queue(maxsize=10) thread1 = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) thread2 = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) start_time = time.time() thread1.start() thread2.start()thread1.join() thread2.join() print("using time: {}".format(time.time() - start_time))
简单的说就是:put往queue里放东西,get从queue里拿东西。这样就能达到线程间的通信以及线程安全。
既然线程中的数据大家都能用,会不会出现一个线程在修改数据,结果在还没有修改完之后,GIL锁已经被切换。切换后的线程也修改这个数据,那就会出现数据的错乱,脏数据。针对这种情况,python中的lock就完美的解决了这个情况。
from threading import Lock, RLock #可重入锁 #RLock使得一个线程中,可以连续调用多次acquire,但是的有相对应数量的release import threading total = 0 lock = Lock() """ 1、用锁会影响性能 2、锁会引起死锁 1、acquire之后没有release 2、线程相互等待 两个数据被两个线程分别获取,相互等待对方的资源释放 """ def add(lock): global total for i in range(1000000):
# 上锁 lock.acquire() total += 1
# 释放锁 lock.release() def desc(lock): global total for i in range(1000000):
# 上锁 lock.acquire() total -= 1
# 释放锁 lock.release()
thread1 = threading.Thread(target=add, args=(lock,)) thread2 = threading.Thread(target=desc, args=(lock,)) thread1.start() thread2.start() thread1.join() thread2.join() print(total)
针对一般情况下我们lock就可以搞定问题,那有没有更牛逼的工具让我们应对更加复杂的情况呢?答案当然是肯定的:
conditon:条件变量,用于复杂的线程间同步的锁
import threading # condition(条件变量)是用于复杂的线程间同步的锁 from threading import Condition cond = Condition() class XiaoAi(threading.Thread): def __init__(self, cond): super(XiaoAi, self).__init__(name="小爱") self.cond = cond def run(self): # with是一个魔法方法,相当于先上锁操作完之后再解锁,和with open相似 with self.cond: # 等待状态,等着接受通知,接到通知后往下放执行 self.cond.wait() print("{} : 在".format(self.name)) # 发送通知 self.cond.notify() # 等待状态,等着接受通知,接到通知后往下放执行 self.cond.wait() print("{} : 好啊".format(self.name)) class TianMao(threading.Thread): def __init__(self, cond): super(TianMao, self).__init__(name="天猫") self.cond = cond def run(self): with self.cond: print("{} : 小爱在么?".format(self.name)) # 发送通知 self.cond.notify() # 等待状态,等着接受通知,接到通知后往下放执行 self.cond.wait() print("{} : 我们来对古诗吧!".format(self.name)) # 发送通知 self.cond.notify() if __name__ == '__main__': xiaoai = XiaoAi(cond) tianmao = TianMao(cond) xiaoai.start() tianmao.start() xiaoai.join() tianmao.join()
在condition中我们可以运行任意多的线程。那么如果我们想控制线程的数量该怎么办呢?针对这个场景python也是给我们一个工具的:
semaphore 是控制进入数量的锁, 基于condition实现
# semaphore 是控制进入数量的锁, 基于condition实现 # 文件的读写,写一般只有一个线程写,读可以允许多个 # 爬虫,控制爬虫的数量 import threading import time class HtmlSpider(threading.Thread): def __init__(self, url, sem): super().__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print('get html text success') # 释放锁, 一共释放3次 self.sem.release() class UrlProducer(threading.Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): for i in range(20): # 调用一次,次数减一,为0时,线程锁住 self.sem.acquire() html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem) html_thread.start() if __name__ == '__main__': sem = threading.Semaphore(3) url_producer = UrlProducer(sem) url_producer.start()
是不是常听说线程池?那这个池子到底是个啥呢?当然是放线程的池子呗!!在python中是用
ThreadPoolExecutor来实现线程池的!!
第一种用法:
from concurrent.futures import ( wait, # 使主线程阻塞 ThreadPoolExecutor, # 线程池 as_completed, #是一个生成器 Future # 未来对象, task的返回容器 ) """ 线程池 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值 当一个线程完成的时候主线程能够立刻知道 futures可以让多线程和多进程接口一致 """ import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过submit函数提交的函数到线程池中,submit是立刻返回的 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done 用于判定某个任务是否完成 print(task1.done()) # cancle取消该线程,取消成功返回True,失败返回False 注意:线程在执行过程中不能被取消 # 此时开启两个线程,所以task1和task2添加之后立刻运行,因此取消不成功 print(task2.cancel()) time.sleep(3) print(task1.done()) # 可以获取task1的执行结果 print(task1.result()) """ # 通过executor 的map(yield task的返回值)获取已经完成的task值 for data in executor.map(get_html, urls): print('future get {} page'.format(data)) """
第二种用法:
from concurrent.futures import ( wait, # 使主线程阻塞 ThreadPoolExecutor, # 线程池 as_completed, #是一个生成器 Future # 未来对象, task的返回容器 ) import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 获取已经成功的task urls = [3, 3, 2, 4, 8] # 添加到excutor之后子线程立刻开始执行 all_tasks = [executor.submit(get_html, (url)) for url in urls] # 使主线程阻塞,所有子线程执行完成后主线程再往下边执行 wait(all_tasks) print(12314123) # as_completed 的代码块和子线程的执行并不是同步的 for future in as_completed(all_tasks): data = future.result() print('get {} page 111111'.format(data))
第三种用法:
from concurrent.futures import ( wait, # 使主线程阻塞 ThreadPoolExecutor, # 线程池 as_completed, #是一个生成器 Future # 未来对象, task的返回容器 ) import time def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过executor 的map(yield task的返回值)获取已经完成的task值 for data in executor.map(get_html, urls): print('future get {} page'.format(data))
好了多线程就讲这么多吧,有点粗糙,更多的细节以后再补充吧! 下一篇记录一下多进程!!