Advanced Python Tutorial: Parallel Programming

  • November 20, 2019
  • Notes

This article is taken from the original booklet "Advanced Python Tutorial".

Why write parallel code

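Python's strength is that it lets you build algorithm prototypes quickly, but it does not execute them very efficiently. Suppose we implement an image classification algorithm: the images first have to be preprocessed, and with a massive amount of data a single thread clearly becomes the performance bottleneck.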

Implementing multithreading with a function

For a simple multithreaded task, the work can be written as a function. This mainly relies on the Thread class from the threading module.

```python
import threading
import time
import random


def function(i):
    count = 0
    while count < 3:
        # Sleep for a random 0-3 seconds to simulate different amounts of work
        time.sleep(random.randint(0, 3))
        count += 1
        print("thread %d, time %s" % (i, time.ctime(time.time())))


for i in range(2):
    t = threading.Thread(target=function, args=(i,))
    t.start()
```

This code consists mainly of a task function, function, and a for loop.

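In the loop body, we use the Thread class of the threading module as a template: we create an instance with function as the target and the loop variable i as its argument, then call the instance's start() method. The function prints the current time three times, sleeping for a random interval before each print. This simulates different amounts of computation and shows that the threads run concurrently.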

The output of this code is as follows:

```
thread 0, time Mon Jun 18 18:37:13 2018
thread 1, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:16 2018
thread 1, time Mon Jun 18 18:37:17 2018
thread 1, time Mon Jun 18 18:37:19 2018
```

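Looking at the output, we can see that thread 0 and thread 1 are scheduled in no fixed order. Both threads belong to the same process: microscopically, only one thread is running at any given instant; macroscopically, the two threads appear to execute at the same time.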
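The example above never waits for its threads to finish. A minimal sketch, assuming we want the main thread to wait for all workers and then collect their results: each worker writes into its own slot of a shared list, and join() blocks until the corresponding thread has finished. The names worker, run_workers and results are hypothetical, chosen for illustration.

```python
import threading


def worker(i, results):
    # Simulated work: each thread computes a value and stores it in its own
    # slot, so no two threads ever write to the same list element
    results[i] = sum(range(i * 1000))


def run_workers(n):
    results = [None] * n
    threads = [threading.Thread(target=worker, args=(i, results)) for i in range(n)]
    for t in threads:
        t.start()
    # join() blocks until the thread has finished, so after this loop
    # every slot in results has been filled
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_workers(3))  # → [0, 499500, 1999000]
```

Without the join() calls, the main thread could read results while some slots are still None.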

Implementing multithreading with a class

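Implementing multithreading with a function essentially means creating several Thread instances. We can also define a class that inherits from threading.Thread. The code is as follows: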

```python
import threading
import time
import random


class myThread(threading.Thread):  # inherit from threading.Thread
    def __init__(self, threadID):
        super().__init__()  # initialize the Thread base class first
        self.threadID = threadID

    def run(self):
        # Put the code to be executed in run(); calling start() runs it in the new thread
        count = 0
        while count < 3:
            time.sleep(random.randint(0, 3))
            count += 1
            print("thread %d, time %s" % (self.threadID, time.ctime(time.time())))


# Create the new threads
thread1 = myThread(0)
thread2 = myThread(1)

# Start the threads
thread1.start()
thread2.start()
```

This code behaves the same as the function-based version. Its output is as follows:

```
thread 0, time Mon Jun 18 19:11:59 2018
thread 1, time Mon Jun 18 19:11:59 2018
thread 0, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:01 2018
thread 0, time Mon Jun 18 19:12:03 2018
```
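Because a Thread subclass is an ordinary class, it can also keep state on the instance. A minimal sketch, assuming we want each thread to hand back a value: the hypothetical SumThread class stores its result in an attribute inside run(), and the caller reads it after join(). Note that start() is what executes run() in a new thread; calling run() directly would execute it in the current thread.

```python
import threading


class SumThread(threading.Thread):
    """Hypothetical example: sums a range of integers in a separate thread."""

    def __init__(self, start_value, stop_value):
        super().__init__()  # always initialize the Thread base class first
        self.start_value = start_value
        self.stop_value = stop_value
        self.result = None

    def run(self):
        # start() invokes run() in the newly created thread
        self.result = sum(range(self.start_value, self.stop_value))


if __name__ == "__main__":
    threads = [SumThread(0, 50), SumThread(50, 100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # result is only guaranteed to be set after join()
    print(sum(t.result for t in threads))  # → 4950
```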

Using a lock for mutual exclusion between threads

```python
import threading

count = 0
lock = threading.Lock()


def use_lock(func):
    # Decorator: run func while holding the global lock
    def wrapper(*arg, **kw):
        lock.acquire()
        try:
            return func(*arg, **kw)
        finally:
            lock.release()  # release the lock even if func raises
    return wrapper


def inc(func):
    # Decorator: increment the shared counter, then call func
    def wrapper(*arg, **kw):
        global count
        count += 1
        return func(*arg, **kw)
    return wrapper


def dec(func):
    # Decorator: decrement the shared counter, then call func
    def wrapper(*arg, **kw):
        global count
        count -= 1
        return func(*arg, **kw)
    return wrapper


@use_lock
@inc
def increment_lock():
    pass


@use_lock
@dec
def decrement_lock():
    pass


@inc
def increment():
    pass


@dec
def decrement():
    pass


def thread1():
    for _ in range(999999):
        increment_lock()


def thread2():
    for _ in range(999999):
        decrement_lock()


def thread3():
    for _ in range(999999):
        increment()


def thread4():
    for _ in range(999999):
        decrement()


thread_ins_1 = threading.Thread(target=thread1)
thread_ins_2 = threading.Thread(target=thread2)
thread_ins_3 = threading.Thread(target=thread3)
thread_ins_4 = threading.Thread(target=thread4)

# With the lock: one thread increments while the other decrements
count = 0
thread_ins_1.start()
thread_ins_2.start()
thread_ins_1.join()
thread_ins_2.join()
print("with lock, count is", count)

# Without the lock: the same workload, but the updates can interleave
count = 0
thread_ins_3.start()
thread_ins_4.start()
thread_ins_3.join()
thread_ins_4.join()
print("without lock, count is", count)
```

The output of this code is:

```
with lock, count is 0
without lock, count is 32376
```

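As you can see, without the lock the shared counter ends up corrupted: count += 1 is a read-modify-write sequence, so an update made by one thread can be overwritten by the other. The wrong value differs from run to run (and on some CPython versions the race may show up only rarely), but only the locked version is guaranteed to end at 0.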
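Instead of calling acquire() and release() by hand, a threading.Lock can also be used as a context manager: the with statement acquires the lock on entry and releases it on exit, even if the protected code raises an exception. A minimal sketch, assuming a simple shared counter; the Counter class and the apply_many helper are hypothetical names for illustration.

```python
import threading


class Counter:
    """A counter whose updates are protected by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def add(self, delta):
        # The read-modify-write of self.value happens while holding the lock,
        # so updates from different threads cannot interleave
        with self._lock:
            self.value += delta


def apply_many(counter, delta, times):
    for _ in range(times):
        counter.add(delta)


if __name__ == "__main__":
    c = Counter()
    workers = [
        threading.Thread(target=apply_many, args=(c, 1, 100000)),
        threading.Thread(target=apply_many, args=(c, -1, 100000)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(c.value)  # → 0
```

Keeping the lock inside the class means callers cannot forget to acquire it.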

Using a semaphore for thread synchronization

```python
import threading
import time
import random

# Initial value is 0,
# which means it has to be released before anyone can acquire it
semaphore = threading.Semaphore(0)
item = 0


def consumer():
    time.sleep(random.randint(0, 3))
    # If the semaphore has not been released yet, wait here
    semaphore.acquire()
    print("Consumer notify: consumed item number %s" % item)


def producer():
    global item
    time.sleep(random.randint(0, 3))
    item = random.randint(0, 1000)
    print("Producer notify: produced item number %s" % item)
    # Release the semaphore, which increments its counter by 1
    semaphore.release()


# Run for 3 rounds
for i in range(3):
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
```

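Here we initialize the semaphore to 0, which means it must be released before it can be acquired. The consumer therefore blocks in acquire() until the producer has set item and called release(), so the producer always finishes first and the two threads are synchronized.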

The output is as follows:

```
Producer notify: produced item number 977
Consumer notify: consumed item number 977
Producer notify: produced item number 812
Consumer notify: consumed item number 812
Producer notify: produced item number 500
Consumer notify: consumed item number 500
```
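In the example above a new producer/consumer pair is created for every item. A minimal sketch, assuming instead a single long-lived producer and consumer that exchange several items through a buffer: the semaphore counts how many items are waiting in a collections.deque (whose append() and popleft() are thread-safe), so the consumer blocks in acquire() whenever the buffer is empty. The function name produce_and_consume is hypothetical.

```python
import threading
from collections import deque


def produce_and_consume(items):
    buffer = deque()
    available = threading.Semaphore(0)  # number of items waiting in buffer
    consumed = []

    def producer():
        for item in items:
            buffer.append(item)
            available.release()  # signal: one more item is ready

    def consumer():
        for _ in range(len(items)):
            available.acquire()  # blocks until the producer has released
            consumed.append(buffer.popleft())

    # Start the consumer first to show that it waits for the producer
    threads = [threading.Thread(target=consumer), threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


if __name__ == "__main__":
    print(produce_and_consume([1, 2, 3]))  # → [1, 2, 3]
```

Because there is one producer and one consumer and the deque is FIFO, items are consumed in the order they were produced.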

GIL limitations

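GIL stands for Global Interpreter Lock. It is a mutex in the CPython interpreter that prevents multiple threads from executing Python bytecode at the same time. As a result, multithreading is inefficient for CPU-bound Python programs.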

The following code uses multiple threads to test the effect of the GIL:

```python
from threading import Thread


class threads_object(Thread):
    def run(self):
        function_to_run()


class nothreads_object(object):
    def run(self):
        function_to_run()


def non_threaded(num_iter):
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
    for i in funcs:
        i.run()


def threaded(num_threads):
    funcs = []
    for i in range(int(num_threads)):
        funcs.append(threads_object())
    for i in funcs:
        i.start()
    for i in funcs:
        i.join()


def function_to_run():
    # CPU-bound work: a pure-Python counting loop
    count = 0
    while count < 1e4:
        count += 1


def show_results(func_name, results):
    print("%-23s %4.6f seconds" % (func_name, results))


if __name__ == '__main__':
    from timeit import Timer

    repeat = 1000
    number = 1
    num_threads = [1, 2, 4, 8]
    print("Starting tests")
    for i in num_threads:
        t = Timer("non_threaded(%s)" % i, "from __main__ import non_threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("non_threaded (%s iters)" % i, best_result)
        t = Timer("threaded(%s)" % i, "from __main__ import threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("threaded (%s threads)" % i, best_result)
```

The results are as follows:

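```
Starting tests
non_threaded (1 iters)  0.000940 seconds
threaded (1 threads)    0.001085 seconds
non_threaded (2 iters)  0.001863 seconds
threaded (2 threads)    0.002322 seconds
non_threaded (4 iters)  0.003775 seconds
threaded (4 threads)    0.004687 seconds
non_threaded (8 iters)  0.007560 seconds
threaded (8 threads)    0.009515 seconds
```

The results show that the multithreaded version is somewhat slower than sequential execution. Because of the GIL, the threads cannot run the CPU-bound loop in parallel, so threading brings no speedup, and the cost of creating and switching threads is added on top.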
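The GIL mainly hurts CPU-bound code. CPython releases the GIL while a thread is blocked, for example in time.sleep() or while waiting on I/O, so threads can still overlap their waiting time. A minimal sketch that uses time.sleep() as a stand-in for blocking I/O; the function names and the 0.2-second delay are assumptions for illustration.

```python
import threading
import time


def fake_io(delay):
    # time.sleep() releases the GIL, so other threads can run meanwhile
    time.sleep(delay)


def sequential_io(n, delay):
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start


def threaded_io(n, delay):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print("sequential: %.2f s" % sequential_io(4, 0.2))  # roughly 0.8 s
    print("threaded:   %.2f s" % threaded_io(4, 0.2))    # roughly 0.2 s
```

So threads remain useful for I/O-bound work, while CPU-bound work needs multiple processes.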

Multiprocessing

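Programming with multiple processes looks much like programming with multiple threads: threading.Thread is simply replaced by multiprocessing.Process.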

```python
import multiprocessing
import time
import random


def function(i):
    count = 0
    while count < 3:
        time.sleep(random.randint(0, 3))
        count += 1
        print("process %d, time %s" % (i, time.ctime(time.time())))


if __name__ == '__main__':
    for i in range(2):
        t = multiprocessing.Process(target=function, args=(i,))
        t.start()
```

The output is as follows:

```
process 1, time Tue Jun 19 14:37:49 2018
process 1, time Tue Jun 19 14:37:49 2018
process 0, time Tue Jun 19 14:37:51 2018
process 1, time Tue Jun 19 14:37:52 2018
process 0, time Tue Jun 19 14:37:53 2018
process 0, time Tue Jun 19 14:37:53 2018
```
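Each process has its own Python interpreter and therefore its own GIL, so CPU-bound work such as the counting loop in the GIL test can actually run in parallel across processes. A minimal sketch using multiprocessing.Pool.map to spread a CPU-bound function over worker processes; the function count_to, the pool size of 4 and the inputs are assumptions for illustration. The if __name__ == '__main__': guard is required because, with the spawn start method, the main module is re-imported in each child process.

```python
import multiprocessing


def count_to(n):
    # CPU-bound work: a pure-Python loop, like function_to_run in the GIL test
    count = 0
    while count < n:
        count += 1
    return count


if __name__ == "__main__":
    # Each worker is a separate process with its own interpreter and GIL
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(count_to, [10_000, 20_000, 30_000, 40_000])
    print(results)  # → [10000, 20000, 30000, 40000]
```

Pool.map returns results in the same order as its inputs, regardless of which worker finishes first.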