聊一聊異構系統間數據一致性
之前忙活過一個多方合作的項目,異構系統間維護數據一致性有時候是個頭疼但必須解決的問題,今天我從背景、問題、解決思路、技術方案等幾個方面淺談一下,拋磚引玉。
背景
異構系統
近兩年我司承接了某個持續性的會議項目,即每季度或月不定期舉行會議。本項目目前有三個主要供應方(面向用戶的A方,數據中間B方,會議數據同步C方【我司】)。
為了方便演示問題,以下流程和職責都做了裁剪。
簡化流程如下:
簡化職責如下:
-
A方職責: 用戶通過官網/小程式進行報名,A方調用B方的標準介面,不存儲數據 -
B方職責:作為ISP,提供標準查詢、新增、修改等相關介面,幾乎不提供訂製。基於表單和表單數據,完成數據存儲與流轉。 -
C方職責:提供導入/更新/審核/註銷等入口,新數據會通知到B方,B方數據新增/更新也會通知到C方。
從圖例來看,B方/C方數據存儲方面是冗餘的。但B方只存儲了核心數據,提供不了太多業務行為,C方具有業務需要的全套流程,但在此項目中作為後方支援及後續現場支援,三方形成了一種生態和諧。本篇部落格主旨在討論多方異構系統之間如何保證數據的一致性。
產品/項目
從標準Sass系統來講,這樣的多方交互,不利於系統穩定性,有諸多不可控因素。但從項目角度,這是各方考慮/鬥爭/談判/費用等綜合因素下友好協商的結果。
當然這是一個私有部署項目,所以會有很多堅持和妥協。
大領導提到一個說法:項目是要交付的,功能完美是產品考慮的。在功能不完善的情況下,如何去交付?
最後的兜底
哎,一言難盡。是通宵了幾次核對/修複數據的,這是最後的辦法了。為了苦逼不再重現,今年要對整個線動一動手術。(說好的.net 不996呢?)(拿著白菜價操著賣白粉的心)。
問題
請求無序
-
C方 需要所有子會報名前,主會必須報名。 -
B方 各會之間的報名數據是無序到達的。
循環更新
-
B方 任意報名數據更新或新增都會推送到C方,C方收到更新也會更新B方。這裡有一些措施進行了攔截中斷,但仍會頻繁循環更新問題。這是目前現狀(為什麼會出現?太趕工?)
排錯困難
-
無開發環境,需盲寫程式碼,發到測試環境進行聯調測試。 -
調用鏈太長,日誌過多,排錯時需要根據調用各服務介面來判斷走到了哪步,出現了一個問題。調用鏈能查到一些問題,但不容批量定位問題。單個查太難。
bug
-
高並發下,redis組件出現各種問題(timeout等) -
token問題 -
數據丟失 -
更新失效 -
數據重複 -
隊列積壓 -
介面請求時間超長 -
其他問題…
數據很大,也很小
大部分數據能對上,偶爾幾十個或斷斷續續產生新問題的數據需要及時人工修復。功能有缺陷,人工也是一種交付辦法,但不可持續,太他媽的累了。數據不一致,也是導致通宵核對/修複數據的一大原因。如果數據全一致,就不會那麼辛苦了。
解決思路
管理層
-
明確項目是要繼續做的 -
目標產品化/更方便維護方向發展。一團隊養一項目。 -
有改進想法提出來,拉會推進 -
缺人,招人(遙遙無期…)
技術層
-
針對請求無序問題,引入延時隊列,先處理主會、子會延遲幾秒鐘在處理。 -
針對循環更新問題,記錄B方數據來源,非必要情況下,不回更B方。必須終止掉。【冤冤相報何時了】 -
針對排錯困難問題,引入mysql記錄新增報名的請求以及處理結果,可以更快查詢處理結果。 -
針對bug,測試根據各測試場景進行複測,按10/100/1000/3000/萬級規模壓測。提前發下問題。 -
推進客戶方一起做必要去重邏輯。
其他因素
無論是標準產品還是交付項目,做任何改動都要評估。
-
多溝通,大家都是站在一條線的。有利於事情解決的方案認同度會更高。 -
預估花多少時間,有多少資源。 -
能擠出來的空窗期有多久,客戶方/產品方對於需求的急迫性有多強。 -
基於場景測試,把缺陷優先順序先列出來,根據空窗期先修復緊急缺陷。
把緊急且影響範圍廣的問題解決了,風險就小了很多了。80%的問題是由20%的因素造成的。 這也正符合程式優化中的時間/空間局部性。
「
進程運行時,在一段時間裡,程式的執行往往呈現高度的局部性,
包括時間局部性和空間局部性。
時間局部性是一旦一個指令被執行了, 則在不久的將來,它可能再被執行。
空間局部性是一旦一個指令一個存儲單元被訪問,那麼它附近的單元也將很快被訪問.」
技術方案
mysql實現延遲隊列
-
優先處理主會,子會延時處理
由於隱私問題,這裡只列部分欄位
-
資料庫輪詢獲取未處理數據
這裡如何提高消費速度,可以參考《電腦系統結構》中標量處理機的流水線的一些知識。 -
首先要無相關,即按AccountId分組,分組內的數據是無衝突/相關的,可以分批進行。記錄各任務狀態,最後統一提交資料庫狀態,然後1s後繼續輪詢。這種類似靜態流水線。動態流水線較為複雜,這裡暫不做實現。
do
{
var groupTemps = groupDatas.Skip((pageIndex - 1) * pageSize).Take(pageSize).ToList();
var currentRecords = new List<QidianNotifydelayData>();
foreach (var item in groupTemps)
{
currentRecords.AddRange(item.ToList());
}
var temp = taskFunc(currentRecords);
taskList.Add(temp);
pageIndex++;
}
while ((pageIndex - 1) * pageSize <= groupCount);
//等待全部執行
await Task.WhenAll(taskList.ToArray());
await _dbContext.CommitAsync();
Thread.Sleep(1);
-
如果1s輪詢覺得太浪費,後續可以根據請求發送標記位(下次輪詢時間),有數據時,可以快速輪詢,無數據時放寬時間。極端處理方式,當主會請求過來處理完成後,直接發起子會處理,但要考慮資料庫是否能承受的住這種並發壓力。
-
如果考慮請求會重複執行,可以在執行內加redis鎖。慎用for update,並發一大就over.
/// <summary>
/// 鎖定執行。
/// </summary>
/// <param name="key"></param>
/// <param name="func"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<BizResult<T>> LockExcute<T>(string key, Func<Task<BizResult<T>>> func, int timeSpan)
{
var db = (this._cacheClient as RedisClient).Db;
var mutexKey = string.Format("mutex:", key);
if (await db.StringSetAsync(mutexKey, "1", TimeSpan.FromSeconds(timeSpan), When.NotExists))
{
try
{
var item = await func.Invoke();
return item;
}
catch (Exception ex)
{
_logger.LogError("LockExcute:Exception:" + ex.Message);
return BizResult.BusinessFailed<T>(-1, $"執行失敗,Message:{ex.Message}");
}
finally
{
await db.KeyDeleteAsync(mutexKey);
}
}
else
{
_logger.LogWarning($"LockExcute:Key:{key},正在處理中,請稍候");
return BizResult.BusinessFailed<T>(-1, "正在處理中,請稍候");
}
}
redis實現延遲隊列
-
由於業務中一個Account同時只能處理一個主會,如果在處理子會的時候,主會請求突然過來了,就會有問題,這裡就需要加鎖主會。引入了Redis延遲隊列 -
基於Redis ZSet有序集合實現。 -
思路:當前時間戳和延時時間相加,也就是到期時間,存入Redis中,然後不斷輪詢,找到到期的,拿到再刪除即可。 -
目前實現缺點:不利於監控,未發起http請求處理業務,導致調用鏈有缺。
/// <summary>
/// 3.入隊列
/// </summary>
/// <param name="redisModel"></param>
/// <returns></returns>
public async Task EnqueueZset(DataToModel redisModel)
{
redisModel.UpdateTime = redisModel.UpdateTime.AddSeconds(5);// 最後更新時間 + 5秒
var redisDb = _redisConnectionService.GetRedisConnectionMultiplexer().GetDatabase(0);//默認DB0
if (redisDb != null)
{
IsoDateTimeConverter timeFormat = new IsoDateTimeConverter();
timeFormat.DateTimeFormat = "yyyy-MM-dd HH:mm:ss.fff";
await redisDb.SortedSetAddAsync(ZSet_Queue, JsonConvert.SerializeObject(redisModel, Formatting.Indented, timeFormat), redisModel.UpdateTime.ToTimeStamp());//得分 --放入redis
_logger.LogInformation($"數據排隊--入隊列!redisModel:{JsonConvert.SerializeObject(redisModel)}");
}
}
rabbmit實現延遲隊列
-
死信隊列過期–》重推信隊列?暫未實現。
數據更新方案
-
核心原則:先查詢對比,有變更再更新。從B方數據過來的,盡量不再更新回去。減小並發量,控制複雜度。
數據核對方案
-
待補充。未實現自動化。後期可以獲取雙方系統數據,匯總對比。
部署/壓測/監控
Jmeter(來自於測試同學提供的腳本)
這裡只做簡單截圖
-
配置預定義參數
-
必要情況下配置後置處理程式
-
配置好thread group,http request後,執行調用觀察介面
-
查詢請求執行是否成功 -
查看聚合報告
kubernetes
-
kubectl get nodes 獲取所有節點 -
kubectl get pod -A 查看所有服務,觀察status和age
-
kubectl logs [-f] [-p] POD [-c CONTAINER] 查看日誌資訊。
「
-c, –container=””: 容器名
-f, –follow[=false]: 指定是否持續輸出日誌
–interactive[=true]: 如果為true,當需要時提示用戶進行輸入。默認為true
–limit-bytes=0: 輸出日誌的最大位元組數。默認無限制
-p, –previous[=false]: 如果為true,輸出pod中曾經運行過,但目前已終止的容器的日誌
–since=0: 僅返回相對時間範圍,如5s、2m或3h,之內的日誌。默認返回所有日誌。只能同時使用since和since-time中的一種
–since-time=””: 僅返回指定時間(RFC3339格式)之後的日誌。默認返回所有日誌。只能同時使用since和since-time中的一種
–tail=-1: 要顯示的最新的日誌條數。默認為-1,顯示所有的日誌
–timestamps[=false]: 在日誌中包含時間戳」
mysql監控(來自於運維同學的回饋)
這裡只截圖簡單資訊
-
通過雲監控查看mysql狀態[最大連接數/cpu/記憶體/慢查詢/索引建議/鎖]
調用鏈/日誌
此處暫不截圖。
失控
-
一期方案 -
二期方案
-
三期方案
當然那是進展順利的情況下,不順利的情況下就變成了這樣
某些時候也會聽到如下言論:
-
一定要保證xx的信譽。 -
今晚就不要睡覺了吧?大家多堅持一下。
就如現在的疫情封控一樣,做好了精準防控一片讚歌,失控了就好好居家、共渡難關。
網路和現實都會告訴你什麼就是人間。
總結
以上是關於訂製化需求的一些解決方案,希望對未來類似產品或項目做個參考。本篇從問題著手,分析有利於解決/消除異構系統數據一致性辦法。當然數據一致性也依賴於自身系統的高可用,這裡未做過多描述,以後再說。
到此結束,謝謝觀看!