定时器常见实现方式(时间堆、时间轮)
需求
接口
首先需求场景主要有这几种(简化):
- 在 n 秒以后执行一个任务 X
- 每隔 n 秒执行一次任务 X
- 取消一个已经添加的定时器
根据上面的简化需求,得到需要的主要接口:
- 添加一个定时器
- 定时器过期执行(可能需要重复执行)
- 取消一个定时器
数据结构
最后,就是考虑用来存放定时器的数据结构(也是定时器设计的核心)
上面的接口可以简单的看成这几个操作:
- 添加
- 删除
- 查询(获取最近需要执行的一个)
对于这几个操作,常用的复杂度比较均衡的数据结构:
- 红黑树
- 优先队列(最小堆)
另外跳表(本质类似红黑树)也是可用的;还有一种比较巧妙的hash结构时间轮
,这是一个类似钟表指针的结构,将需要管理的定时器根据时间hash到数组中,以提高查询和添加的效率
这几种实现,现在流行的开源框架都各有采用,比如 Linux 内核用的就是时间轮的实现
实现
下面简单实现一下采用最小堆和时间轮作为数据结构的定时器,方便起见,编码演示使用python
时间堆
所谓的“时间堆”并不是什么稀奇东西,只是用小堆来管理以时间作为关键字的定时器集合
实现思路:
- 使用一个结构来保存定时器对象的数据,包括过期时间、回调函数等
- 所有的定时器对象保存在一个优先队列中(也就是最小堆),键值为时间
- 添加定时器即为向队列中插入新项
- 删除定时器可以使用惰性删除,首先给对应的定时器对象置位表示其已经取消,当达到一定条件时(比如取消的定时器数量达到总数量的1/2),进行一个清理操作
- 按照固定的时间间隔来tick,每次tick都需要看看有没有需要执行的定时器对象
- 因为对象由优先队列管理,因此,当队首元素不需要执行时,后面的元素则都不需要执行
- 一个对象需要执行时,可能需要将其删除;也可能是需要重复执行的,则需要设定好时间后再次加入
实现代码如下(仅为演示):
class Timer():
def __init__(self, owner, timestamp, callback, callargs, loop):
self.owner: Timers = owner
self.timestamp = timestamp
self.callback = callback
self.callargs = callargs
self.cancelled = False
self.loop = loop
def __lt__(self, other):
return self.timestamp < other.timestamp
@property
def Cancelled(self): return self.cancelled
@property
def Time(self): return self.timestamp
def cancel(self):
self.cancelled = True
self.owner.onCancel()
def trigger(self, now):
if not self.cancelled:
self.callback(now, *self.callargs)
if self.loop > 0:
self.timestamp += self.loop
else:
self.cancel()
class Timers():
def __init__(self):
self.timerQue = []
self.cancelCount = 0
def tick(self):
now = int(time.time())
self.process(now)
def process(self, now):
while len(self.timerQue) > 0:
timer: Timer = self.timerQue[0]
# 到期或取消的定时器需要处理
if timer.Time <= now or timer.Cancelled:
if not timer.Cancelled:
timer.trigger(now)
if not timer.Cancelled:
heapq.heapreplace(self.timerQue, timer)
else:
heapq.heappop(self.timerQue)
self.cancelCount -= 1
else:
break
def onCancel(self):
self.cancelCount += 1
if self.cancelCount > len(self.timerQue)/2:
self.purge()
# 这里处理的时候参考c++ std partition 操作,先将没用的都放到数组最后
# 然后,将前面有用的重新进行一次建队操作
def purge(self):
# patition
l = 0
r = len(self.timerQue) - 1
while l < r:
while l < r and not self.timerQue[l].Cancelled: l += 1
while l < r and self.timerQue[r].Cancelled: r -= 1
self.timerQue[l], self.timerQue[r] = self.timerQue[r], self.timerQue[l]
l += 1
r -= 1
# remove
mid = math.ceil((l + r) / 2)
if not self.timerQue[mid].Cancelled:
mid += 1
del self.timerQue[mid:]
# heapmake
heapq.heapify(self.timerQue)
def add(self, duration, callback, callargs, looptime):
now = int(time.time())
timer: Timer = Timer(self, now + duration, callback, callargs, looptime)
heapq.heappush(self.timerQue, timer)
return timer
# test
def func(callback):
waitEvent = threading.Event()
while not waitEvent.wait(1):
callback()
def func_a(now):
print('in func a', now)
def func_b(now):
print('in func b', now)
def func_c(a, b):
print('in func c: before sleep')
time.sleep(30)
print('in func c: after sleep')
a.cancel()
b.cancel()
timers = Timers()
threading.Thread(target=func, args=(lambda:timers.tick(), )).start()
timer_a = timers.add(0, func_a, (), 10)
timer_b = timers.add(5, func_b, (), 10)
threading.Thread(target=func_c, args=(timer_a, timer_b)).start()
时间轮
时间轮是一个比较巧妙地hash结构,性质有点类似钟表指针
首先考虑一个简单的大小为60数组t
,每一位表示对应时间的集合:第0秒需要处理的事务都在t[0]
,第5秒需要处理的事务都在t[5]
,依次类推
这样一来,当处在 x 秒时只需要去对应的数组元素即可;但是同样也带来了一个问题:最多只能处理一分钟内的事务(数组大小只有60)
为了解决这个问题,有一个简单的方案:在事务对象加一个字段 nRound 用来表示循环使用这个数组,当循环次数为 nRound 时才执行本事务
这样一来就解决了数组容量问题,但是随之而来的就是效率变得低下了;本来使用hash的目的就是为了避免不必要的遍历,可以直截了当地获取当前需要处理地任务,而现在又要遍历 t[n]
来判断 nRound 是否为当前轮次了
基于此,就有了这种更优美地解决方案时间轮
:再加一个大小为60数组d
,每一位还是表示对应时间地集合:第0分钟要处理地事务都在d[0]
中,第5分钟需要处理地事务都在d[5]
中,依次类推
当然,因为0-59秒的事务都放在了数组t
里了,所以d[0]
为空即可;当时间来到第1分钟时,再将d[1]
中的事务放置到t
中对应位置即可
这样一来,就已经可以处理1个小时内的任务了,可想而知,再加上更多的数组就可以处理更长的时间跨度了
(很明显,这里的数组大小和数组数量只是一个进制关系而已)
工作原理如下:
- 一级轮(就是一个数组)的每个格对应一个时间间隔
- 一级轮指针每次tick加1;当指针指向一级轮的某一格时,即表示这一格里的定时器都到期了
- 二级轮(包括更多级同理)的每个格对应一级轮的一圈
- 二级轮指针每当一级轮指针转一圈加1;当指针指向某一个时,即表示接下来需要处理这一格的定时器了(分散到一级轮里)
后面给出了一个简单的实现,简单起见就没有再处理重复执行的情况
这里再谈谈一些可以优化的地方:
- 指针和数组大小:在Linux内核定时器的实现里,采用了一个很巧妙的设计,一共5个数组,大小分别为 64(a) 64(b) 64(c) 64(d) 255(e) (64是2的6次方,255是2的8次方,也就总共占了32位);这样一来,只需利用整数的进位就可以自然的处理数组之间的进制关系了,这体现在,只需要一个32bit的指针和对应的位操作即可表示5个数组中的情况了,不再需要每个数组分配一个指针(例如末8位表示数组e的指针,之前的6位表示数组d的指针)
- 数组元素:更高级的数组对应的时间刻度就越长,就看可能有更多甚至非常大量的事务挤在一个格里,这时候特殊需求的操作(比如查询),可能就不能很好地支持,因此可以采用合适的数据结构来管理每个格里的事务;当然通常情况是不需要的,因为一般操作只是要把当前格里的事务hash到下一级的数组里
- 取消定时器:还是跟上面时间轮的实现一样,可以考虑采用惰性删除的策略
class Timer():
def __init__(self, delay, callback, callargs):
self.delay = delay
self.callback = callback
self.callargs = callargs
self.cancelled = False
@property
def Cancelled(self): return self.cancelled
def cancel(self):
self.cancelled = True
def trigger(self):
if not self.cancelled:
self.callback(*self.callargs)
self.cancelled = True
class Wheels():
WHEEL_NUM = 3
SOLT_NUM = 8
MAX_NUM = int(math.pow(SOLT_NUM, WHEEL_NUM))
def __init__(self):
self.pointer = []
self.wheels = []
for i in range(self.WHEEL_NUM):
self.pointer.append(0)
self.wheels.append([])
for j in range(self.SOLT_NUM):
self.wheels[i].append([])
def add(self, delay, func, args):
if delay >= self.MAX_NUM: return
past = delay
wheel = self.WHEEL_NUM - 1
while past >= self.SOLT_NUM:
past = past // self.SOLT_NUM
wheel -= 1
solt = (self.pointer[wheel] + past) % self.SOLT_NUM
delay = delay % int(math.pow(self.SOLT_NUM, self.WHEEL_NUM - wheel - 1))
self.wheels[wheel][solt].append(Timer(delay, func, args))
def tick(self):
print('tick')
for i in range(self.WHEEL_NUM - 1):
while self.wheels[i][self.pointer[i]]:
timer = self.wheels[i][self.pointer[i]].pop()
self.add(timer.delay, timer.callback, timer.callargs)
idx = self.WHEEL_NUM - 1
while self.wheels[idx][self.pointer[idx]]:
timer = self.wheels[idx][self.pointer[idx]].pop()
timer.trigger()
for i in range(self.WHEEL_NUM - 1, -1, -1):
self.pointer[i] = (self.pointer[i] + 1) % self.SOLT_NUM
if self.pointer[i] > 0:
break
def func(callback):
waitEvent = threading.Event()
while not waitEvent.wait(1):
callback()
wheels = Wheels()
threading.Thread(target=func, args=(lambda:wheels.tick(), )).start()
# test
def func_a():
print('in func a')
def func_b():
print('in func b')
timer_a = wheels.add(5, func_a, ())
timer_b = wheels.add(10, func_b, ())
总结
到这里,关于定时器的实现就讲完了;实现本身可能并不精彩,秒的是一些细节处的设计思想,最后再来回顾一下:
- 惰性删除和清理操作:参考了开源引擎kbengine中定时器的实现;惰性删除可以节省高频操作的开销,还可以减少频繁的内存操作;在清理操作的细节中,先将有用和无用的定时器分离(分别放置到数组前后),可以有效提高内存操作的效率
- 时间轮结构的设计:时间轮结构的设计非常巧妙,源于hash也兼顾了空间,并且将高频操作(最近时间段)和低频操作(较远时间段)分离了开来
- 时间轮指针设计:利用整数进位,简化时间轮指针