一次業務代碼的流式重構
業務場景
目標 ~> 調度 ~> MQ ~> 引擎,就是生產者消費者模型,非常簡單。
為了提高性能,調度需要將一個大目標拆分為多個子任務,啟動多個引擎並發地去執行。
舉個例子,用戶輸入一個 A 段目標 1.0.0.0/8(2^24=16,777,216),設置了全端口(1-65535)三種協議(ICMP UDP TCP)掃描,假定引擎每次處理 10W 目標,200 個端口時效率最佳。
老代碼
SpiltTargets
~> SpiltPorts
~> SpiltProtocol
~> MQ
,代碼抽象為三個函數,順序執行,每個階段執行完才能進入下個階段,中間產生的所有數據都保存在內存中,然後全部推送到 MQ。
SpiltTargets
後,子任務數量變為 16,777,216 / 100,000 = 168
接着 SpiltPorts
後,65535 / 200 = 328,此時子任務數量變為 168 * 328 = 55104
最終 SpiltProtocol
,子任務數量 55104 * 2 + 168(ICMP 協議無端口)= 110376,高達 11W 之多
優點
- 代碼實現簡單
- 純 CPU 運算,整個拆分過程快,由 MQ 持久化消息,不擔心重啟丟數據(不過不能在拆分的時候重啟)
缺點
- 調度內存佔用高(一行字符串最終變為 11W 行字符串)
- MQ 消息數量太多,內存佔用大的同時,還可能丟消息
後續
其實按照 10W 目標,200 個端口拆分,整個系統還算撐得住,直到後來我們的系統把客戶的路由器給打掛了(看來有時候不能一味的追求快)。
為了掃描變慢點,拆分粒度改為了 256 個目標,50 個端口,最終產生消息數 65535 * 1311 * 2 + 65535 = 171,898,305,都上億了,調度和 MQ 都頂不住了!
當時的修改是引入二級隊列,一級還是按照 10W 拆分,後台協程定時從一級獲取消息按照 256 拆分為二級,引擎從二級隊列獲取子任務。
流式重構
雖然上面的二級隊列解決了問題,但是我感覺並不是很完美,為什麼要等到所有的流程都走完才推消息呢?為什麼要先推消息,然後拉回來,再推出去呢?
受到 go-zero/stream
啟發,我決定將其流式化重構,去除業務代碼,核心的骨架如下。
type Stream struct {
source <-chan []string // 一批目標
done chan struct{} // 退出信號
}
func NewStream(targets []string) Stream {
// 此處使用無緩衝的 channel 演示,具體可以根據上下游的處理能力設置 buffer
source := make(chan []string)
done := make(chan struct{})
go func() {
defer close(source)
for _, v := range targets {
select {
case <-done: // 監聽退出信號
return
default:
}
source <- []string{v} // 傳遞給下一階段
}
}()
return Stream{
source: source,
done: done,
}
}
func (s Stream) SpiltTargets(chunk int) Stream {
source := make(chan []string)
var buf []string
go func() {
defer close(source)
for msg := range s.source {
select {
case <-s.done:
return
default:
}
// 緩存 chunk 數量的目標後,傳遞給下一階段,算法很簡單,此處忽略
for _, v := range msg {
buf = append(buf, v)
}
source <- buf
}
}()
return Stream{
source: source,
done: s.done,
}
}
func (s Stream) SpiltPorts(chunk int) Stream {
// 邏輯和 SpiltTargets 一致,只不過對端口做處理
}
func (s Stream) PushMQ(protocol []string) Stream {
// 邏輯基本和上面一致
// 有個策略,只有在當前隊列消息數少於 500 時,才推送
// 不能一股腦全推送,否則就和老代碼效果一樣了(拆分速度遠遠快於消費速度)
}
func (s Stream) Wait() {
// 等待所有的子任務都拆分完成
for range s.source {
}
// 關閉 MQ 連接
}
func (s Stream) Tidy() {
// 通知所有階段都退出
close(s.done)
// 刪除隊列
// 關閉 MQ 連接
}
使用效果如下:
func main() {
s := NewStream([]string{"1.0.0.0/8"})
s.SpiltTargets(10000).PushMQ("icmp").SpiltPorts(200).PushMQ("udp", "tcp").Wait()
}
代碼效果看起來還不錯,就像水一樣徐徐流過,而不像之前水庫泄洪似的。
優點
- 不用擔心拆分粒度,省內存,MQ 消息數可控
- 方便拓展,根據業務需求可以加入更多的處理階段
缺點
- 整個拆分過程伴隨着任務運行一直存在,不能利用 MQ 持久化
- 只能處理局部數據,不能處理全量數據
後續
由於持久化方案太複雜,目前暫時沒做,不過問題不大,重啟這種非正常情況畢竟機率非常小
總結
Go 的 channel 非常適合做流式處理。
在設計時不僅僅要完成功能,還要適當考慮性能,雖然這樣花費的時間可能稍微多點。