.net core 消息流處理流程

前言

2020年即將進入尾聲,分享一下在現公司業務處理流程,一起討論在分散式場景下,如何通過消息流的方式處理各種複雜的業務場景,這裡涉及到一些常用組件,後面結合場景與程式碼來具體說明

場景說明

這裡就拿我負責的簡訊應用來舉例,它由3個核心模組組成

  1. 簡訊網關(接收客戶提交簡訊,接收通道簡訊回執,轉發回執給客戶…)
  2. 計費服務(簡訊計費,購買套餐…)
  3. 簡訊通道(簡訊提交到通道…)

痛點

  • 批量提交營銷類簡訊,如客戶一次提交10w-20w條簡訊
  • 簡訊實時計費,延遲問題
  • 通道限流控制
  • 通知回執延時問題

組件

mysql
redis
阿里雲日誌

流程如下

blockchain

痛點解決方案

  • 批量提交簡訊

批量提交這裡主要以下幾種情況

  1. 簡訊內容一致,手機號多個
  2. 簡訊內容不一致(例如內容攜帶用戶資訊等… 通常採用excel上傳)
  3. 循環調用介面,提交簡訊

這裡僅針對情況3來說明,首先將用戶資訊存入redis(先讀快取,再讀庫),減少在驗證與鑒權時對資料庫的查詢壓力,校驗通過的消息開始寫入收單隊列,並記錄日誌(注意日誌一定要非同步寫入),隊列使用的redis隊列,以目前的業務承載能力,是完全沒有問題的,收單介面的qps可以通過分散式來提升,這個得益於k8s容器伸縮,平時我們一般是5個pod在運行(相當於負載5個應用),在節假日高峰期可以起10個pod,這裡性能的瓶頸主要集中在redis隊列,如果有更高要求可以嘗試換成rabbitmq,kafka等

  • mysql批量持久化

這也一直是我比較迷的一個地方,資料庫使用的阿里雲mysql,單條循環插入速度在200/s左右,我這裡採用的dapper,通過拼接values來批量插入,速度大概能達到3000/s,後面看看有沒有更好的方案

  • 簡訊實時計費,延遲問題

在分散式情況下,實時計費又是一個比較大的性能瓶頸,直接讀庫,並修改用戶條數顯然是不行的,這樣會出現臟讀,導致最終數據不準確而出損(程式設計師就要背鍋),而且效率低下,使用redis分散式鎖等同於將分散式改成單應用,需要頻繁更新快取里的用戶簡訊餘量,速度大概在150/s – 180/s,消費速度過慢會導致隊列里數據堆積,簡訊延遲過高,特別是通知類簡訊(如獲取驗證碼)對延遲有較高的要求,這裡使用redis的計數器來實現,通過計數器遞減的方式,如果簡訊餘量<0則用戶簡訊餘量不足,再單獨起一個任務,每分鐘同步一下用戶簡訊餘量,用戶充值與簡訊失敗回退,也是對計數器的操作

  • 通道限流控制

通道限流主要是供貨商對通道進行流量限制,超頻的簡訊會直接失敗,一般出現在營銷類簡訊,因為每條通道的價格(與地域,三網,到達率…有關)都不一樣,每個用戶都會分配通道,為了讓簡訊盡量成功,程式需要進行限流,這裡也是使用redis計數器實現,設置一個1分鐘失效的快取,超過頻次後,會嘗試其它通道,沒有可用通道則再次寫入隊列

  • 通知回執延時問題

系統拿到回執後,需要將回執通知給客戶配置的http地址,並得到客戶的響應,未響應的回執會再次入列,直到客戶響應,或者超過推送策略(推送3次或超過多長時間),有些客戶配置的http地址響應非常慢,或者乾脆是一些訪問超時的地址,這樣會導致通知延遲,通知類發卡密業務的簡訊對回執有較高要求,這裡通過多執行緒來實現,通過創建多個執行緒從隊列里獲取回執並轉發

  List<Task> tasks = new List<Task>();
  for (int i = 0; i < 10; i++)
  {
      tasks.Add(Task.Run(async () =>{...}));
  }
  await Task.WhenAll(tasks);
  • 注意事項
  1. 因為操作redis的地方非常多,為了便於管理,所有redis的key建議統一寫在常量類里,並寫上注釋,並制定對應的規範,方便維護
/// 項目名_類型_操作_參數    有效期
project_queue_action_params 
  1. 業務日誌,業務日誌方便排查問題,建議不要偷懶,在業務的入口跟出口都寫上對應的日誌資訊
Tags: