消息抽象層設計和實現-OSS.DataFlow

  前面已經介紹了消息生產消費中間類庫(OSS.DataFlow)的簡單使用,這篇主要介紹內部的設計實現。主要內容包含:

  1. 消息生產消費的抽象設計。

  2. 具體使用示例

一. 消息生產消費的抽象設計。

  需要首先強調的是,這裡的生產消費抽象主要在業務使用層面,拋開具體的RabbitMQ之類的消息隊列產品。可能說起來比較模糊,我們先看下常見的業務調用消息隊列的情況:

  上邊是一個常見的通過消息隊列將業務異步拆解的模型,按道理結構已經十分簡單了,特別是對於一個相對穩定的業務,代碼基本變動不大的情況下,這個模型基本足夠了。不過對於一個愛折騰的開發人,不玩點花里胡哨的,存在感就沒了,經過各種假設論證之後,終於找到了這麼幾個不滿意的地方:

 

  1.  開發,測試,生產,都需要搭建對應的Rabbit實例,特別是開發測試環境搶消息,再加上多人開發,大麻煩可能沒有,但小麻煩一定是不斷的。

  2.  業務代碼中,直接調用了具體消息隊列的產品,當某一個模塊消息量快速上升,無法局部切換隊列產品。(當然你也可以切分出獨立的服務,但是耗時,代價相對較大)。

  3.  對於同一解決方案內的異步消息,或多或少的會出現生產消息調用和消費消息調用代碼分散(比如用戶登錄後添加日誌)。

  4. 當前的項目代碼依賴外部消息隊列或者數據庫(如果公司偏項目型,新項目無法輕裝上陣)

 

  以上臆測僅個人偏見,僅供參考。在我的角度,我希望在業務調用消息中轉的過程中,需要面向的是接口,在需要的時候適配即可,所以我嘗試添加一個輕簡的中間層。

  這個中間層第一件事就是隔離,草圖設計如下:

  通過全局 DataFlowFactory 能夠創建消息發佈者(IDataPublisher),並能夠注入訂閱者(IDataSubscriber) ,業務層只需要通過 IDataPublisher,IDataSubscriber 接口交互,和具體消息存儲設施脫離。順着這個思路,當業務需要生產寫入消息時,創建發佈者,並通過發佈者寫入消息,並完成訂閱者的回調,這個環路即可完成。

  現在只需要解決兩個問題:

  1. 創建的發佈者,如何實現不同場景的擴展。

  2.  如何完成對應的訂閱者(支持同一消息類型,多個訂閱者)的回調。

  這裡我引入一個全局管理對象(DataFlowManager),內部的調用過程圖示如下:

 

  在 DataFlowManager 中提供了  PublisherProvider 公開屬性,可用來擴展不同消息設施的發佈者實現。同時,提供了 NotifySubscriber 方法,作為已註冊消息訂閱者的統一觸發入口(內部完成了多個訂閱者的調度,當然如果針對特殊消費者調用,用戶也可以跳過註冊訂閱者,自由實現訂閱處理)。

  通過上邊的整個過程,完全實現了消息中間層的功能,以插件的形式將具體的消息設施在程序的全局入口注入,這樣就可以針對不同環境不同業務模塊(入口的參數 source_name 控制)做定製化。同時,在大多數的項目中(包括在開發環境中)並無需立即使用獨立的消息設施,所以在中間件的內部,提供了一個默認的內存消息隊列實現,這樣也保證了類庫的即引即用,擴展後的圖示如下:

  根據上邊的過程圖可以看出,內部的默認隊列,和外部隊列所處統一層級,當沒有提供用戶自定義 PublisherProvider時(或者Provider 返回的 DataPublisher為空),系統會執行內部的默認隊列實現。

二. 具體使用示例

  上邊展示了內部設計,這裡介紹具體的代碼使用層面,看在實際的使用中是如何簡化內斂整個消息的處理。

  1.  業務側調用:

   上邊演示了消息Key為 「 P-S-Msg 」 ,有兩個訂閱者(這裡使用了委託方法,也可以傳入繼承IDataSubscriber<MsgData> 接口的實例),並且創建了一個名為 _publisher 的發佈者(如果沒有註冊其他消息存儲適配實現,會走默認內部消息隊列實現,即創建一個名為 NewSource 的隊列)。除了上邊的使用方式,有些時候我們的生產和消費代碼都是同一個服務內部,比如用戶登錄和添加登錄日誌,這個時候提供了一個更簡單的方式:

   可以看出,具體的業務代碼相對很清晰,不需要關注具體的消息底層實現,或者什麼觸發方式(定時,webhook或者內部消費線程)。而這些內容全部轉移到全局模塊插件化適配。

二. 消息的底層適配:

 

   上邊的代碼,自定義了一個 CustomMsgStorage 消息存儲適配器,並在全局初始化時,賦值給  DataFlowManager.PublisherProvider ,在這個適配器里,約定了當  source_name 等於 CustomStorageQueue 時返回消息發佈者 CustomMsgStoragePublisher,當然這個具體實現可以替換成 RabbitMQ,Redis,Mysql 等等,當消息實際消費觸發時,調用 DataFlowManager.NotifySubscriber() 方法即可,在上邊的測試用例里,我簡化了這個過程,直接調用,實際場景根據情況調整即可(比如放在RabbitMQ的消費監控線程,或者讀取Mysql數據的定時任務中)。

  簡單來說,DataFlowFactory處理業務使用接口(通過 msgkey 關聯生產者和消費訂閱者),DataFlowManager 控制具體的消息適配(通過 source_name 來控制底層適配), 基本解決了前面我所顧慮的問題。

如果你已經看到這裡,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公總號(見二維碼)