定時器常見實現方式(時間堆、時間輪)
需求
介面
首先需求場景主要有這幾種(簡化):
- 在 n 秒以後執行一個任務 X
- 每隔 n 秒執行一次任務 X
- 取消一個已經添加的定時器
根據上面的簡化需求,得到需要的主要介面:
- 添加一個定時器
- 定時器過期執行(可能需要重複執行)
- 取消一個定時器
數據結構
最後,就是考慮用來存放定時器的數據結構(也是定時器設計的核心)
上面的介面可以簡單的看成這幾個操作:
- 添加
- 刪除
- 查詢(獲取最近需要執行的一個)
對於這幾個操作,常用的複雜度比較均衡的數據結構:
- 紅黑樹
- 優先隊列(最小堆)
另外跳錶(本質類似紅黑樹)也是可用的;還有一種比較巧妙的hash結構時間輪
,這是一個類似鐘錶指針的結構,將需要管理的定時器根據時間hash到數組中,以提高查詢和添加的效率
這幾種實現,現在流行的開源框架都各有採用,比如 Linux 內核用的就是時間輪的實現
實現
下面簡單實現一下採用最小堆和時間輪作為數據結構的定時器,方便起見,編碼演示使用python
時間堆
所謂的「時間堆」並不是什麼稀奇東西,只是用小堆來管理以時間作為關鍵字的定時器集合
實現思路:
- 使用一個結構來保存定時器對象的數據,包括過期時間、回調函數等
- 所有的定時器對象保存在一個優先隊列中(也就是最小堆),鍵值為時間
- 添加定時器即為向隊列中插入新項
- 刪除定時器可以使用惰性刪除,首先給對應的定時器對象置位表示其已經取消,當達到一定條件時(比如取消的定時器數量達到總數量的1/2),進行一個清理操作
- 按照固定的時間間隔來tick,每次tick都需要看看有沒有需要執行的定時器對象
- 因為對象由優先隊列管理,因此,當隊首元素不需要執行時,後面的元素則都不需要執行
- 一個對象需要執行時,可能需要將其刪除;也可能是需要重複執行的,則需要設定好時間後再次加入
實現程式碼如下(僅為演示):
class Timer():
def __init__(self, owner, timestamp, callback, callargs, loop):
self.owner: Timers = owner
self.timestamp = timestamp
self.callback = callback
self.callargs = callargs
self.cancelled = False
self.loop = loop
def __lt__(self, other):
return self.timestamp < other.timestamp
@property
def Cancelled(self): return self.cancelled
@property
def Time(self): return self.timestamp
def cancel(self):
self.cancelled = True
self.owner.onCancel()
def trigger(self, now):
if not self.cancelled:
self.callback(now, *self.callargs)
if self.loop > 0:
self.timestamp += self.loop
else:
self.cancel()
class Timers():
def __init__(self):
self.timerQue = []
self.cancelCount = 0
def tick(self):
now = int(time.time())
self.process(now)
def process(self, now):
while len(self.timerQue) > 0:
timer: Timer = self.timerQue[0]
# 到期或取消的定時器需要處理
if timer.Time <= now or timer.Cancelled:
if not timer.Cancelled:
timer.trigger(now)
if not timer.Cancelled:
heapq.heapreplace(self.timerQue, timer)
else:
heapq.heappop(self.timerQue)
self.cancelCount -= 1
else:
break
def onCancel(self):
self.cancelCount += 1
if self.cancelCount > len(self.timerQue)/2:
self.purge()
# 這裡處理的時候參考c++ std partition 操作,先將沒用的都放到數組最後
# 然後,將前面有用的重新進行一次建隊操作
def purge(self):
# patition
l = 0
r = len(self.timerQue) - 1
while l < r:
while l < r and not self.timerQue[l].Cancelled: l += 1
while l < r and self.timerQue[r].Cancelled: r -= 1
self.timerQue[l], self.timerQue[r] = self.timerQue[r], self.timerQue[l]
l += 1
r -= 1
# remove
mid = math.ceil((l + r) / 2)
if not self.timerQue[mid].Cancelled:
mid += 1
del self.timerQue[mid:]
# heapmake
heapq.heapify(self.timerQue)
def add(self, duration, callback, callargs, looptime):
now = int(time.time())
timer: Timer = Timer(self, now + duration, callback, callargs, looptime)
heapq.heappush(self.timerQue, timer)
return timer
# test
def func(callback):
waitEvent = threading.Event()
while not waitEvent.wait(1):
callback()
def func_a(now):
print('in func a', now)
def func_b(now):
print('in func b', now)
def func_c(a, b):
print('in func c: before sleep')
time.sleep(30)
print('in func c: after sleep')
a.cancel()
b.cancel()
timers = Timers()
threading.Thread(target=func, args=(lambda:timers.tick(), )).start()
timer_a = timers.add(0, func_a, (), 10)
timer_b = timers.add(5, func_b, (), 10)
threading.Thread(target=func_c, args=(timer_a, timer_b)).start()
時間輪
時間輪是一個比較巧妙地hash結構,性質有點類似鐘錶指針
首先考慮一個簡單的大小為60數組t
,每一位表示對應時間的集合:第0秒需要處理的事務都在t[0]
,第5秒需要處理的事務都在t[5]
,依次類推
這樣一來,當處在 x 秒時只需要去對應的數組元素即可;但是同樣也帶來了一個問題:最多只能處理一分鐘內的事務(數組大小只有60)
為了解決這個問題,有一個簡單的方案:在事務對象加一個欄位 nRound 用來表示循環使用這個數組,當循環次數為 nRound 時才執行本事務
這樣一來就解決了數組容量問題,但是隨之而來的就是效率變得低下了;本來使用hash的目的就是為了避免不必要的遍歷,可以直截了當地獲取當前需要處理地任務,而現在又要遍歷 t[n]
來判斷 nRound 是否為當前輪次了
基於此,就有了這種更優美地解決方案時間輪
:再加一個大小為60數組d
,每一位還是表示對應時間地集合:第0分鐘要處理地事務都在d[0]
中,第5分鐘需要處理地事務都在d[5]
中,依次類推
當然,因為0-59秒的事務都放在了數組t
里了,所以d[0]
為空即可;當時間來到第1分鐘時,再將d[1]
中的事務放置到t
中對應位置即可
這樣一來,就已經可以處理1個小時內的任務了,可想而知,再加上更多的數組就可以處理更長的時間跨度了
(很明顯,這裡的數組大小和數組數量只是一個進位關係而已)
工作原理如下:
- 一級輪(就是一個數組)的每個格對應一個時間間隔
- 一級輪指針每次tick加1;當指針指向一級輪的某一格時,即表示這一格里的定時器都到期了
- 二級輪(包括更多級同理)的每個格對應一級輪的一圈
- 二級輪指針每當一級輪指針轉一圈加1;當指針指向某一個時,即表示接下來需要處理這一格的定時器了(分散到一級輪里)
後面給出了一個簡單的實現,簡單起見就沒有再處理重複執行的情況
這裡再談談一些可以優化的地方:
- 指針和數組大小:在Linux內核定時器的實現里,採用了一個很巧妙的設計,一共5個數組,大小分別為 64(a) 64(b) 64(c) 64(d) 255(e) (64是2的6次方,255是2的8次方,也就總共佔了32位);這樣一來,只需利用整數的進位就可以自然的處理數組之間的進位關係了,這體現在,只需要一個32bit的指針和對應的位操作即可表示5個數組中的情況了,不再需要每個數組分配一個指針(例如末8位表示數組e的指針,之前的6位表示數組d的指針)
- 數組元素:更高級的數組對應的時間刻度就越長,就看可能有更多甚至非常大量的事務擠在一個格里,這時候特殊需求的操作(比如查詢),可能就不能很好地支援,因此可以採用合適的數據結構來管理每個格里的事務;當然通常情況是不需要的,因為一般操作只是要把當前格里的事務hash到下一級的數組裡
- 取消定時器:還是跟上面時間輪的實現一樣,可以考慮採用惰性刪除的策略
class Timer():
def __init__(self, delay, callback, callargs):
self.delay = delay
self.callback = callback
self.callargs = callargs
self.cancelled = False
@property
def Cancelled(self): return self.cancelled
def cancel(self):
self.cancelled = True
def trigger(self):
if not self.cancelled:
self.callback(*self.callargs)
self.cancelled = True
class Wheels():
WHEEL_NUM = 3
SOLT_NUM = 8
MAX_NUM = int(math.pow(SOLT_NUM, WHEEL_NUM))
def __init__(self):
self.pointer = []
self.wheels = []
for i in range(self.WHEEL_NUM):
self.pointer.append(0)
self.wheels.append([])
for j in range(self.SOLT_NUM):
self.wheels[i].append([])
def add(self, delay, func, args):
if delay >= self.MAX_NUM: return
past = delay
wheel = self.WHEEL_NUM - 1
while past >= self.SOLT_NUM:
past = past // self.SOLT_NUM
wheel -= 1
solt = (self.pointer[wheel] + past) % self.SOLT_NUM
delay = delay % int(math.pow(self.SOLT_NUM, self.WHEEL_NUM - wheel - 1))
self.wheels[wheel][solt].append(Timer(delay, func, args))
def tick(self):
print('tick')
for i in range(self.WHEEL_NUM - 1):
while self.wheels[i][self.pointer[i]]:
timer = self.wheels[i][self.pointer[i]].pop()
self.add(timer.delay, timer.callback, timer.callargs)
idx = self.WHEEL_NUM - 1
while self.wheels[idx][self.pointer[idx]]:
timer = self.wheels[idx][self.pointer[idx]].pop()
timer.trigger()
for i in range(self.WHEEL_NUM - 1, -1, -1):
self.pointer[i] = (self.pointer[i] + 1) % self.SOLT_NUM
if self.pointer[i] > 0:
break
def func(callback):
waitEvent = threading.Event()
while not waitEvent.wait(1):
callback()
wheels = Wheels()
threading.Thread(target=func, args=(lambda:wheels.tick(), )).start()
# test
def func_a():
print('in func a')
def func_b():
print('in func b')
timer_a = wheels.add(5, func_a, ())
timer_b = wheels.add(10, func_b, ())
總結
到這裡,關於定時器的實現就講完了;實現本身可能並不精彩,秒的是一些細節處的設計思想,最後再來回顧一下:
- 惰性刪除和清理操作:參考了開源引擎kbengine中定時器的實現;惰性刪除可以節省高頻操作的開銷,還可以減少頻繁的記憶體操作;在清理操作的細節中,先將有用和無用的定時器分離(分別放置到數組前後),可以有效提高記憶體操作的效率
- 時間輪結構的設計:時間輪結構的設計非常巧妙,源於hash也兼顧了空間,並且將高頻操作(最近時間段)和低頻操作(較遠時間段)分離了開來
- 時間輪指針設計:利用整數進位,簡化時間輪指針