分佈式鏈路追蹤 SkyWalking 源碼分析 —— Agent 發送 Trace 數據

  • 2019 年 12 月 13 日
  • 筆記

摘要: 原創出處 http://www.iocoder.cn/SkyWalking/agent-send-trace/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. TraceSegmentServiceClient
    • 2.1 實現 BootService 接口
    • 2.2 實現 GRPCChannelListener 接口
    • 2.3 實現 TracingContextListener 接口
    • 2.4 實現 IConsumer 接口
  • 666. 彩蛋

1. 概述

分佈式鏈路追蹤系統,鏈路的追蹤大體流程如下:

  1. Agent 收集 Trace 數據。
  2. Agent 發送 Trace 數據給 Collector
  3. Collector 接收 Trace 數據。
  4. Collector 存儲 Trace 數據到存儲器,例如,數據庫。

本文主要分享【第二部分】 SkyWalking Agent 發送 Trace 數據

考慮到減少外部組件的依賴,Agent 收集到 Trace 數據後,不是寫入外部消息隊列( 例如,Kafka )或者日誌文件,而是 Agent 寫入內存消息隊列後台線程異步】發送給 Collector 。

本文涉及的類非常少,如下圖所示:

2. TraceSegmentServiceClient

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 發送服務客戶端。它是一個服務,也是一個客戶端,負責將 TraceSegment 異步發送到 Collector 。

我們先來看看 TraceSegmentServiceClient 的屬性

  • TIMEOUT 靜態屬性,發送等待超時時長,單位:毫秒。
  • lastLogTime 屬性,最後打印日誌時間。該屬性主要用於開發調試。
  • segmentUplinkedCounter 屬性,TraceSegment 發送數量。
  • segmentAbandonedCounter 屬性,TraceSegment 被丟棄數量。在 Agent 未連接上 Collector 時,產生的 TraceSegment 將被丟棄。
  • carrier 屬性,內存隊列。在 《SkyWalking 源碼分析 —— DataCarrier 異步處理庫》 有對 DataCarrier 的詳細解析。
  • serviceStub 屬性,非阻塞 Stub 。
  • status 屬性,連接狀態。

下面,我們來介紹 TraceSegmentServiceClient 實現的接口以及對應的方法。

2.1 實現 BootService 接口

#beforeBoot() 方法,代碼如下:

  • 第 86 行:調用 GRPCChannelManager#addChannelListener(this) 方法,將自己添加到 GRPCChannelManager 中,作為一個監聽器,從而調用 #statusChanged(GRPCChannelStatus) 方法,實現對連接狀態( status )的監聽處理。

#boot() 方法,代碼如下:

  • 第 95 至 97 行:創建 DataCarrier 對象,作為內存隊列,並設置自己作為消費者,從而調用 #consume(List<TraceSegment> ) 方法,實現異步發送 TraceSegment 到 Collector 。

#afterBoot() 方法,代碼如下:

  • 第 102 行:調用 TracingContext.ListenerManager#add(this) 方法,將自己添加到 ListenerManager 中,作為一個監聽器,從而調用 #afterFinished(TraceSegment) 方法,實現收集到新的 TraceSegment ,添加到內存隊列。

#shutdown() 方法,代碼如下:

  • 第 107 行:調用 DataCarrier#shutdownConsumers() 方法,停止消費。

2.2 實現 GRPCChannelListener 接口

#statusChanged(GRPCChannelStatus) 方法,代碼如下:

  • 第 211 至 214 行:連接成功,創建 Stub 對象。
  • 第 215 行:記錄連接狀態。

2.3 實現 TracingContextListener 接口

#afterFinished(TraceSegment) 方法,代碼如下:

  • 第 197 至 199 行:當 TraceSegment.ignore = true 時,忽略該 TraceSegment 。
  • 第 201 行:提交 TraceSegment 到內存隊列。

2.4 實現 IConsumer 接口

#consume(List<TraceSegment>) 方法,代碼如下:

  • —— 連接中 ——
  • 第 119 行:創建 org.skywalking.apm.agent.core.remote。GRPCStreamServiceStatus 對象。
  • 第 122 至 141 行:創建 StreamObserver 對象。在下面,我們可以看到 Agent 發送 TraceSegment 給 Collector 是非阻塞的方式,通過該對象,觀察執行結果。
    • 第 130 行 || 第 139 行:當發生錯誤或者完成時,調用 GRPCStreamServiceStatus#finished() 方法,標記完成。為什麼呢?下面會看到。
    • 第 134 行:調用 GRPCChannelManager#reportError(Throwable) 方法,彙報錯誤。如果是連接錯誤,GRPCChannelManager 會負責斷開重連。
  • 第 144 至 151 行:逐條非阻塞發送 TraceSegment 請求。
    • 注意,此處若等待完成超時,TraceSegment 依然在發送,或者被 Collector 處理中,直到最終的成功或失敗。
    • DistributedTraceId#toUniqueId()
    • ID#transform()
    • AbstractTracingSpan#transform()
    • ExitSpan#transform()
    • LogDataEntity#transform()
    • TraceSegmentRef#transform()
    • KeyValuePair#transform()
    • 第 146 行:調用 TraceSegment#transform() 方法,將 TraceSegment 轉換成 org.skywalking.apm.network.proto.UpstreamSegment 對象,用於 gRPC 傳輸,參見 TraceSegmentService.proto 的數據結構定義。
    • 第 154 行:調用 StreamObserver#onCompleted() 方法,標記全部請求發送完成。
    • 第 157 至 159 行:調用 GRPCStreamServiceStatus#wait4Finish(maxTimeout) 方法,等待 Collector 處理完成。這就是為什麼上面需要調用 GRPCStreamServiceStatus#finished() 方法。完成後,記錄數量到 segmentUplinkedCounter
  • —— 未連接 ——
  • 第 161 行:記錄數量到 segmentAbandonedCounter
  • —— ALL ——
  • 調用 #printUplinkStatus() 方法,每三十秒,打印一次 segmentUplinkedCounter 和 segmentAbandonedCounter 數據。主要用於開發調試。另外,該方法會重置 segmentUplinkedCountersegmentAbandonedCounter 計數。

ps:目前 DataCarrier 最長每 20 秒消費一次。

#onError(List<TraceSegment>, Throwable) 方法,當消費發生異常時,打印日誌。