Fabric2.2中的Raft共識模組源碼分析

引言

Hyperledger Fabric是當前比較流行的一種聯盟鏈系統,它隸屬於Linux基金會在2015年創建的超級賬本項目且是這個項目最重要的一個子項目。目前,與Hyperledger的另外幾個子項目Hyperledger Iroha,Hyperledger Indy和Hyperledger Sawtooth一樣,Hyperledger Fabric正處於生命周期中的活躍(active)階段,它的架構設計正在不斷地完善並持續為開發者和用戶提供更強大,更便捷的區塊鏈服務。

與主流的區塊鏈系統一樣,Hyperledger Fabric實際上也是個去中心化的分散式賬本,其總賬上的數據和交易記錄由網路中的多方節點共同維護,而且賬本上的記錄一旦被寫入將永遠無法被篡改,同時支援基於時間戳的交易追溯。然而,與當前成熟的比特幣,以太坊等公有鏈系統不同的是,Hyperledger Fabric是一種聯盟鏈系統,它的去中心化程度是受限的,即它只允許被授權的節點加入到區塊鏈網路中。更多地,Fabric還提供了創建通道的功能來進一步滿足不同聯盟方的實際需求,這進一步提高了區塊鏈系統的安全性和私密性。

區塊鏈系統中的交易處理和共識模組是一個核心功能,它們為實現區塊鏈的主體功能發揮了核心作用。在接下來的內容中,為了讓讀者更好地了解Fabric中的共識模組,我們首先簡要介紹了Hyperledger Fabric的架構設計,接著詳細分析了Fabric中的交易流程,最後結合Hyperledger Fabric的源碼來深入了解Raft共識演算法及其在區塊鏈系統中的具體實現。

Hyperledger Fabric架構簡介

在Hyperledger Fabric系統的架構中,它引以為豪的一項設計就是採用了模組化的設計思想,並且支援可插拔的組件開發。

具體地,Fabric的架構主要包括三個重要的組成模組,即成員服務,區塊鏈服務以及合約程式碼服務。以下將詳細介紹每個模組包含的功能以及設計原理。

a) 成員服務模組:成員服務之所以會單獨劃分為一個模組主要是考慮到聯盟鏈的特殊性,即每個節點在進入區塊鏈系統之前都需要經過身份驗證,只有通過驗證的節點才能參與到系統中。成員服務提供成員的註冊,身份管理以及認證功能,保證了系統的安全性,便利了節點的許可權管理。

b) 區塊鏈服務模組:無論是在公有鏈系統還是在聯盟鏈系統,區塊鏈服務始終作為區塊鏈系統的核心組成部分,為區塊鏈主體功能提供底層的服務支撐。具體地,該模組主要承擔了節點間的共識管理,賬本的分散式存儲,去中心化網路協議的具體實現等任務。

c) 合約程式碼服務模組:該模組也不是Fabric系統獨有的,很多系統如以太坊等都具備部署智慧合約的功能。在Hyperledger Fabric系統中,智慧合約在Docker容器中運行,所以該模組提供了一個智慧合約的執行引擎為合約程式碼程式提供了一個強大,便捷的部署運行環境。

根據以上的模組劃分,Fabric的詳細架構如下圖1.1所示。

image-20220411201941787

圖1.1 Hyperledger Fabric架構圖

Hyperledger Fabric中的交易流程介紹

在Hyperledger Fabric系統中,所謂的交易就是一次合約程式碼的調用過程,包含兩種類型的交易,即部署交易和調用交易。

部署交易主要用來在Hyperledger Fabric區塊鏈中安裝合約程式碼。具體地,它使用一個程式作為參數創建新的合約程式碼,然後執行部署以完成合約的安裝。而調用交易簡單來說就是執行合約程式碼,當成功地執行調用交易後,可以相應地修改賬本的狀態,並且為客戶端返回輸出結果。不管是部署交易還是調用交易,只要在區塊鏈系統中執行後都會被打包成區塊,區塊鏈接在一起就組成了分散式賬本的區塊鏈。

Hyperledger Fabric區塊鏈系統中,交易主要可以分為三個階段,分別是提議階段,排序打包階段以及驗證提交階段。這裡每個階段參與的節點的類型略有不同,設計到的技術原理也不同。具體地,下圖2.1詳細地描述了Hyperledger Fabric中的交易流程。

image-20220411200222040

圖2.1 Hyperledger Fabric中的交易流程

2.1 提議階段

在Fabric的第一階段中,主要的工作流程是客戶端節點提交交易提案、背書節點模擬執行鏈碼、背書節點為交易提案進行背書、背書節點返回背書結果給客戶端。

具體地,提議階段主要可以細分為以下幾個步驟:

  1. 客戶端首先構建交易的提案,提案的作用是調用通道中的鏈碼來讀取或者將數據寫入分散式賬本。客戶端打包交易提案,並使用用戶的私鑰對提案進行簽名。

  2. 應用端打包完交易提案後,便開始把提案提交給通道中的背書節點。通道的背書策略定義了哪些節點背書後交易才能有效,應用端根據背書策略選擇相應的背書節點,並向它們提交交易提案。

  3. 背書節點收到交易提案後,首先校驗交易的簽名是否合法,然後根據簽名者的身份,確認其是否具有許可權進行相關交易。此外,背書節點還需要檢查交易提案的格式是否正確以及是否之前提交過,這樣做的目的是防止重放攻擊。

  4. 在所有合法性校驗通過後,背書節點按照交易提案,模擬調用鏈碼。鏈碼模擬執行時,讀取的鍵值對數據是節點中本地的狀態資料庫。需要注意的是,鏈碼在背書節點中是模擬執行,即對資料庫的寫操作並不會對賬本作改變。

  5. 在鏈碼模擬執行完成之後,將返回模擬執行的返回值、鏈碼讀取過的數據集和鏈碼寫入的數據集。讀操作集合和寫操作集合將在確認節點中用於確定交易是否最終寫入賬本。

  6. 背書節點把鏈碼模擬執行後得到的讀寫集等資訊使用其私鑰進行簽名(背書籤名)後發回給提案提交方即客戶端。

image-20220411200235712

圖2.2 交易流程之提議階段

2.2 排序和打包交易階段

一般地,等客戶端收集到足夠多的背書節點返回的響應提案的背書響應後,客戶端便會將交易提案、讀寫集和背書籤名等發送給排序節點。排序節點將會對自己接收到的交易資訊按照通道分類進行排序,且打包成區塊。

排序和打包交易階段可以細分為以下幾個子階段:

  1. 客戶端收到背書響應之後,檢查背書節點的簽名和比較不同節點背書的結果是否一致。如果提案是查詢賬本的請求,則客戶端無需提交交易給排序節點。如果是更新賬本的請求,客戶端在收集到滿足背書策略的背書響應數量之後,把背書提案中得到的讀寫集、所有背書節點的簽名和通道號發給排序節點。

  2. 排序節點在收到各個節點發來的交易後,並不檢查交易的全部內容,而是按照交易中的通道號對交易分類排序,然後把相同通道的一批交易打包成區塊。

  3. 排序節點把打包好的區塊廣播給通道中的所有成員。區塊的廣播有兩種觸發條件,一種是當通道的交易數量達到某個預設的閾值,另一種是在交易數量沒有超過閾值但距離上次廣播的時間超過某個特定閾值,也可觸發廣播數據塊。兩種方式相結合,使得排序過的交易可以及時廣播出去。

image-20220411200247216

圖2.3 交易流程之排序和打包區塊階段

2.3 驗證和提交階段

最後,對於驗證和提交階段來說,擔負的職責就是驗證其收到的區塊,即驗證區塊中的背書籤名以及驗證交易的有效性。驗證成功後,Peer節點將更新賬本和世界狀態。

驗證和提交階段的詳細工作流如下:

  1. 節點收到排序節點發來的交易數據塊後,逐筆檢查區塊中的交易。先檢查交易的合法性以及該交易是否曾經出現過。然後調用 VSCC(Validation System Chaincode)的系統鏈碼檢驗交易的背書籤名是否合法,以及背書的數量是否滿足背書策略的要求。

  2. 接下來進行多版本並發控制 MVCC 的檢查,即校驗交易的讀集是否和當前賬本中的版本一致。如果沒有改變,說明交易寫集中對數據的修改有效,把該交易標註為有效,交易的寫集更新到狀態資料庫中。

  3. 如果當前賬本的數據和讀集版本不一致,則該交易被標註為無效,不更新狀態資料庫。數據塊中的交易數據在標註成”有效”或”無效”後封裝成區塊寫入賬本的區塊鏈中。

  4. 最後,節點會通過事件機制通知客戶端交易是否已經被加入區塊鏈以及交易是否有效。

image-20220411200256709

圖2.4 交易流程之驗證和提交階段

Hyperledger Fabric中的共識演算法及其源碼分析

對於Hyperledger Fabric系統來說,前一節分析的整個交易流程就是共識,通過這個交易處理流程,所有的Peer節點在由排序節點提供的流程中對交易的排序和根據交易打包成的區塊達成了一致。因此,我們可以知道排序服務是共識機制中最重要的一環,所有的交易都需要通過排序服務後才能達成全網節點的共識。

Hyperledger Fabric 的網路節點本質上是互相複製的狀態機,節點之間需要保持相同的賬本狀態。為了實現這個目的,各個節點需要通過共識過程,對賬本狀態的變化達成一致性的認同。

如何實現所有節點的共識可以說是去中心化的區塊鏈系統所面臨的最重要的問題之一,而共識機制又被稱為”區塊鏈的靈魂”。所以,針對不同的區塊鏈系統選擇合適的共識演算法對於分散式系統保持一致性是至關重要的。

在區塊鏈領域,使用的比較多的共識演算法有大名鼎鼎的PoW共識演算法,PoS和DPoS等權益證明演算法,以及PBFT,RAFT等共識演算法。對於Hyperledger Fabric這類聯盟鏈系統,PoW和PoS等演算法並不適用,因為這類演算法實現共識的本質都是挖礦。雖然這類演算法具備完全去中心化和節點自由進出的優點,但是由於挖礦需要耗費大量的電力和CPU資源以及達成共識的周期很長,並不適用於商業的區塊鏈應用。因此,與現在大部分的聯盟鏈系統一樣,Hyperledger Fabric也將目光聚集在PBFT和RAFT等共識演算法上。

Fabric的共識服務設計成了可插拔的模組,以此滿足了根據不同應用場景切換不同共識選項的需求。在Hyperledger Fabric最新版本中,Fabric系統的共識模組中實現了三種共識演算法,其中包括Solo,Kafka以及Raft演算法。官方推薦的是使用Raft共識演算法,但是為了更好地理解Fabric中的共識模組,我們也簡單介紹一下Solo和Kafka這兩種共識演算法。

  1. solo共識:假設網路環境中只有一個排序節點,從Peer節點發送來的消息由一個排序節點進行排序和產生區塊。由於排序服務只有一個排序節點為所有的peer節點服務,雖然可以肯定保證順序一致性,但是沒有高可用性和可擴展性,所以不適合用於生產環境,只能用於開發和測試環境。

  2. Kafka共識:Kafka是一個分散式的流式資訊處理平台,目標是為實時數據提供統一的、高吞吐、低延遲的性能。Hyperledger Fabric之前版本的核心共識演算法通過Kafka集群實現,簡單來說,就是通過Kafka對所有交易資訊進行排序(如果系統存在多個通道,則對每個通道分別排序)。

  3. Raft共識:Raft是Hyperledger Fabric在1.4.1版本中引入的,它是一種基於 etcd 的崩潰容錯(CFT)排序服務。Raft 遵循 “領導者和追隨者” 模型,其中領導者在通道中的排序節點之間動態選出(這個節點集合稱為”consenter set”),該領導者將消息複製到跟隨者節點。Raft保證即使在小部分(≤ (N-1)/2)節點故障的情況下,系統仍然能正常對外提供服務,所以Raft被稱為”崩潰容錯”。

其實,Hyperledger Fabric在1.4.1版本以前,它的核心共識演算法通過Kafka集群實現,但是在1.4.1版本之後,Fabric推薦使用Raft演算法實現節點的共識。其實從提供服務的視角來看,基於Raft和Kafka的排序服務是類似的,他們都是基於CFT(crash fault tolerant)模型的排序服務,並且都使用了主從節點的設置。但是為什麼Hyperledger Fabric選擇Raft演算法呢?我們列舉了Raft相較於Kafka所展現出的優勢來回答這個問題。

a. 第一點,Raft 更容易設置。雖然 Kafka 有很多崇拜者,但即使是那些崇拜者也(通常)會承認部署 Kafka 集群及其所必須的 ZooKeeper 集群會很棘手,需要在 Kafka 基礎設施和設置方面擁有高水平的專業知識。此外,使用 Kafka 管理的組件比使用 Raft 管理的組件多,Kafka 有自己的版本,必須與排序節點協調。而使用 Raft,所有內容都會嵌入到排序節點中。

b. 第二點,Kafka和zookeeper的設計不適用於大型網路。它們的設計是CFT模型,但局限於運行的比較緊密的主機上。也就是說,需要有一個組織專門運行Kafka集群。鑒於此,當有多個組織使用基於Kafka排序服務的時候,其實沒有實現去中心化,因為所有的節點連接的都是由一個組織單獨控制的Kafka集群。如果使用Raft演算法,每個組織可以貢獻排序節點,共同組成排序服務,可以更好的去中心化。

c. 第三點,Raft是原生支援的,而Kafka需要經過複雜的步驟部署,並且需要單獨學習成本。而且Kafka和Zookeeper的支援相關的issue要通過apache來處理,而不是Hyperledger Fabric。Raft的實現是包含在Fabric社區的,開發支援更加便利。

d. 第四點,Raft 是向開發拜占庭容錯(BFT)排序服務邁出的第一步。正如我們將看到的,Fabric 開發中的一些決策是由這個驅動的。Fabric使用Raft共識演算法是向BFT類演算法過渡的步驟。

鑒於以上對Kafka和Raft的分析比較,再考慮到目前最新版的Fabric中推薦採用的共識演算法,我們以下將詳細分析Raft共識演算法,而不是之前版本使用的Kafka。具體地,在3.1節中我將從理論角度闡述Raft共識演算法及共識流程,而在3.2節我則會從Fabric源碼角度來進一步深入分析Raft演算法的實現及其與Fabric交易流程的整合。

3.1 Raft演算法理論

在分散式系統中,為了消除單點故障提高系統可用性,通常會使用副本來進行容錯,但這會帶來另一個問題,即如何保證多個副本之間的一致性?而所謂的一致性並不是指集群中所有節點在任一時刻的狀態必須完全一致,而是指一個目標,即讓一個分散式系統看起來只有一個數據副本,並且讀寫操作都是原子的,這樣應用層就可以忽略系統底層多個數據副本間的同步問題。也就是說,我們可以將一個強一致性分散式系統當成一個整體,一旦某個客戶端成功的執行了寫操作,那麼所有客戶端都一定能讀出剛剛寫入的值。即使發生網路分區故障,或者少部分節點發生異常,整個集群依然能夠像單機一樣提供服務。

為了實現一致性的目的,共識演算法基於狀態複製機模型來建模,所有的節點從一個相同的狀態(state)出發,經過相同的操作日誌,最終達到一致的狀態。在眾多的共識演算法中,Paxos演算法可以說是一個最經典的共識演算法,也是公認的可以實現有效共識的演算法。然而,由於Paxos卻很少在實際架構中應用,因為它難以理解,更加難以實現,作者為了讓讀者更好地理解Paxos演算法,甚至為此專門發表了論文進行進一步解釋。其次,Paxos沒有提供一個足夠好的用來構建一個現實系統的基礎,而且它也並不是十分易於構建實踐性的系統。因此,現在的分散式系統通常使用Paxos的一種變種共識演算法,即Raft演算法,它的優點是容易理解,容易實現,這使得Raft得到更廣泛的普及和應用。

3.1.1 基本概念

Raft 使用 Quorum 機制(一種集群一致性和可用性之間的權衡機制)來實現共識和容錯,我們將對 Raft 集群的操作稱為提案(提案可以簡單理解為對集群的讀寫操作),每當發起一個提案,必須得到大多數(> N/2)節點的同意才能提交。

接下來,我們詳細介紹下Raft中涉及到的一些關鍵概念以及術語。

  1. Leader:Leader負責提取新的日誌條目,將它們複製到跟隨者訂購節點,以及管理何時認為條目已提交。在Hyperledger Fabric中,其中一個排序節點將擔任Leader。

  2. Follower:Follower從Leader那裡接收日誌並確定性地複製它們,確保日誌保持一致。Follower也會收到來自Leader的”心跳”資訊。如果Leader停止在可配置的時間內發送這些消息,Follower將轉換為候選狀態。

  3. 候選狀態(candidate):處於候選狀態的節點會發起選舉,如果它收到集群中大多數成員的投票認可,就轉換為Leader。

  4. 日誌條目:Raft排序服務中的主要工作單元是”日誌條目”,這些條目的完整序列稱為”日誌”。如果成員的多數(法定人數,換言之)成員到條目及其順序達成一致,我們認為日誌是一致的。

  5. 有限狀態機(FSM):Raft中的每個排序節點都有一個FSM,它們共同用於確保各個排序節點中的日誌序列是確定性的(以相同的順序編寫)。

  6. Consenter設置:排序節點主動參與給定信道的共識機制並接收信道的複製日誌。這可以是所有可用節點(在單個群集中或在對系統通道有貢獻的多個群集中),或者是這些節點的子集。

  7. 法定人數:描述需要確認提案的最少數量的同意者,以便可以提交交易。對於每個consenter集,這是大多數節點。在具有五個節點的群集中,必須有三個節點才能存在仲裁。如果由於任何原因導致法定數量的節點不可用,則排序節點將無法用於通道上的讀取和寫入操作,並且不能提交新日誌。

  8. 任期:每開始一次新的選舉,稱為一個任期(term),每個 term 都有一個嚴格遞增的整數與之關聯。每當 candidate 觸發領導人選舉時都會增加 term,如果一個 candidate 贏得選舉,他將在本任期中擔任 Leader 的角色。但並不是每個任期都一定對應一個Leader,有時候某個任期內會由於選舉超時導致選不出 Leader,這時candidate會遞增任期號並開始新一輪選舉。

在了解了Raft中的基本概念後,我們再來簡單了解一下Raft演算法的運行過程。

首先,Raft 集群必須存在一個主節點(Leader),沒有主節點集群就無法工作,我們作為客戶端向集群發起的所有操作都必須經由主節點處理。所以 Raft 核心演算法中的第一部分就是領導人選舉,先票選出一個主節點,再考慮其它事情。其次,主節點負責接收客戶端發過來的操作請求,將操作包裝為日誌同步給其它節點,在保證大部分節點都同步了本次操作後,就可以安全地給客戶端回應響應了。這在 Raft中被叫做日誌複製。然後,因為主節點的責任是如此之大,所以節點們在領導人選舉的時候一定要謹慎,只有符合條件的節點才可以當選主節點。此外主節點在處理操作日誌的時候也一定要謹慎,為了保證集群對外展現的一致性,不可以覆蓋或刪除前任主節點已經處理成功的操作日誌。所謂的”謹慎處理”,其實就是在選主和提交日誌的時候進行一些限制,這一部分在 Raft 共識演算法中叫安全性保證。

Raft 核心演算法其實就是由這三個子問題組成的:領導人選舉、日誌複製、安全性。這三部分共同實現了 Raft 核心的共識和容錯機制。

3.1.2 領導人選舉

Raft集群中每個節點都處於Leader,Follower和Candidate三種角色之一。在領導人選舉的過程中,節點的這些狀態將隨著選舉場景的不同而發生切換。接下來,我們將詳細剖析領導人選舉的流程。

Raft 的領導人選舉基於一種心跳機制,集群中每個節點剛啟動時都是 Follower 身份,Leader 會周期性的向所有節點發送心跳包來維持自己的權威,那麼首個 Leader 是如何被選舉出來的呢?方法是如果一個 Follower 在一段時間內沒有收到任何心跳,也就是選舉超時,那麼它就會主觀認為系統中沒有可用的 Leader,並發起新的選舉。

這裡有一個問題,即這個”選舉超時時間”該如何制定?如果所有節點在同一時刻啟動,經過同樣的超時時間後同時發起選舉,整個集群會變得低效不堪,極端情況下甚至會一直選不出一個主節點。Raft 巧妙的使用了一個隨機化的定時器,讓每個節點的”超時時間”在一定範圍內隨機生成,這樣就大大的降低了多個節點同時發起選舉的可能性。

若Follower想發起一次選舉,Follower需要先增加自己的當前任期,並將身份切換為candidate。然後它會向集群其它節點發送”請給自己投票”的消息。在此之後,系統中會出現三種可能的結果。

第一種,當前candidate節點選舉成功。當candidate從整個集群的大多數(N/2+1)節點獲得了針對同一任期的選票時,它就贏得了這次選舉,立刻將自己的身份轉變為Leader 並開始向其它節點發送心跳來維持自己的權威。每個節點針對每個任期只能投出一張票,並且按照先到先得的原則。這個規則確保只有一個 candidate會成為Leader。

第二種,當前candidate節點選舉失敗。candidate 在等待投票回復的時候,可能會突然收到其它自稱是Leader 的節點發送的心跳包,如果這個心跳包里攜帶的任期號不小於 candidate 當前的任期號,那麼candidate 會承認這個Leader,並將身份切回 Follower。這說明其它節點已經成功贏得了選舉,我們只需立刻跟隨即可。但如果心跳包中的任期號比自己小,candidate會拒絕這次請求並保持選舉狀態。

第三種,選舉超時。如果有多個Follower 同時成為 candidate,選票是可能被瓜分的,如果沒有任何一個candidate 能得到大多數節點的支援,那麼每一個 candidate都會超時。此時candidate 需要增加自己的任期號,然後發起新一輪選舉。如果這裡不做一些特殊處理,選票可能會一直被瓜分,導致選不出Leader來。這裡的”特殊處理”指的就是前文所述的隨機化選舉超時時間。

3.1.3 日誌複製

前面我們也提到了,Raft共識演算法是基於狀態複製機(RPM)模型實現的,也就是說Raft需要保證集群中所有節點的日誌log一致。在Raft模型中,Leader節點承擔了領導集群的任務,所有的日誌都需要先交給Leader節點處理,並由Leader節點複製給其他節點(Follower),這個處理過程被稱為日誌複製。

一旦Leader被集群中的節點選擇出來,它就開始接收客戶端請求,並將操作包裝成日誌,並複製到其它節點上去。日誌複製的整體流程如下:

  1. Leader為客戶端提供服務,客戶端的每個請求都包含一條即將被RPM執行的指令。

  2. Leader把該指令作為一條新的日誌附加到自身的日誌集合,然後向其它節點發起附加條目請求,來要求它們將這條日誌附加到各自本地的日誌集合。

  3. 當這條日誌已經確保被安全的複製,即大多數(N/2+1)節點都已經複製後,Leader 會將該日誌追加到它本地的狀態機中,然後把操作成功的結果返回給客戶端。

各節點的每條日誌除了存儲狀態機的操作指令外,還會擁有一個index值被用來表明它在日誌集合中的位置。此外,每條日誌還會存儲一個任期號,該任期號表示Leader收到這條指令時的當前任期,任期號相同的日誌條目是由同一個Leader在其任期內發送的。當一條日誌被Leader節點認為可以安全的應用到狀態機時,稱這條日誌是committed。那麼什麼樣的日誌可以被 commit 呢?只有當Leader 得知這條日誌被集群過半的節點複製成功時,這條日誌才可以被commit。Raft 保證所有 committed 日誌都已經被持久化,且”最終”一定會被狀態機apply。

當集群中各節點都正常工作的時候,Raft演算法的這種日誌複製機制可以保證一致性,那麼當節點可能出現宕機等特殊情況下,Raft又是如何保持集群日誌一致的呢?

Raft對於當節點出現意外情況宕機後出現的不一致問題也是有解決方法,但是這需要遵循一些規則。其中,最重要的一條就是,Raft 強制要求Follower必須複製Leader的日誌集合來解決不一致問題。換句話說,Follower節點上任何與Leader不一致的日誌,都會被Leader節點上的日誌所強制覆蓋。這並不會產生什麼問題,因為某些選舉上的限制,如果Follower上的日誌與Leader不一致,那麼該日誌在Follower上一定是未提交的,而未提交的日誌並不會應用到狀態機,當然也不會被外部的客戶端感知到。

要使得Follower的日誌集合跟自己保持完全一致,Leader 必須先找到二者間最後一次達成一致的地方。因為一旦這條日誌達成一致,在這之前的日誌一定也都一致。這個確認操作是在一致性檢查步驟完成的。Leader 針對每個Follower 都維護一個next index,表示下一條需要發送給該Follower的日誌索引。當一個 Leader 剛剛上任時,它初始化所有next index值為自己最後一條日誌的index+1。但凡某個Follower的日誌跟Leader不一致,那麼下次日誌複製時的一致性檢查就會失敗。在被Follower 拒絕這次日誌複製請求後,Leader會減少next index的值並進行重試。最終一定會存在一個next index使得Leader和Follower在這之前的日誌都保持一致。

針對每個Follower,一旦確定了next index的值,Leader便開始從該 index 同步日誌,follower會刪除掉現存的不一致的日誌,保留Leader 最新同步過來的。整個集群的日誌會在這個簡單的機制下自動趨於一致。此外要注意,Leader 從來不會覆蓋或者刪除自己的日誌,而是強制Follower與它保持一致。

3.1.4 安全性保證

前一節也提到了,為了保證集群的日誌一致性,Raft 強制要求Follower必須複製Leader的日誌集合來解決不一致問題。這樣做的前提是需要保證每一輪選舉出來的Leader具備”日誌的正確性”,這也就是前面著重強調的”選舉上的限制”。

我們假設有以下的場景:

  • Leader 將一些日誌複製到了大多數節點上,進行commit後發生了宕機。

  • 某個Follower 並沒有被複制到這些日誌,但它參與選舉併當選了下一任 leader。

  • 新的Leader又同步並commit了一些日誌,這些日誌覆蓋掉了其它節點上的上一任committed日誌。

  • 各個節點的狀態機可能apply了不同的日誌序列,出現了不一致的情況。

僅僅依靠前面兩個小節提到的領導人選舉和日誌複製機制並不能保證在這種情況下的節點一致性。為了解決這類問題,Raft加上了一些額外的限制來保證狀態機的安全性和共識演算法的準確性。

Raft首先採取的一個措施就是增加了對選舉的限制。我們再來分析下前文所述的場景,根本問題其實發生在第2步。candidate 必須有足夠的資格才能當選Leader,否則它就會給集群帶來不可預料的錯誤。這需要增加一個判斷,即每個 candidate 必須在競選投票請求中攜帶自己本地日誌的最新 (term, index),如果 Follower發現這個candidate 的日誌還沒有自己的新,則拒絕投票給該candidate。candidate想要贏得選舉成為Leader,必須得到集群大多數節點的投票,那麼它的日誌就一定至少不落後於大多數節點。又因為一條日誌只有複製到了大多數節點才能被commit,因此能贏得選舉的candidate一定擁有所有committed日誌。而比較兩個 (term, index) 的邏輯非常簡單:如果任期號不同則任期號更大的日誌更新,否則index大的日誌更新。

其次,Raft規定了Leader只允許commit包含當前任期號的日誌。所謂 commit 其實就是對日誌簡單進行一個標記,表明其可以被 apply 到狀態機,並針對相應的客戶端請求進行響應。之所以有這個限制,Raft主要考慮到以下的場景:

image-20220411200321469

圖3.1 Leader對不含當前任期號的日誌進行commit引發的異常情況

a) S1是Leader,收到請求後將 (term2, index2) 只複製給了S2,尚未複製給S3 ~ S5。

b) S1宕機,S5當選term3的Leader(S3、S4、S5 三票),收到請求後保存了 (term3, index2),尚未複製給任何節點。

c) S5 宕機,S1恢復,S1重新當選term4的Leader,繼續將 (term2, index2) 複製給了 S3,已經滿足大多數節點,我們將其 commit。

d) S1又宕機,S5 恢復,S5 重新當選Leader(S2、S3、S4 三票),將 (term3, inde2) 複製給了所有節點並 commit。注意,此時發生了致命錯誤,已經 committed 的 (term2, index2) 被 (term3, index2) 覆蓋了。

在上述場景中,問題的根源發生在階段c,即使作為term4 leader 的 S1 將 (term2, index2) 複製給了大多數節點,它也不能直接將其commit,而是必須等待term4的日誌到來並成功複製後,一併進行commit。

為了解決這個問題,Raft規定了Leader只允許commit包含當前任期號的日誌。在增加了這條限制後,我們再來看階段e。

e) 在添加了這個限制後,要麼 (term2, index2) 始終沒有被 commit,這樣S5 在階段d將其覆蓋就是安全的;要麼 (term2, index2) 同 (term4, index3) 一起被 commit,這樣 S5 根本就無法當選 leader,因為大多數節點的日誌都比它新,也就不存在上圖中出現的問題了。

在對Raft共識演算法增加了這兩個限制後,狀態機的安全性得到了極大的保證,更有效地實現了集群日誌的一致性。

3.2 Fabric中Raft演算法的源碼分析

其實,採用 Raft 的系統最著名的當屬etcd(一個高可用的分散式鍵值資料庫),一般認為etcd的核心就是 Raft 演算法的實現。作為一個分散式kv系統,etcd 使用Raft在多節點間進行數據同步,每個節點都擁有全量的狀態機數據。值得說明的是,Hyperledger Fabric對於Raft共識演算法的實現也是參考或者說基於etcd中已經實現的Raft演算法,這一點在Fabric的源碼中也可以得到充分的體現。更重要的是,Fabric在源碼中便將Raft模組的實現命名為etcdraft,這進一步體現出Hyperledger Fabric中的Raft只是對etcd中的Raft做了一層封裝來實現聯盟鏈中的節點共識。

接下來,我將詳細介紹Hyperledger Fabric中對Raft共識演算法的實現與封裝細節,從這裡我們也可以進一步了解到Raft演算法的細節。

3.2.1 Fabric中Raft源碼的核心數據結構

從Fabric中的源碼可以窺見,其底層調用了etcd已經實現的成熟的Raft演算法作為Fabric中共識演算法的核心。etcd中的Raft實現了領導者選舉,日誌複製等核心操作,而把應用層相關的操作如節點間的通訊以及存儲等交給上層應用層也就是這裡的Hyperledger Fabric。

我們首先來看etcd/raft中涉及到的核心數據結構:Node介面和node結構體。

Node介面主要定義了一些Raft演算法實現所必須的方法,這也是根據Raft的理論模型來定義的,主要包括時鐘,選舉等操作。

// Node represents a node in a raft cluster.
type Node interface {
	Tick() //時鐘的實現,選舉超時和心跳超時基於此實現
	Campaign(ctx context.Context) error //參與Leader競爭
	Propose(ctx context.Context, data []byte) error //在日誌中追加數據,需要實現方保證數據追加的成功
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更
	Step(ctx context.Context, msg pb.Message) error //根據消息變更狀態機的狀態
	//標誌某一狀態的完成,收到狀態變化的節點必須提交變更,Raft底層的任何變動都會通知到這裡
	Ready() <-chan Ready
	//進行狀態的提交,收到完成標誌後,必須提交過後節點才會實際進行狀態機的更新。在包含快照的場景,為了避免快照落地帶來的長時間阻塞,允許繼續接受和提交其他狀態,即使之前的快照狀態變更並沒有完成。
	Advance()
	//進行集群配置變更
	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
	//變更leader
	TransferLeadership(ctx context.Context, lead, transferee uint64)
	//保證線性一致性讀,
	ReadIndex(ctx context.Context, rctx []byte) error
	//狀態機當前的配置
	Status() Status
	// ReportUnreachable reports the given node is not reachable for the last send.
	//上報節點的不可達
	ReportUnreachable(id uint64)
	//上報快照狀態
	ReportSnapshot(id uint64, status SnapshotStatus)
	//停止節點
	Stop()
}

此外,node結構體也是etcd/raft中的一個核心數據結構,其主要定義了一系列的通道(go語言中的一種數據類型)來實現Raft中的資訊傳遞,而且節點的不同狀態之間的切換也是通過這些通道實現的。

// node is the canonical implementation of the Node interface
type node struct {
   propc      chan msgWithResult
   recvc      chan pb.Message
   confc      chan pb.ConfChange
   confstatec chan pb.ConfState
   readyc     chan Ready
   advancec   chan struct{}
   tickc      chan struct{}
   done       chan struct{}
   stop       chan struct{}
   status     chan chan Status
   logger Logger
}

以上的兩個數據結構是etcd/raft實現的,並不是Fabric自己實現的,Fabric只是單純地調用了它們來完成節點間的共識。但是要怎麼在Fabric中實現與etcd/raft的有機整合就是Fabric需要考慮的事了。

Hyperledger Fabric對Raft演算法的核心實現程式碼都是放在fabric/orderer/consensus/etcdraft包下的,這裡主要包含幾個核心的數據結構,即Chain介面,Chain結構體和node結構體。

首先,Chain介面的定義在fabric/orderer/consensus/etcdraft/consensus.go文件下,它主要定義了排序節點對接收到的客戶端發送來的消息的處理操作,它的詳細定義如下:

// Chain defines a way to inject messages for ordering.
type Chain interface {
    // 負責對普通交易消息進行處理排序
	Order(env *cb.Envelope, configSeq uint64) error
    // 負責對配置交易消息進行處理和排序。當排序服務在 BroadCast 介面收到消息進行校驗和過濾之後,就交由對應 Chain 實例進行處理。
	Configure(config *cb.Envelope, configSeq uint64) error
	WaitReady() error
	Errored() <-chan struct{}
   // Start()負責啟動此 Chain 服務。
	Start()
	Halt()
}

其次,Chain結構體實現了Chain介面,它裡面主要定義了一些通道(channel)用於節點間的通訊,以便根據通訊消息做相應的操作。

// Chain implements consensus.Chain interface.
type Chain struct {
   configurator Configurator
   rpc RPC // 節點與外部節點進行通訊的對象,RPC 是一個介面,包含兩個方法SendConsensus 和 SendSubmit。前面這種用於節點間 raft 資訊的通訊,後者用於轉發交易請求給 leader 節點。
   raftID    uint64
   channelID string
   lastKnownLeader uint64
   ActiveNodes     atomic.Value
   submitC  chan *submit // 接收 Orderer 客戶端提交的共識請求消息的通道
   applyC   chan apply // 接收 raft 節點間應用消息的通道
   observeC chan<- raft.SoftState
   haltC    chan struct{}         
   doneC    chan struct{} 
   startC   chan struct{} 
   snapC    chan *raftpb.Snapshot //接收 raft 節點快照數據的通道
   gcC      chan *gc 
   …
   Node *node // 封裝了底層 raft 庫的節點實例
   …
}

最後,node結構體主要用於將Fabric自己實現的Raft上層應用和etcd的底層Raft實現連接起來,可以說node結構體是它們之間通訊的橋樑,正是它的存在屏蔽了Raft實現的細節。

type node struct {
   chainID string
   logger  *flogging.FabricLogger
   metrics *Metrics
   unreachableLock sync.RWMutex
   unreachable     map[uint64]struct{}
   tracker *Tracker
   storage *RaftStorage
   config  *raft.Config
   rpc RPC
   chain *Chain // 前面定義的Fabric自己實現的Chain結構體
   tickInterval time.Duration
   clock        clock.Clock
   metadata *etcdraft.BlockMetadata
   subscriberC chan chan uint64
   raft.Node // etcd底層的Raft中的節點介面
}

3.2.2 Fabric Raft機制的啟動過程源碼分析

Raft的啟動入口位於fabric/orderer/consensus/etcdraft/chain.go文件中,在Chain的Start()方法中會啟動etcdraft/node.go中的node.start(),而node.start()方法中進而啟動etcd已經封裝好的raft.StartNode()方法。

Chain中的Start()方法定義如下:

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
   …
// 這裡又啟動了etcdraft/node中的start方法
c.Node.start(c.fresh, isJoin)
   close(c.startC)
   close(c.errorC)
   go c.gc()
   go c.run()
   es := c.newEvictionSuspector()
   interval := DefaultLeaderlessCheckInterval
   if c.opts.LeaderCheckInterval != 0 {
      interval = c.opts.LeaderCheckInterval
   }
   c.periodicChecker = &PeriodicCheck{
      Logger:        c.logger,
      Report:        es.confirmSuspicion,
      ReportCleared: es.clearSuspicion,
      CheckInterval: interval,
      Condition:     c.suspectEviction,
   }
   c.periodicChecker.Run()
}

Chain中的Start方法主要完成了啟動etcdraft.Node端的循環來初始化Raft集群節點。而且Chain裡面通過調用c.run()實現了通過循環處理客戶端和Raft底層發送的消息。

我們再來看etcdraft.Node端的Start方法,它作為Chain端和raft/node端的橋樑,會根據Chain中傳遞的元數據配置資訊獲取啟動Raft節點的ID資訊,並且調用底層的Raft.StartNode方法啟動節點,並且像Chain端中一樣會啟動n.run()來循環處理消息。

func (n *node) start(fresh, join bool) {
   …
   var campaign bool
   if fresh {// 是否是新節點標記位
      if join {// 是否是新加入節點標記位
         raftPeers = nil
         n.logger.Info("Starting raft node to join an existing channel")
      } else {
         n.logger.Info("Starting raft node as part of a new channel")
         sha := sha256.Sum256([]byte(n.chainID))
         number, _ := proto.DecodeVarint(sha[24:])
         if n.config.ID == number%uint64(len(raftPeers))+1 {
            campaign = true
         }
      }
      // 調用raft/node中的啟動節點函數,初始化raft
      n.Node = raft.StartNode(n.config, raftPeers)
   } else {
      n.logger.Info("Restarting raft node")
      n.Node = raft.RestartNode(n.config)
   }
   n.subscriberC = make(chan chan uint64)
// run方法中會啟動一個循環用來接收raft節點發來的消息,在這裡經過進一步處理後,轉發給Chain層進行處理,消息的轉發機制都是通過通道來完成的。
   go n.run(campaign)
}

最後,在etcdraft/node中啟動的raft.StartNode()表示進一步啟動了Raft底層的Node節點,在這裡會進行Raft的初始化,讀取配置啟動各個節點以及初始化logindex等。與前面的啟動流程一樣,它同樣會開啟一個run方法以循環的方法不斷監聽各通道的資訊來實現狀態的切換和做出相應的動作。

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
   r := newRaft(c)
   r.becomeFollower(1, None)
   for _, peer := range peers {
      // 將配置中給定的所有節點加入集群
…
   }
//初始化logindex
r.raftLog.committed = r.raftLog.lastIndex()
   for _, peer := range peers {
      r.addNode(peer.ID)
   }
   n := newNode()
   n.logger = c.Logger
   go n.run(r)
   return &n
}

結合上述的源碼分析,圖3.2更加詳細地描述了Raft的啟動流程。

image-20220411201122918

圖3.2 Raft啟動流程圖

3.2.3 Fabric Raft機制的交易處理流程源碼分析

我們在上一節已經根據源碼仔細分析了Raft的啟動流程,接下來Fabric中的排序節點便可以開始接收交易並開始排序和打包成區塊了。這個交易處理流程可以說是Fabric中交易的核心。下面我們也跟著源碼來詳細分析這部分的實現細節。

1. 提案的提交

首先,客戶端將會把已經背書的交易提案以broadcast請求的形式轉發給Raft集群的Leader進行處理。我們在第二節中也提到了,Fabric中的交易可以分為兩類,一類是普通交易,另一類是部署交易(也叫做配置交易)。這兩類請求將分別調用不同的函數,即Order和Configure函數來完成交易提案的提交。

// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
   c.Metrics.NormalProposalsReceived.Add(1)
   return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
// Configure submits config type transactions for ordering.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
   c.Metrics.ConfigProposalsReceived.Add(1)
   return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

2. 轉發交易提案到Leader

我們從上面的源程式碼中可以注意到,不論是何種交易類型,裡面都會調用Submit方法來提交交易提案。在Submit方法中,主要做的事就是將請求消息封裝為結構體並且寫入指定的一個通道中(submitC)以便傳遞給Chain進行處理。此外,它還會判斷當前節點是否是Leader,如果不是,還會將消息重定向給Leader節點。

func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
…
   leadC := make(chan uint64, 1)
   select {
   case c.submitC <- &submit{req, leadC}: // 將消息封裝並且寫入submitC通道
      lead := <-leadC
      if lead == raft.None {
         c.Metrics.ProposalFailures.Add(1)
         return errors.Errorf("no Raft leader")
      }
      if lead != c.raftID { // 當前節點不是Leader,則轉發消息給Leader
         if err := c.forwardToLeader(lead, req); err != nil {
            return err
         }
      }
   …
   return nil
}

3. 對交易排序

前面也提到了,提案將被轉發給Leader,並且消息被封裝為消息結構體後寫入了submitC通道中傳遞到了Chain端。Chain端將不斷接收交易並將它們進行排序處理。

在ordered方法中,將根據不同類型的消息執行不同的排序操作。對於接收到是通道配置消息,比如通道創建、通道配置更新等。先調用ConsensusSupport對配置消息進行檢查和應用,然後直接調用 BlockCutter.Cut() 對報文進行切塊,這是因為配置資訊都是單獨成塊;而對於普通交易消息,則直接校驗之後,調用 BlockCutter.Ordered() 進入快取排序,並根據出塊規則決定是否出塊。

func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]* common.Envelope, pending bool, err error) {
  if c.isConfig(msg.Payload) {
      // 配置消息
      …
      batch := c.support.BlockCutter().Cut()
      batches = [][]*common.Envelope{}
      if len(batch) != 0 {
         batches = append(batches, batch)
      }
      batches = append(batches, []*common.Envelope{msg.Payload})
      return batches, false, nil
   }
   // 普通交易資訊
   if msg.LastValidationSeq < seq {
     …
   }
   batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
   return batches, pending, nil
}

4. 打包區塊

交易消息經c.ordered處理之後,會得到由BlockCutter返回的數據包bathches(可打包成塊的數據)和快取是否還有數據的資訊。如果快取還有餘留數據未出塊,則啟動計時器,否則重置計時器,這裡的計時器由case timer.C處理。

接下來,將會調用propose方法來打包交易為區塊。propose會根據batches數據包調用createNextBlock打包出block ,並將block傳遞給c.ch通道(只有Leader具有propose的許可權)。而如果當前交易是配置資訊,還需要標記處當前正在進行配置更新的狀態。

func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
   for _, batch := range batches {
      b := bc.createNextBlock(batch) // 根據當前批次創建一個區塊
      c.logger.Infof("Created block [%d], there are %d blocks in flight", b.Header.Number, c.blockInflight)
      select {
      case ch <- b: // 將block傳遞給c.ch通道,Leader可以通過這個通道收到這個區塊
      default:
         c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
      }
      // if it is config block, then we should wait for the commit of the block
      if protoutil.IsConfigBlock(b) {
         c.configInflight = true
      }
      c.blockInflight++
   }
}

5. Raft對區塊的共識

Leader將會前面說的區塊通過調用c.Node.Propose將數據傳遞給底層Raft狀態機。這裡的Propose就是提議將數據寫入到各節點的日誌中,這裡也是實現節點間共識的入口方法。

Propose就是將日誌廣播出去,要所有節點都盡量保存起來,但還沒有提交,等到Leader收到半數以上的節點都響應說已經保存完了,Leader這時就可以提交了,下一次Ready的時候就會帶上committedindex。

func (n *node) Propose(ctx context.Context, data []byte) error {
   return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

這裡涉及到了Raft演算法的具體共識步驟,這裡就不詳細深入了,這部分的內容將在3.2.4節深入剖析。

6. 保存區塊

經過Raft共識後,節點需要將區塊寫入到本地,這裡Raft底層會通過通道的方式傳遞保存區塊到本地的消息(即CommittedEntries不為空的消息)。在這裡,Fabric通過實現apply方法完成了保存區塊的功能。

func (c *Chain) apply(ents []raftpb.Entry) {
   …
   for i := range ents {
      switch ents[i].Type {
      case raftpb.EntryNormal:// 如果是普通entry消息
         …
         block := protoutil.UnmarshalBlockOrPanic(ents[i].Data)
         c.writeBlock(block, ents[i].Index) // 寫入區塊到本地
 c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
      case raftpb.EntryConfChange:// 如果是配置entry消息
         var cc raftpb.ConfChange
         if err := cc.Unmarshal(ents[i].Data); err != nil {
            c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
            continue
         }
         c.confState = *c.Node.ApplyConfChange(cc)
         switch cc.Type {
         case raftpb.ConfChangeAddNode:
            c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
         case raftpb.ConfChangeRemoveNode:
            c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
         default:
            c.logger.Panic("Programming error, encountered unsupported raft config change")
         }
…
      if ents[i].Index > c.appliedIndex {
         c.appliedIndex = ents[i].Index
      }
   }
}

在apply方法中,如果是普通entry,則會調用writeblock寫入區塊到本地,如果這個 block 是配置塊,則將配置塊寫入到 orderer 的賬本中,同時需要解析出其中的配置資訊,看看是否存在 raft 配置項和 raft 節點變動,如果存在變動,則調用 raft 狀態機的 ProposeConfChange 應用此變更,應用層也進行相關的資訊更新;如果是配置entry,解析出其中的配置更新資訊,先調用底層raft 狀態機的ApplyConfChange 應用此配置更新。

結合上述的源碼分析,圖3.3以流程圖的形式更加詳細地描述了Raft的交易流程。

image-20220411201321029

圖3.3 Raft交易源碼分析流程圖

至此,Fabric中關於Raft機制的交易處理流程已經大致分析完成了,限於篇幅,我們僅著重分析了從交易提案提交到保存區塊到各節點的過程,而忽略了背書和驗證等流程的細節,這部分的內容與Fabric中Raft共識演算法的實現關係較小,就不在本文中詳細介紹相關的源碼實現了。

3.2.4 Fabric Raft底層核心演算法實現細節源碼分析

在前一節即3.2.3節中我已經從源碼的角度詳細描述了Fabric中交易提案的提交,交易的打包和區塊的保存等核心內容。然而,前一節中對於Raft實現共識的細節並沒有涉及太多,這部分的內容Fabric本來就沒有自己去實現,而是調用的第三方(etcd)中已經實現好了的Raft演算法。Fabric做的只是實現了將發送提案到Leader以及保存共識後的區塊這些應用層的功能以及實現了與Raft集群底層的消息交互。為了更好地理解Raft的精髓,我們還是不得不進入到etcd的Raft源碼中一探究竟。

3.2.4.1 領導者選舉

當Follower節點發現Leader的心跳超時,會觸發etcd/raft/node.go文件中的run函數中的tickc信道。通過調用tickElection函數實現了超時選舉的功能。

func (r *raft) tickElection() {
   r.electionElapsed++
   if r.promotable() && r.pastElectionTimeout() {
      r.electionElapsed = 0
      r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
   }
}

超時選舉函數中調用Step函數,發送MsgHup消息,並調用campaign函數發布競選消息。在campaign函數中,節點會將自己的Follower狀態設置為candidate狀態,與此同時遞增任期號,最後candidate節點將會向其他節點發送競選消息。

func (r *raft) campaign(t CampaignType) {
   var term uint64
   var voteMsg pb.MessageType
   if t == campaignPreElection {
      r.becomePreCandidate()
      voteMsg = pb.MsgPreVote
      term = r.Term + 1 // 當前任期號自增一
   } else {
      r.becomeCandidate()
      voteMsg = pb.MsgVote
      term = r.Term
   }
   if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
      // 如果集群是單節點,那節點將投票給自己,則獲取的票數一定超過了一半,當選為Leader
      if t == campaignPreElection {
         r.campaign(campaignElection)
      } else {
         r.becomeLeader()
      }
      return
   }
// 向其他節點發送競選領導者的消息
   for id := range r.prs {
      if id == r.id {
         continue
      }
      …
      r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
   }
}

其他節點通過Step函數實現對競選消息的判斷,並依據相應的判斷決定是否給candidate節點投票。其中投票的判斷邏輯主要分兩步。第一步,如果投票資訊中的任期號小於自身的任期號,則直接返回nil,不予投票響應。第二步,通過和本地已存在的最新日誌做比較來判斷,首先看消息中的任期號是否大於本地最大任期號,如果是則投票,否則如果任期號相同則要求競選消息中有最大的日誌索引。

func (r *raft) Step(m pb.Message) error {
   switch { 
   case m.Term > r.Term:// 節點只會投票給任期號大於自己任期號的candidate
      if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
         force := bytes.Equal(m.Context, []byte(campaignTransfer))
         inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
         if !force && inLease {
            ….
            return nil
         }
      }
   case m.Term < r.Term: 
      return nil
   }
   switch m.Type {
   case pb.MsgVote, pb.MsgPreVote: // 如果candidate擁有最新的日誌則發送投票給該candidate
      if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
         …
         r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
         if m.Type == pb.MsgVote {
            // Only record real votes.
            r.electionElapsed = 0
            r.Vote = m.From
         }
      } else {// 否則當前節點會拒絕給此次參與領導者選舉的candidate投票
         r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
      }
   }
   return nil
}

candidate節點收到其他節點的回復後,判斷獲取的票數是否超過半數,如果是則設置自身為Leader,否則還是設置為follower,說明本輪競選領導者失敗。

func stepCandidate(r *raft, m pb.Message) error {
   switch m.Type {
   case pb.MsgProp:
      …
   case myVoteRespType:// 統計投票結果
      gr := r.poll(m.From, m.Type, !m.Reject) 
      switch r.quorum() {
      case gr:// 判斷票數是否超過半數
         if r.state == StatePreCandidate {
            r.campaign(campaignElection)
         } else {
            r.becomeLeader()// 如果票數超過一般則當選為Leader
            r.bcastAppend()
         }
      case len(r.votes) - gr:
         r.becomeFollower(r.Term, None)
      }
   case pb.MsgTimeoutNow:
      …
   }
   return nil
}

最後,結合上述源碼分析,圖3.4更加詳細地描述了Raft的領導者選舉流程。

image-20220411201421105

圖3.4 Raft領導者選舉流程圖

3.2.4.2 日誌複製

在3.2.3節中我們也分析了,對於Leader中生成的塊,Leader會調用etcd的Node介面中的Propose方法來提交寫日誌請求。Propose 內部具體調用stepWithWaitOption實現日誌消息的傳遞,並阻塞/非阻塞地等待結果的返回。

Leader節點調用appendEntry將消息追到Leader的日誌之中,但不進行數據的commit。之後調用bcastAppend 將消息廣播至其他follower節點。

func stepLeader(r *raft, m pb.Message) error {
   switch m.Type {
   case pb.MsgBeat:
      r.bcastHeartbeat()
      return nil
   case pb.MsgCheckQuorum:
      …
   case pb.MsgProp:
      for i, e := range m.Entries {
         if e.Type == pb.EntryConfChange {
            if r.pendingConfIndex > r.raftLog.applied {
               r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
                  e.String(), r.pendingConfIndex, r.raftLog.applied)
               m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
            } else {
               r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
            }
         }
      }
      if !r.appendEntry(m.Entries...) {// appendEntry將消息追到Leader的日誌之中
         return ErrProposalDropped
      }
      r.bcastAppend()/ /bcastAppend 將消息廣播至其他Follower節點。
      return nil
   }
}

Follower節點接收到請求後,會調用handleAppendEntries函數來判斷是否接受Leader提交的日誌。判斷邏輯如下:如果Leader提交的日誌index小於本地已經提交的日誌index則將本地的index回復給Leader。查找追加的日誌和本地log的衝突,如果有衝突,則先找到衝突的位置,用Leader的日誌從衝突位置開始進行覆蓋,日誌追加成功後,返回最新的日誌index至Leader。如何任期資訊不一致,則直接拒絕Leader的追加請求。

func (r *raft) handleAppendEntries(m pb.Message) {
   if m.Index < r.raftLog.committed {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
      return
   }
   if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {// 當前Follower追加日誌,可能存在衝突的情況,需要找到衝突的位置用Leader的日誌進行覆蓋
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
   } else {// 如果兩者的任期資訊不一致,當親節點拒絕此次追加日誌請求,並把最新的日誌index回復給Leader,便於進行追加
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
   }
}

當Leader接收到Follower的響應後,針對拒絕和接收的兩個場景有不同的處理邏輯,這也是保證follower一致性的關鍵環節

  1. 當Leader 確認Follower已經接收了日誌的append請求後,則調用maybeCommit進行提交,在提交過程中確認各個節點返回的matchindex,排序後取中間值比較,如果中間值比本地的commitindex大,就認為超過半數已經認可此次提交,可以進行commit,之後調用sendAppend向所有節點廣播消息,follower接收到請求後調用maybeAppend進行日誌的提交。

  2. 如果Follower拒絕Leader的日誌append請求。Leader接收到拒絕請求後會進入探測狀態,探測follower最新匹配的位置。

1)	func stepLeader(r *raft, m pb.Message) error {
case pb.MsgAppResp:
   pr.RecentActive = true
   if m.Reject {// Follower發送的是拒絕append的響應
      if pr.maybeDecrTo(m.Index, m.RejectHint) {
         if pr.State == ProgressStateReplicate {
            pr.becomeProbe() // 進入試探append階段, 繼續探測follower最新匹配的位置。
         }
         r.sendAppend(m.From)
      }
   } else {
      oldPaused := pr.IsPaused()
      if pr.maybeUpdate(m.Index) {
         switch {
         case pr.State == ProgressStateProbe:
            pr.becomeReplicate()// 日誌追加成功,狀態轉換為複製狀態
         case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
            pr.becomeProbe()
            pr.becomeReplicate()
         case pr.State == ProgressStateReplicate:
            pr.ins.freeTo(m.Index)
         }
         if r.maybeCommit() {// 如果超過半數已經認可此次提交,Leader可以進行commit
            r.bcastAppend()// 廣播通知所有Follower進行日誌的提交
         } else if oldPaused {
            r.sendAppend(m.From)
         }
       }
   }
}

最後,結合上述源碼分析,圖3.5以流程圖的形式更加詳細地描述了Raft的日誌複製流程。

image-20220411201503063

圖3.5 Raft日誌複製源碼分析流程圖

至此,我們已經基本通過源碼來進一步對Raft共識演算法進行分析和理解,特別是該演算法的一些實現細節。需要說明的是,在Raft源碼分析這一節,我並沒有將如何保證安全性這一特性單獨拿出來分析,這是與我們前面的理論部分不同的一個地方。因為Raft共識的安全性主要是通過給演算法添加一些限制條件來保證的,這些特性最終都能在領導者選舉和日誌複製這兩部分的源碼內容中得到體現,所以在源碼分析階段沒有必要單獨成節。

總結

本次源碼與結構分析我選擇的目標區塊鏈系統是Hyperledger Fabric,而我選擇Fabric的原因主要是因為在當前已有的較為成熟的聯盟鏈中,Fabric可以說是最受歡迎的也是應用最廣泛的一個區塊鏈系統,而且它還是現有其他聯盟鏈實現的基礎,很多其他聯盟鏈中都能看到Fabric的設計原理。

本文首先介紹了Fabric中的交易的基本流程,其主要分成提議、排序打包以及驗證提交這三個階段。交易可以說是區塊鏈系統的核心功能,而與其他區塊鏈系統的交易有很大的不同的是,Fabric更加註重交易的隱私性和安全性,通過引入背書機制來加強這些特性。其次,說到交易流程就不得不涉及到共識,因為Fabric也是一個去中心化的分散式賬本,需要完成去中心化系統中各節點對交易的共識才能保證系統的一致性。Fabric提供了可插拔的共識組件,允許用戶選擇不同的適用於不同場景的共識演算法。Fabric官方推薦的是Raft共識演算法,該演算法目前也是比較成熟的一個共識演算法,它是Paxos演算法的一種延伸實現,可以容忍少部分的節點崩潰。本文基於Fabric開源的源程式碼對它的交易流程和共識流程都做了詳細的理論分析,特別是針對Fabric中Raft共識模組,本文花費了大量的篇幅從源程式碼出發來詳細剖析它的實現原理。