一次業務代碼的流式重構

封面圖

業務場景

目標 ~> 調度 ~> MQ ~> 引擎,就是生產者消費者模型,非常簡單。

為了提高性能,調度需要將一個大目標拆分為多個子任務,啟動多個引擎並發地去執行。

舉個例子,用戶輸入一個 A 段目標 1.0.0.0/8(2^24=16,777,216),設置了全端口(1-65535)三種協議(ICMP UDP TCP)掃描,假定引擎每次處理 10W 目標,200 個端口時效率最佳。

老代碼

SpiltTargets ~> SpiltPorts ~> SpiltProtocol ~> MQ,代碼抽象為三個函數,順序執行,每個階段執行完才能進入下個階段,中間產生的所有數據都保存在內存中,然後全部推送到 MQ。

SpiltTargets 後,子任務數量變為 16,777,216 / 100,000 = 168

接着 SpiltPorts 後,65535 / 200 = 328,此時子任務數量變為 168 * 328 = 55104

最終 SpiltProtocol,子任務數量 55104 * 2 + 168(ICMP 協議無端口)= 110376,高達 11W 之多

優點

  • 代碼實現簡單
  • 純 CPU 運算,整個拆分過程快,由 MQ 持久化消息,不擔心重啟丟數據(不過不能在拆分的時候重啟)

缺點

  • 調度內存佔用高(一行字符串最終變為 11W 行字符串)
  • MQ 消息數量太多,內存佔用大的同時,還可能丟消息

後續

其實按照 10W 目標,200 個端口拆分,整個系統還算撐得住,直到後來我們的系統把客戶的路由器給打掛了(看來有時候不能一味的追求快)。

為了掃描變慢點,拆分粒度改為了 256 個目標,50 個端口,最終產生消息數 65535 * 1311 * 2 + 65535 = 171,898,305,都上億了,調度和 MQ 都頂不住了!

當時的修改是引入二級隊列,一級還是按照 10W 拆分,後台協程定時從一級獲取消息按照 256 拆分為二級,引擎從二級隊列獲取子任務。

流式重構

雖然上面的二級隊列解決了問題,但是我感覺並不是很完美,為什麼要等到所有的流程都走完才推消息呢?為什麼要先推消息,然後拉回來,再推出去呢?

受到 go-zero/stream 啟發,我決定將其流式化重構,去除業務代碼,核心的骨架如下。

type Stream struct {
	source <-chan []string // 一批目標
	done   chan struct{}   // 退出信號
}

func NewStream(targets []string) Stream {
    // 此處使用無緩衝的 channel 演示,具體可以根據上下游的處理能力設置 buffer
	source := make(chan []string) 
	done := make(chan struct{})

	go func() {
		defer close(source)

		for _, v := range targets {
			select {
			case <-done: // 監聽退出信號
				return
			default:
			}
			source <- []string{v} // 傳遞給下一階段
		}
	}()

	return Stream{
		source: source,
		done:   done,
	}
}

func (s Stream) SpiltTargets(chunk int) Stream {
	source := make(chan []string)

	var buf []string

	go func() {
		defer close(source)

		for msg := range s.source {
			select {
			case <-s.done:
				return
			default:
			}

			// 緩存 chunk 數量的目標後,傳遞給下一階段,算法很簡單,此處忽略
			for _, v := range msg {
				buf = append(buf, v)
			}
			source <- buf
		}
	}()

	return Stream{
		source: source,
		done:   s.done,
	}
}

func (s Stream) SpiltPorts(chunk int) Stream {
	// 邏輯和 SpiltTargets 一致,只不過對端口做處理
}

func (s Stream) PushMQ(protocol []string) Stream {
	// 邏輯基本和上面一致

	// 有個策略,只有在當前隊列消息數少於 500 時,才推送
	// 不能一股腦全推送,否則就和老代碼效果一樣了(拆分速度遠遠快於消費速度)
}

func (s Stream) Wait() {
	// 等待所有的子任務都拆分完成
	for range s.source {
	}

	// 關閉 MQ 連接
}

func (s Stream) Tidy() {
	// 通知所有階段都退出
	close(s.done)

	// 刪除隊列

	// 關閉 MQ 連接
}

使用效果如下:

func main() {
	s := NewStream([]string{"1.0.0.0/8"})
	s.SpiltTargets(10000).PushMQ("icmp").SpiltPorts(200).PushMQ("udp", "tcp").Wait()
}

代碼效果看起來還不錯,就像水一樣徐徐流過,而不像之前水庫泄洪似的。

優點

  • 不用擔心拆分粒度,省內存,MQ 消息數可控
  • 方便拓展,根據業務需求可以加入更多的處理階段

缺點

  • 整個拆分過程伴隨着任務運行一直存在,不能利用 MQ 持久化
  • 只能處理局部數據,不能處理全量數據

後續

由於持久化方案太複雜,目前暫時沒做,不過問題不大,重啟這種非正常情況畢竟機率非常小

總結

Go 的 channel 非常適合做流式處理。

在設計時不僅僅要完成功能,還要適當考慮性能,雖然這樣花費的時間可能稍微多點。

參考

//github.com/kevwan/stream