讀完 RocketMQ 源碼,我學會了如何優雅的創建執行緒

RocketMQ 是一款開源的分散式消息系統,基於高可用分散式集群技術,提供低延時、高可靠的消息發布與訂閱服務。

這篇文章,筆者整理了 RocketMQ 源碼中創建執行緒的幾點技巧,希望大家讀完之後,能夠有所收穫。

1 創建單執行緒

首先我們先溫習下常用的創建單執行緒的兩種方式:

  • 實現 Runnable 介面
  • 繼承 Thread 類

▍一、實現 Runnable 介面

圖中,MyRunnable 類實現了 Runnable 介面的 run 方法,run 方法中定義具體的任務程式碼或處理邏輯,而Runnable 對象是作為執行緒構造函數的參數。

▍二、 繼承 Thread 類

執行緒實現類直接繼承 Thread ,本質上也是實現 Runnable 介面的 run 方法。

2 單執行緒抽象類

創建單執行緒的兩種方式都很簡單,但每次創建執行緒程式碼顯得有點冗餘,於是 RocketMQ 里實現了一個抽象類 ServiceThread 。

抽象類 ServiceThread

我們可以看到抽象類中包含了如下核心方法:

  1. 定義執行緒名;
  2. 啟動執行緒;
  3. 關閉執行緒。

下圖展示了 RocketMQ 眾多的單執行緒實現類。

實現類的編程模版類似 :

我們僅僅需要繼承抽象類,並實現 getServiceNamerun 方法即可。啟動的時候,調用 start 方法 , 關閉的時候調用 shutdown 方法。

3 執行緒池原理

執行緒池是一種基於池化思想管理執行緒的工具,執行緒池維護著多個執行緒,等待著監督管理者分配可並發執行的任務。這避免了在處理短時間任務時創建與銷毀執行緒的代價。執行緒池不僅能夠保證內核的充分利用,還能防止過分調度。

JDK中提供的 ThreadPoolExecutor 類,是我們最常使用的執行緒池類。

ThreadPoolExecutor構造函數

參數名 作用
corePoolSize 隊列沒滿時,執行緒最大並發數
maximumPoolSizes 隊列滿後執行緒能夠達到的最大並發數
keepAliveTime 空閑執行緒過多久被回收的時間限制
unit keepAliveTime 的時間單位
workQueue 阻塞的隊列類型
threadPoolFactory 改變執行緒的名稱、執行緒組、優先順序、守護進程狀態
RejectedExecutionHandler 超出 maximumPoolSizes + workQueue 時,任務會交給RejectedExecutionHandler來處理

任務的調度通過執行 execute方法完成,方法的核心流程如下:

  1. 如果 workerCount < corePoolSize,創建並啟動一個執行緒來執行新提交的任務。
  2. 如果 workerCount >= corePoolSize,且執行緒池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且執行緒池內的阻塞隊列已滿,則創建並啟動一個執行緒來執行新提交的任務。
  4. 如果 workerCount >= maximumPoolSize,並且執行緒池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

4 執行緒池封裝

在 RocketMQ 里 ,網路請求都會攜帶命令編碼,每種命令映射對應的處理器,而處理器又會註冊對應的執行緒池。

當服務端 Broker 接收到發送消息命令時,都會有單獨的執行緒池 sendMessageExecutor 來處理這種命令請求。

基於 ThreadPoolExecutor 做了一個簡單的封裝 ,BrokerFixedThreadPoolExecutor 構造函數包含六個核心參數:

  1. 核心執行緒數和最大執行緒數相同 ,數量是:cpu核數和4比較後的最小值;
  2. 空閑執行緒的回收的時間限制,默認1分鐘;
  3. 發送消息隊列,有界隊列,默認10000;
  4. 執行緒工廠 ThreadFactoryImpl ,定義了執行緒名前綴:SendMessageThread_ 。

RocketMQ 實現了一個簡單的執行緒工廠:ThreadFactoryImpl,執行緒工廠可以定義執行緒名稱,以及是否是守護執行緒 。

執行緒工廠

開源項目 Cobar ,Xmemcached,Metamorphosis 中都有類似執行緒工廠的實現 。

5 執行緒名很重要

執行緒名很重要,執行緒名很重要,執行緒名很重要 ,重要的事情說三遍。

我們看到 RocketMQ 中,無論是單執行緒抽象類還是多執行緒的封裝都會配置執行緒名 ,因為通過執行緒名,非常容易定位問題,從而大大提升解決問題的效率。

定位的媒介常見有兩種:日誌文件堆棧記錄

▍一、日誌文件

經常處理業務問題的同學,一定都經常與日誌打交道。

  • 查看 ERROR 日誌,追溯到執行執行緒, 要是執行緒池隔離做的好,基本可以判斷出哪種業務場景出了問題;
  • 通過查看執行緒列印的日誌,推斷執行緒調度是否正常,比如有的定時任務執行緒列印了開始,沒有列印結束,推論當前執行緒可能已經掛掉或者阻塞。

▍二、堆棧記錄

jstack 是 java 虛擬機自帶的一種堆棧跟蹤工具 ,主要用來查看 Java 執行緒的調用堆棧,執行緒快照包含當前 java 虛擬機內每一條執行緒正在執行的方法堆棧的集合,可以用來分析執行緒問題。

jstack -l 進程pid

筆者查看執行緒堆棧,一般關注如下幾點:

  1. 當前 jvm 進程中的執行緒數量和執行緒分類是否在預期的範圍內;
  2. 系統介面超時或者定時任務停止的異常場景下 ,分析堆棧中是否有鎖未釋放,或者執行緒一直等待網路通訊響應;
  3. 分析 jvm 進程中哪個執行緒佔用的 CPU 最高。

6 總結

本文是RocketMQ 系列文章的開篇,和朋友們簡單聊聊 RocketMQ 源碼里創建執行緒的技巧。

  1. 單執行緒抽象類 ServiceThread

    使用者只需要實現業務邏輯以及定義執行緒名即可 ,不需要寫冗餘的程式碼。

  2. 執行緒池封裝

    適當封裝,定義執行緒工廠,併合理配置執行緒池參數。

  3. 執行緒名很重要

    文件日誌,堆棧記錄配合執行緒名能大大提升解決問題的效率。

RocketMQ 的多執行緒編程技巧很多,比如執行緒通訊,並發控制,執行緒模型等等,後續的文章會一一為大家展現。


如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!