异步,同步,阻塞,非阻塞程序的实现
- 2019 年 11 月 21 日
- 筆記
终于用透支生命的方法把这一课学完了。感动。以后不这样了。 实现异步非阻塞是一个大命题,这里只从原理出发。我会慢慢修改这篇文章。 本文将从异步sleep的实现入手,来讲解异步非阻塞程序的原理。
什么是异步,同步,阻塞,非阻塞
在写这篇文章前,我对这四个概念是非常模糊的。
同步,异步
异步同步的差异,在于当线程调用函数的时候,线程获取消息的方式. 如果是同步,线程会等待接受函数的返回值(或者轮循函数结果,直到查出它的返回状态和返回值)。如果是异步,线程不需要做任何处理,在函数执行完毕后会推送通知或者调用回调函数。
同步: 线程 —-我主动来拿结果—-> 函数 异步: 线程 <—你把结果拿给我—- 函数
阻塞,非阻塞
阻塞非阻塞的差异,在于线程调用函数的时候,线程的状态。 当线程调用函数,线程就被挂起,在函数结束前什么都干不了。这就是阻塞。 反之,当线程调用函数,线程还能干其它事。这就是非阻塞。此时,函数一般会立即返回状态,而不是等待求值。以免阻塞住线程。
他们没有关系
异步同步和阻塞非阻塞没有什么本质关联。一个讲的是消息方式,一个讲的是线程状态。 线程在同步调用下,也能非阻塞(同步轮循非阻塞函数的状态),在异步下,也能阻塞(调用一个阻塞函数,然后在函数中调用回调,虽然没有什么意义)。
下面,我会慢慢实现一个异步非阻塞的sleep。最后利用Python的特性,将callback调用方式改为yield的伪同步调用。
场景一:同步阻塞
import time def wait(name): print(name, " start") time.sleep(1) print(name," is over") wait("yzh") wait("zhh")
上面的程序执行完毕后,想都不用想,输出如下:
打印 yzh start # 等待1s 打印 yzh is over 打印 zhh start # 等待1s 打印 zhh is over
阻塞的后果
上面的代码,如果调用次数很多,则最后一个人要等待之前所有的人阻塞结束,才能被响应。在web项目中,这是很可怕的。所以我们需要引入非阻塞。非阻塞就是为了让一个响应的操作,不影响另一个响应。否则,当A用户在访问某个耗时巨大的网页时,B用户只能对着白板发呆。
在tornado中,有一个gen.sleep函数。它能让响应神奇的变成:
打印 yzh start 打印 zhh start # 等待1s左右 打印 yzh is over 打印 zhh is over
这个异步sleep函数,似乎在单进程下,让每个函数互相不影响,而又在内部停留了1S。 那么,我们该如何实现自己的非阻塞sleep呢。 (tornado的sleep,原理十分复杂。以后再细说。)
场景二:轮循非阻塞
实现非阻塞场景,关键在于函数不能阻塞住当前线程。也就是说,要启用新的线程让系统帮忙调度,或者以自己的方式确保所有任务都能被调度(比如yield切换来切换去)。
使用线程
import time from multiprocessing.dummy import Pool as ThreadPool class Status(object): pass p = ThreadPool(4) def my_sleep(): status = Status() status.status = 0 def _inner(): time.sleep(2) status.status = 1 p.apply_async(_inner) return status def wait(name): print(name, " start") yield my_sleep() print(name, "over") gen1 = wait("yzh") # wait是一个生成器,保存为gen1 gen2 = wait("zhh") timer1 = next(gen1) timer2 = next(gen2) tasks = [] tasks.append([gen1,timer1]) tasks.append([gen2,timer2]) while tasks: for task in tasks: if task[1].status == 1: try: next(task[0]) # 状态正确则继续执行父生成器 except StopIteration: tasks.remove(task)
使用线程没什么好说的,线程会更新状态,当状态更新后,在下次轮循会触发生成器继续执行后面的动作。
不使用线程
import time def my_sleep(now): """ 这个函数本来就是一个生成器。所以可以在单线程下切换运行状态。 """ while time.time() < now + 2: yield def wait(name): print(name, " start") now = time.time() yield my_sleep(now) print(name, " is over") gen1 = wait("yzh") # wait是一个生成器,保存为gen1 gen2 = wait("zhh") timer1 = next(gen1) # 当执行gen的时候,它会Yield一个timer生成器。 # timer是生成器,这是我们可以在单线程下切换timer上下文的关键。 timer2 = next(gen2) tasks = [] tasks.append([gen1,timer1]) tasks.append([gen2,timer2]) while tasks: for task in tasks: try: next(task[1]) # 不断的轮循每个生成器关连的timer。直到timer执行完毕,引发异常。 except StopIteration: try: next(task[0]) # 当timer异常,我们可以知道它的父生成器要继续执行了。 # 对应的yield my_sleep(now) 执行完毕。可以继续下一步,所以我们对父生成器发送继续执行指令 except StopIteration: tasks.remove(task) # 当父生成器也执行完毕,整个任务终止。把当前任务移除任务队列。
上面的代码中,在一个while循环中轮循timer的状态。由于timer存在于wait中。所以需要把timer“提取”出来。 又因为,没有使用多线程,所以必须自己实现一些简单的调度处理,也就是说,要能自由的切换各个timer的上下文。在单线程下可以使用yield。 1. 把timer 从生存器gen yield返回出来 2. 轮循timer的状态(实质是切换进出timer,看它有没有引发StopIteration异常) 3. 如果发生了异常说明gen应该执行下一步操作了。next(gen) 4. 如果gen也发生了StopIteration异常,说明这个任务完毕。
场景三:异步非阻塞
实现异步的经典方式是使用回调,实现非阻塞的经典方式是使用线程。 所以,代码就呼之欲出了。
import time from multiprocessing.dummy import Pool as ThreadPool def my_sleep(callback, *callback_args): def _inner(): time.sleep(2) callback(*callback_args) p.apply_async(_inner) def wait(name): print(name, " start") my_sleep(wait_callback,name) def wait_callback(name): print(name, " is over") p = ThreadPool(4) wait("yzh") wait("zhh") p.close() p.join()
在wait中,唤起my_sleep函数。由于my_sleep在新线程中执行,所以它不会阻塞住主线程。 在my_sleep结束时,调用回调函数。使得任务继续进行。 也就是说,在每个要处理阻塞的地方,都人为的把函数切成三个部分: 1. 执行函数前半部 2. 执行新线程,把后半部作为回调函数传入。函数退出。 3. 等待后半部在线程完毕后被执行。
场景四:终极,伪同步实现异步非阻塞
这个以后再写。先吃饭。