《中間件性能挑戰賽–分散式統計和過濾的鏈路追蹤》java 選手分享
- 2021 年 1 月 5 日
- 筆記
2020年6月份天池舉辦的《中間件性能挑戰賽》可謂是異常激烈,本人抽業餘時間報名參與,感受比賽慘烈的同時,也有諸多感慨哈,總結一個多月的賽程,多少有一些心得與大家分享
本文原創地址://www.cnblogs.com/xijiu/p/14235551.html 轉載請註明
賽題
賽題地址: //tianchi.aliyun.com/competition/entrance/231790/information
實現一個分散式統計和過濾的鏈路追蹤
-
賽題背景
本題目是另外一種取樣方式(tail-based Sampling),只要請求的鏈路追蹤數據中任何節點出現重要數據特徵(錯慢請求),這個請求的所有鏈路數據都採集。目前開源的鏈路追蹤產品都沒有實現tail-based Sampling,主要的挑戰是:任何節點出現符合取樣條件的鏈路數據,那就需要把這個請求的所有鏈路數據採集。即使其他鏈路數據在這條鏈路節點數據之前還是之後產生,即使其他鏈路數據在分散式系統的成百上千台機器上產生。
-
整體流程
用戶需要實現兩個程式,一個是數量流(橙色標記)的處理程式,該機器可以獲取數據源的http地址,拉取數據後進行處理,一個是後端程式(藍色標記),和客戶端數據流處理程式通訊,將最終數據結果在後端引擎機器上計算。具體描述可直接打開賽題地址 //tianchi.aliyun.com/competition/entrance/231790/information。此處不再贅述

解題
題目分析
可將整體流程粗略分為三大部分
- 一、front 讀取 http stream 至 front 節點
- 網路io
- 二、front 節點處理數據
- cpu處理
- 三、將 bad traces 同步至 backend,backend 經過匯總計算完成上報
- 網路傳輸 + cpu
遵循原則:各部分協調好,可抽象為生成、消費模型,切勿產生數據飢餓;理想效果是stream流完,計算也跟著馬上結束
方案一 (trace粒度)
最先想到的方案是以trace細粒度控制的方案
因題目中明確表明某個 trace 的數據出現的位置前後不超過2萬上,故每2萬行是非常重要的特徵

- ① 按行讀取字元流
BufferedReader.readLine()
- ② 分析計算
- 在某個 trace 首次出現時(p),記錄其結束位置 p + 2w,並將其放入待處理隊列(queue)中
- 如果當前 trace 為 badTrace,將其放入 BadTraceSet 中
- 每處理一行,均從 queue 隊列中拿出 firstElement,判斷是否與之相等,如果相等,且為 badTrace,那麼進入第3步
- ③ 向 backend 發送數據 註:後續所有涉及網路交互的部門均為 netty 非同步交互
- 將當前 trace 對應的所有數據按照 startTime 排序
- 將排好序的數據與該 trace 最後結束位置 endPosition 一併發送至 backend 節點
- ④ backend 通知 front2 發送數據
- backend 接收到從 front1 發送過來的 trace 數據,向 front2 發送通知,要求其發送該 trace 的全部數據
- 註:此處交互有多種情況,在步驟5時,會具體說明
- ⑤ front2 將數據發送至 backend 此處列舉所有可能發生的情況

- 場景 1:front1 主動上報 traceA 後,發現 front2 已經上報該 traceA 數據,結束
- 場景 2:front1 主動上報 traceA 後,front2 未上報,front2 發現該trace在已就緒集合中,排序、上報,結束
- 場景 3:front1 主動上報 traceA 後,front2 未上報,且 front2 的已就緒集合沒有該 trace,在錯誤集合中發現該 trace,結束 註:因該 trace 存在於 badTraceSet 中,故將來某個時刻 front2 一定會主動上報該 trace
- 場景 4:front1 主動上報 traceA 後,front2 未上報,且 front2 的已就緒集合沒有該 trace,那麼等待行數超限後,檢查該 trace 是否存在於 badTraceSet 中,如果已存在,結束
- 場景 5:front1 主動上報 traceA 後,front2 未上報,且 front2 的已就緒集合沒有該 trace,那麼等待行數超限後,檢查該 trace 是否存在於 badTraceSet 中,如果不存在,排序、上報,結束 註:即便是 front2 中不存在該trace的資訊,也需要上報
- ⑥ 結果計算
- 在收集到 front1 跟 front2 數據後,對2個有序集合進行 merge sort 後,計算其 MD5
方案分析
此方案的跑分大致在 25s 左右,成績不甚理想,總結原因大致可分為以下幾種
- 交互場景較為複雜
- 需要維護一塊快取區域
- 如果該快取區域通過行數來失效過期數據的話,那麼需要額外的分支計算來判斷過期數據,拖慢整體響應時間
- 如果通過快取大小來自動失效過期數據的話,那麼大小設置很難平衡,如果太小,則可能會失效一些正常數據,導致最終結果不正確,如果太大,又會導致程式反應變慢,帶來一系列不可控的問題
基於上述原因,為了充分利用 2萬行的數據特徵,引入方案二
方案二 (batch粒度)
說明:為了更優雅處理數據過期及充分利用2萬行特性,故衍生出此版本
因題目中明確表明某個trace的數據出現的位置前後不超過2萬上,故每2萬行數據可作為一個批次存儲,過期數據自動刪除

-
① 按行讀取字元流
BufferedReader.readLine()
-
② 每2萬行數據作為一個batch,並分配唯一的batchId(自增即可),此處涉及大量cpu計算,分為2部分
- 在每行的 tag 部分尋找
error=1或http.status_code!=200的數據並將其暫存 - 將 traceId 相同的 span 數據放入預先開闢好空間的數據結構
List<Map<String, List<String>>>中,方便後續 backend 節點拿取數據 - 註:此處下載數據與處理數據並行執行,交由2個執行緒處理,一切為了提速
- 在每行的 tag 部分尋找
-
③ 上報 badTraceId
- 每切割 2萬行,統計所有的 badTraceId,與 batchId 一併上報至 backend
- 因同一個 span 數據可能分布在2個 front 節點中,所以有可能 front1 存在錯誤數據,front2 卻全部正確,2個 front 又不能直接通訊,所以此時需要同步至 backend,由 backend 統計全量的 badTraceIds
- front 收到 backend 的通知後,進行當前批次錯誤 trace 數據的統計,因當前批次的數據有可能出現在上一個批次跟下一個批次,故一定要等到處理每行數據的執行緒已經處理完 currentBatchNum+1 個執行緒後,方能執行操作
-
④ 通知2個 front 節點發送指定 traceIds 的全量數據
- backend 主動向2個 front 發送獲取指定 traceIds 的全量數據通知
- front 將 span 數據排好序後上報至 backend
- backend 執行二路歸併排序後,計算整體 span 的 md5 值,反覆循環,直至數據流讀取完畢 註:因2個 front 節點為2核4g,backend 節點為單核2g,為減少 backend 壓力,將部分排序工作下放至 front 節點
-
⑤ 計算結果
- 歸併排序,計算最終結果
方案總結
當前方案耗時在20s左右,統計發現字元流的讀取耗時15s,其他耗時5s,且監控發現各個緩衝區沒有發現飢餓、過剩的情況,所以當前方案的瓶頸還是卡在字元流的讀取、以及cpu判斷上,所以一套面向位元組流處理的方案呼之欲出
- 跟讀
BufferedReader源碼,發現其將位元組流轉換字元流做了大量的工作,也是耗時的源頭,故需要將當前方案改造為面向位元組的操作
方案三 (面向位元組)
字元處理是耗時的根源,只能將方案改造為面向位元組的方式,倘若如此,java 的大部分數據結構不能再使用
大層面的設計思想與方案二一致,不過面向位元組處理的話,從讀取流、截斷行、判斷是否為bad trace、數據組裝等均需為位元組操作
- 好處:預分配記憶體,面向位元組,程式性能提高
- 弊端:編碼複雜,需自定義數據協議

-
① 讀取位元組流
- 程式在初始化時,預先分配10個位元組數據
byte[],每個數組存放10M數據 - io 與 cpu 分離,將讀取數據任務交個某個獨立執行緒,核心執行緒處理cpu
- 程式在初始化時,預先分配10個位元組數據
-
② 數據處理
- 用固定記憶體結構
int[20000]替換之前動態分配記憶體的數據結構體List<Map<String, List<String>>>,只記錄每行開始的 position - 同時將 bad trace id 存入預先分配好的數組中
- 用固定記憶體結構
-
③ 上報 badTraceId
- 同方案二
-
④ 通知2個 front 節點發送指定 traceIds 的全量數據
- backend 主動向2個 front 發送獲取指定 traceIds 的全量數據通知
- 因在步驟二時,並沒有針對 trace 進行數據聚合,所以在搜集數據時,需要遍歷
int[20000],將符合要求的 trace 數據放入自定義規範的byte[]註:剛開始設計的(快排+歸併排序)的方案效果不明顯,且線上的評測環境的2個 front 節點壓力很大,再考慮到某個 trace 對應的 span 數據只有幾十條,故此處將所有的排序操作都下放給 backend 節點,從而減輕 front 壓力 - 因 span 為變長數據,故自定義規範
byte[]存儲數據的設計如下- 預先分配10M
byte[],來存儲一個批次中的所有 bad trace 對應 span 數據 - 用2個 byte 存放下一個 span 數據的長度
- 存儲 span 數據
- 最後返回
byte[]及有效長度
- 預先分配10M
-
⑤ 計算結果
- 排序,計算最終結果
執行緒並發

- A-Thread: slot 粒度,讀取 http stream 執行緒
- B-Thread: block 粒度,處理 slot 中的 block,將 block 數據按行切割、抓取 bad trace id 等
- C-Thread: block 粒度,響應 backend 拉取數據的請求
阻塞場景
- A-Thread 讀取 http stream 中數據後,將其放入下一個快取區,如果下一個緩衝區還沒有被執行緒C消費,那麼A-Thread 將被阻塞
- B-Thread 處理數據,如果B-Thread下一個要處理的
byte[]數據A執行緒還未下載完畢,那麼B-Thread將被阻塞(io阻塞) - C-Thread 為拼接 bad trace 的執行緒,需要 previous、current、next 3個 batch 都 ready後,才能組織數據,當B-Thread還未處理完next batch 數據時,C-Thread將被阻塞
解決思路
- A-B 同步:10個 slot 分配10個
Semaphore,為 A-Thread 與 B-Thread 同步服務,A-Thread 產生數據後,對應 slot 的訊號量+1,B-Thread 消費數據之前,需要semaphore.acquire() - B-C 同步:通過
volatile及納秒級睡眠Thread.sleep(0, 2)實現高效響應。實際測試,某些場景中,該組合性能超過Semaphore;C-Thread 發現 B-Thread 還未產出 next batch 的數據,那麼進入等待狀態 - A-C 同步:同樣利用
volatile及納秒級睡眠Thread.sleep(0, 2)
JVM調參
列印gc輸出日誌時發現,程式會發生3-5次 full gc,導致性能欠佳,分析記憶體使用場景發現,流式輸出的數據模型,在記憶體中只會存在很短的一段時間便會失效,真正流入老年代的記憶體是很小的,故應調大新生代佔比
java -Dserver.port=$SERVER_PORT -Xms3900m -Xmx3900m -Xmn3500m -jar tailbaseSampling-1.0-SNAPSHOT.jar &
直接分配約 4g 的空間,其中新生代占 3.5g,通過觀測 full gc 消失;此舉可使評測快2-3s
方案總結
此方案最優成績跑到了5.7s,性能有了較大提升,總結快的原因如下:
- 面向位元組
- 記憶體預分配;避免臨時開闢記憶體
- 使用輕量級鎖
- 避免程式阻塞或飢餓
奇技淫巧
奇技淫巧,俗稱偷雞,本不打算寫該模組,不過很多上分的小技巧都源於此,真實的通用化場景中,可能本模組作用不大,不過比賽就是這樣,無所不用其極。。。
快速讀取位元組數組
因java語言設計緣故,凡事讀取比 int 小的數據類型,統一轉為 int 後操作,試想以下程式碼
while ((byteNum = input.read(data)) != -1) {
for (int i = 0; i < byteNum; i++) {
if (data[i] == 10) {
count++;
}
}
}
大量的位元組對比操作,每次對比,均把一個 byte 轉換為 4個 byte,效率可想而知
一個典型的提高位元組數組對比效率的例子,採用萬能的Unsafe,一次性獲取8個byte long val = unsafe.getLong(lineByteArr, pos + Unsafe.ARRAY_BYTE_BASE_OFFSET); 然後比較2個 long 值是否相等,提速是成倍增長的,那麼怎麼用到本次賽題上呢?
span數據是類似這樣格式的
193081e285d91b5a|1593760002553450|1e86d0a94dab70d|28b74c9f5e05b2af|508|PromotionCenter|DoGetCommercialStatus|192.168.102.13|http.status_code=200&component=java-web-servlet&span.kind=server&bizErr=4-failGetOrder&http.method=GET
用”|”切割後,倒數第二位是ip,且格式固定為192.168.***.***,如果採用Unsafe,每次讀取一個 int 時,勢必會落在192.168.中間,有4種可能192.、92.1、2.16、.168,故可利用此特性,直接進行 int 判斷
int val = unsafe.getInt(data, beginPos + Unsafe.ARRAY_BYTE_BASE_OFFSET);
if (val == 775043377 || val == 825111097 || val == 909192754 || val == 943075630) {
}
此「技巧」提速1-2秒
大循環遍歷
提供2種遍歷位元組數組方式,哪種效率更高
-
方式1
byte[] data = new byte[1024 * 1024 * 2]; int byteNum; while ((byteNum = input.read(data)) != -1) { for (int i = 0; i < byteNum; i++) { if (data[i] == 10) { count++; } } } -
方式2
byte[] data = new byte[1024 * 1024 * 2]; int byteNum; int beginIndex; int endIndex; int beginPos; while ((byteNum = input.read(data)) != -1) { beginIndex = 0; endIndex = byteNum; beginPos = 0; while (beginIndex < endIndex) { int i; for (i = beginPos; i < endIndex; i++) { if (data[i] == 124) { beginPos = i + 1; times++; break; } else { if (data[i] == 10) { count++; beginIndex = i + 1; beginPos = i + 1; break; } } } if (i >= byteNum) { break; } } }
兩種方式達到的效果一樣,都是尋找換行符。方式2不同的是,每次找到換行符都 break 掉當前循環,然後從之前位置繼續循環。其實這個小點卡了我1個星期,就是將字元流轉換為位元組流時,性能幾乎沒有得到提高,換成方式2後,性能至少提高一倍。為什麼會呈現這樣一種現象,我還沒找到相關資料,有知道的同學,還望不吝賜教哈
結束
這種cpu密集型的賽題,一向是 c/cpp 大展身手的舞台,前排幾乎被其霸佔。作為一名多年 crud 的 javaer,經過無數個通宵達旦,最終拿到了集團第6的成績,雖不算優異,但自己也儘力了哈
最終比賽成績貼上哈





