etcd學習(6)-etcd實現raft源碼解讀

etcd中raft實現源碼解讀

前言

關於raft的原來可參考etcd學習(5)-etcd的Raft一致性演算法原理

本文次閱讀的etcd程式碼版本v3.5.0

Etcd將raft協議實現為一個library,然後本身作為一個應用使用它。這個庫僅僅實現了對應的raft演算法,對於網路傳輸,磁碟存儲,raft庫沒有做具體的實現,需要用戶自己去實現。

raft實現

先來看幾個源碼中定義的一些變數概念

  • Node: 對etcd-raft模組具體實現的一層封裝,方便上層模組使用etcd-raft模組;

  • 上層模組: etcd-raft的調用者,上層模組通過Node提供的API與底層的etcd-raft模組進行交互;

  • Cluster: 表示一個集群,其中記錄了該集群的基礎資訊;

  • Member: 組層Cluster的元素之一,其中封裝了一個節點的基本資訊;

  • Peer: 集群中某個節點對集群中另一個節點的稱呼;

  • Entry記錄: 節點之間的傳遞是通過message進行的,每條消息中可以攜帶多條Entry記錄,每條Entry對應一條一個獨立的操作

type Entry struct {
	// Term:表示該Entry所在的任期。
	Term  uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
	// Index:當前這個entry在整個raft日誌中的位置索引,有了Term和Index之後,一個`log entry`就能被唯一標識。  
	Index uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
	// 當前entry的類型
	// 目前etcd支援兩種類型:EntryNormal和EntryConfChange 
	// EntryNormaln表示普通的數據操作
	// EntryConfChange表示集群的變更操作
	Type  EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
	// 具體操作使用的數據
	Data  []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}
  • Message: 是所有消息的抽象,包括各種消息所需要的欄位,raft集群中各個節點之前的通訊都是通過這個message進行的。
type Message struct {
	// 該欄位定義了不同的消息類型,etcd-raft就是通過不同的消息類型來進行處理的,etcd中一共定義了19種類型
	Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
	// 消息的目標節點 ID,在急群中每個節點都有一個唯一的id作為標識
	To   uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
	// 發送消息的節點ID
	From uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
	// 整個消息發出去時,所處的任期
	Term uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
	// 該消息攜帶的第一條Entry記錄的的Term值
	LogTerm    uint64   `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
	// 索引值,該索引值和消息的類型有關,不同的消息類型代表的含義不同
	Index      uint64   `protobuf:"varint,6,opt,name=index" json:"index"`
	// 需要存儲的日誌資訊
	Entries    []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
	// 已經提交的日誌的索引值,用來向別人同步日誌的提交資訊。
	Commit     uint64   `protobuf:"varint,8,opt,name=commit" json:"commit"`
	// 在傳輸快照時,該欄位保存了快照數據
	Snapshot   Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
	// 主要用於響應類型的消息,表示是否拒絕收到的消息。  
	Reject     bool     `protobuf:"varint,10,opt,name=reject" json:"reject"`
	// Follower 節點拒絕 eader 節點的消息之後,會在該欄位記錄 一個Entry索引值供Leader節點。
	RejectHint uint64   `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
	// 攜帶的一些上下文的資訊
	Context    []byte   `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
}
  • raftLog: Raft中日誌同步的核心就是集群中leader如何同步日誌到各個follower。日誌的管理是在raftLog結構上完成的。
type raftLog struct {
	// 用於保存自從最後一次snapshot之後提交的數據
	storage Storage

	// 用於保存還沒有持久化的數據和快照,這些數據最終都會保存到storage中
	unstable unstable

	// 當天提交的日誌數據索引
	committed uint64
	// committed保存是寫入持久化存儲中的最高index,而applied保存的是傳入狀態機中的最高index
	// 即一條日誌首先要提交成功(即committed),才能被applied到狀態機中
	// 因此以下不等式一直成立:applied <= committed
	applied uint64

	logger Logger

	// 調用 nextEnts 時,返回的日誌項集合的最大的大小
	// nextEnts 函數返回應用程式已經可以應用到狀態機的日誌項集合
	maxNextEntsSize uint64
}

看下etcd中的raftexample

這裡先看下etcd中提供的raftexample來簡單連接下etcd中raft的使用

這裡放一張raftexample總體的架構圖

etcd

raftexample 是一個etcd raft library的使用示例。它為Raft一致性演算法的鍵值對集群存儲提供了一個簡單的REST API

該包提供了goreman啟動集群的方式,使用goreman start啟動,可以很清楚的看到raft在啟動過程中的選舉過程,能夠很好的幫助我們理解raft的選舉過程

etcd

來看下幾個主要的函數實現

newRaftNode

在該函數中主要完成了raftNode的初始化 。在該方法中會使用上層模組傳入的配置資訊(其中包括proposeC通道和confChangeC通道)來創建raftNode實例,同時會創建commitC通道和errorC通道返回給上層模組使用 。這樣,上層模組就可以通過這幾個通道與rafeNode實例進行 交互了。另外,newRaftNode()函數中還會啟動一個獨立的後台goroutine來完成回放WAL日誌、 啟動網路組件等初始化操作。

// 主要完成了raftNode的初始化
// 使用上層模組傳入的配置資訊來創建raftNode實例,同時創建commitC 通道和errorC通道返回給上層模組使用
// 上層的應用通過這幾個channel就能和raftNode進行交互
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
	// channel,主要傳輸Entry記錄
	// raftNode會將etcd-raft模組返回的待應用Entry記
	// 錄(封裝在 Ready實例中〉寫入commitC通道,另一方面,kvstore會從commitC通
	// 道中讀取這些待應用的 Entry 記錄井保存其中的鍵值對資訊。
	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		// 初始化存放 WAL 日誌和 Snapshot 文件的的目錄
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		logger: zap.NewExample(),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	// 啟動一個goroutine,完成剩餘的初始化工作
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}

startRaft

1、創建 Snapshotter,並將該 Snapshotter 實例返回給上層模組;

2、創建 WAL 實例,然後載入快照並回放 WAL 日誌;

3、創建 raft.Config 實例,其中包含了啟動 etcd-raft 模組的所有配置;

4、初始化底層 etcd-raft 模組,得到 node 實例;

5、創建 Transport 實例,該實例負責集群中各個節點之間的網路通訊,其具體實現在 raft-http 包中;

6、建立與集群中其他節點的網路連接;

7、啟動網路組件,其中會監聽當前節點與集群中其他節點之間的網路連接,並進行節點之間的消息讀寫;

8、啟動兩個後台的 goroutine,它們主要工作是處理上層模組與底層 etcd-raft 模組的交互,但處理的具體內容不同,後面會詳細介紹這兩個 goroutine 的處理流程。

func (rc *raftNode) startRaft() {
	if !fileutil.Exist(rc.snapdir) {
		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
		}
	}
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
	// 創建 WAL 實例,然後載入快照並回放 WAL 日誌
	oldwal := wal.Exist(rc.waldir)

	// raftNode.replayWAL() 方法首先會讀取快照數據,
	//在快照數據中記錄了該快照包含的最後一條 Entry 記錄的 Term 值 和 索引值。
	//然後根據 Term 值 和 索引值確定讀取 WAL 日誌文件的位置, 並進行日誌記錄的讀取。
	rc.wal = rc.replayWAL()

	// signal replay has finished
	rc.snapshotterReady <- rc.snapshotter

	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	// 創建 raft.Config 實例
	c := &raft.Config{
		ID: uint64(rc.id),
		// 選舉超時
		ElectionTick: 10,
		// 心跳超時
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}
	// 初始化底層的 etcd-raft 模組,這裡會根據 WAL 日誌的回放情況,
	// 判斷當前節點是首次啟動還是重新啟動
	if oldwal || rc.join {
		rc.node = raft.RestartNode(c)
	} else {
		// 初次啟動
		rc.node = raft.StartNode(c, rpeers)
	}
	// 創建 Transport 實例並啟動,他負責 raft 節點之間的網路通訊服務
	rc.transport = &rafthttp.Transport{
		Logger:      rc.logger,
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}
	// 啟動網路服務相關組件
	rc.transport.Start()
	// 建立與集群中其他各個節點的連接
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}
	// 啟動一個goroutine,其中會監聽當前節點與集群中其他節點之間的網路連接
	go rc.serveRaft()
	// 啟動後台 goroutine 處理上層應用與底層 etcd-raft 模組的交互
	go rc.serveChannels()
}

serveChannels

處理上層應用與底層etcd-raft模組的交互

// 會單獨啟動一個後台 goroutine來負責上層模組 傳遞給 etcd-ra企 模組的數據,
// 主要 處理前面介紹的 proposeC、 confChangeC 兩個通道
func (rc *raftNode) serveChannels() {
	// 這裡是獲取快照數據和快照的元數據
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	// 創建一個每隔 lOOms 觸發一次的定時器,那麼在邏輯上,lOOms 即是 etcd-raft 組件的最小時間單位 ,
	// 該定時器每觸發一次,則邏輯時鐘推進一次
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// 單獨啟 動一個 goroutine 負責將 proposeC、 confChangeC 遠遠上接收到
	// 的數據傳遞給 etcd-raft 組件進行處理
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					// 發生異常將proposeC置空
					rc.proposeC = nil
				} else {
					// 阻塞直到消息被處理
					rc.node.Propose(context.TODO(), []byte(prop))
				}
				// 收到上層應用通過 confChangeC遠遠傳遞過來的數據
			case cc, ok := <-rc.confChangeC:
				if !ok {
					// 如果發生異常將confChangeC置空
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// 關閉 stopc 通道,觸發 rafeNode.stop() 方法的調用
		close(rc.stopc)
	}()

	// 處理 etcd-raft 模組返回給上層模組的數據及其他相關的操作
	for {
		select {
		case <-ticker.C:
			// 上述 ticker 定時器觸發一次
			rc.node.Tick()

		// 讀取 node.readyc 通道
		// 該通道是 etcd-raft 組件與上層應用交互的主要channel之一
		// 其中傳遞的 Ready 實例也封裝了很多資訊
		case rd := <-rc.node.Ready():
			// 將當前 etcd raft 組件的狀態資訊,以及待持久化的 Entry 記錄先記錄到 WAL 日誌文件中,
			// 即使之後宕機,這些資訊也可以在節點下次啟動時,通過前面回放 WAL 日誌的方式進行恢復
			rc.wal.Save(rd.HardState, rd.Entries)
			// 檢測到 etcd-raft 組件生成了新的快照數據
			if !raft.IsEmptySnap(rd.Snapshot) {
				// 將新的快照數據寫入快照文件中
				rc.saveSnap(rd.Snapshot)
				// 將新快照持久化到 raftStorage
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				// 通知上層應用載入新快照
				rc.publishSnapshot(rd.Snapshot)
			}
			// 將待持久化的 Entry 記錄追加到 raftStorage 中完成持久化
			rc.raftStorage.Append(rd.Entries)
			// 將待發送的消息發送到指定節點
			rc.transport.Send(rd.Messages)
			// 將已提交、待應用的 Entry 記錄應用到上層應用的狀態機中
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}

			// 隨著節點的運行, WAL 日誌量和 raftLog.storage 中的 Entry 記錄會不斷增加 ,
			// 所以節點每處理 10000 條(默認值) Entry 記錄,就會觸發一次創建快照的過程,
			// 同時 WAL 會釋放一些日誌文件的句柄,raftLog.storage 也會壓縮其保存的 Entry 記錄
			rc.maybeTriggerSnapshot(applyDoneC)
			// 上層應用處理完該 Ready 實例,通知 etcd-raft 紐件準備返回下一個 Ready 實例
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

領導者選舉

啟動並初始化node節點

對於node來講,剛被出初始化的時候就是follower狀態,當集群中的節點初次啟動時會通過StartNode()函數啟動創建對應的node實例和底層的raft實例。在StartNode()方法中,主要是根據傳入的config配置創建raft實例並初始raft負使用的相關組件。

// etcd/raft/node.go
// Peer封裝了節點的ID, peers記錄了當前集群中全部節點的ID
func StartNode(c *Config, peers []Peer) Node {
	if len(peers) == 0 {
		panic("no peers given; use RestartNode instead")
	}
	// 根據config資訊初始化RawNode
	// 同時也會初始化一個raft
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	// 第一次使用初始化RawNode
	err = rn.Bootstrap(peers)
	if err != nil {
		c.Logger.Warningf("error occurred during starting a new node: %v", err)
	}
	// 初始化node實例
	n := newNode(rn)

	go n.run()
	return &n
}

func NewRawNode(config *Config) (*RawNode, error) {
	// 這裡調用初始化newRaft
	r := newRaft(config)
	rn := &RawNode{
		raft: r,
	}
	rn.prevSoftSt = r.softState()
	rn.prevHardSt = r.hardState()
	return rn, nil
}

func newRaft(c *Config) *raft {
	...
	r := &raft{
		id:                        c.ID,
		lead:                      None,
		isLearner:                 false,
		raftLog:                   raftlog,
		maxMsgSize:                c.MaxSizePerMsg,
		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
		electionTimeout:           c.ElectionTick,
		heartbeatTimeout:          c.HeartbeatTick,
		logger:                    c.Logger,
		checkQuorum:               c.CheckQuorum,
		preVote:                   c.PreVote,
		readOnly:                  newReadOnly(c.ReadOnlyOption),
		disableProposalForwarding: c.DisableProposalForwarding,
	}

	...
	// 啟動都是follower狀態
	r.becomeFollower(r.Term, None)

	var nodesStrs []string
	for _, n := range r.prs.VoterNodes() {
		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
	}

	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
	return r
}

總結:

進行node節點初始化工作,所有的Node開始都被初始化為Follower狀態

重點來看下run

func (n *node) run() {
	...

	for {
		...
		select {
		case pm := <-propc:
			...
			r.Step(m)
		case m := <-n.recvc:
			...
			r.Step(m)
		case cc := <-n.confc:
			...
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

總結:

主要是通過for-select-channel監聽channel資訊,來處理不同的請求

來看下幾個主要的channel資訊

propc和recvc中拿到的是從上層應用傳進來的消息,這個消息會被交給raft層的Step函數處理。

func (r *raft) Step(m pb.Message) error {
	//...
	switch m.Type {
	case pb.MsgHup:
	//...
	case pb.MsgVote, pb.MsgPreVote:
	//...
	default:
		r.step(r, m)
	}
}

總結:

Step是etcd-raft模組負責各類資訊的入口

default後面的step,被實現為一個狀態機,它的step屬性是一個函數指針,根據當前節點的不同角色,指向不同的消息處理函數:stepLeader/stepFollower/stepCandidate。與它類似的還有一個tick函數指針,根據角色的不同,也會在tickHeartbeat和tickElection之間來回切換,分別用來觸發定時心跳和選舉檢測。

發送心跳包

作為leader

當一個節點成為leader的時候,會將節點的定時器設置為tickHeartbeat,然後周期性的調用,維持leader的地位

func (r *raft) becomeLeader() {
	// 檢測當 前節點的狀態,禁止從 follower 狀態切換成 leader 狀態
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	// 將step 欄位設置成 stepLeader
	r.step = stepLeader
	r.reset(r.Term)
	// 設置心跳的函數
	r.tick = r.tickHeartbeat
	// 設置lead的id值
	r.lead = r.id
	// 更新當前的角色
	r.state = StateLeader
	...
}

func (r *raft) tickHeartbeat() {
	// 遞增心跳計數器
	r.heartbeatElapsed++
	// 遞增選舉計數器
	r.electionElapsed++
	...

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		// 檢測當前節點時候大多數節點保持連通
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

becomeLeader中的step被設置成stepLeader,所以將會調用stepLeader來處理leader中對應的消息

通過調用bcastHeartbeat向所有的節點發送心跳

func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		// 向所有節點發送心跳
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// 檢測是否和大部分節點保持連通
		// 如果不連通切換到follower狀態
		if !r.prs.QuorumActive() {
			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
			r.becomeFollower(r.Term, None)
		}
		return nil
		...
	}
}

// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
	lastCtx := r.readOnly.lastPendingRequestCtx()
	// 這兩個函數最終都將調用sendHeartbeat
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

// 向指定的節點發送資訊
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		// 發送MsgHeartbeat類型的數據
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}

最終的心跳通過MsgHeartbeat的消息類型進行發送,通知它們目前Leader的存活狀態,重置所有Follower持有的超時計時器

作為follower

1、接收到來自leader的RPC消息MsgHeartbeat;

2、然後重置當前節點的選舉超時時間;

3、回復leader自己的存活。

func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		...
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
		...
	}
	return nil
}

func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
作為candidate

candidate來處理MsgHeartbeat的資訊,是先把自己變成follower,然後和上面的follower一樣,回復leader自己的存活。

func stepCandidate(r *raft, m pb.Message) error {
	...
	switch m.Type {
		...
	case pb.MsgHeartbeat:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleHeartbeat(m)
	}
	...
	return nil
}

func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

當leader收到返回的資訊的時候,會將對應的節點設置為RecentActive,表示該節點目前存活

func stepLeader(r *raft, m pb.Message) error {
	...
	// 根據from,取出當前的follower的Progress
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		...
	}
	return nil
}

如果follower在一定的時間內,沒有收到leader節點的消息,就會發起新一輪的選舉,重新選一個leader節點

leader選舉

1、接收leader的心跳
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

// follower以及candidate的tick函數,在r.electionTimeout之後被調用
func (r *raft) tickElection() {
	r.electionElapsed++
	// promotable返回是否可以被提升為leader
	// pastElectionTimeout檢測當前的候選超時間是否過期
	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		// 發起選舉
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

總結:

1、如果可以成為leader;

2、沒有收到leader的心跳,候選超時時間過期了;

3、重新發起新的選舉請求。

2、發起競選

Step函數看到MsgHup這個消息後會調用campaign函數,進入競選狀態

func (r *raft) Step(m pb.Message) error {
	//...
	switch m.Type {
	case pb.MsgHup:
		if r.preVote {
			r.hup(campaignPreElection)
		} else {
			r.hup(campaignElection)
		}
	}
}

func (r *raft) hup(t CampaignType) {
	...
	r.campaign(t)
}

func (r *raft) campaign(t CampaignType) {
	...
	if t == campaignPreElection {
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1
	} else {
		// 切換到Candidate狀態
		r.becomeCandidate()
		voteMsg = pb.MsgVote
		term = r.Term
	}
	// 統計當前節點收到的選票 並統計其得票數是否超過半數,這次檢測主要是為單節點設置的
	// 判斷是否是單節點
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			// 是單節點直接,變成leader
			r.becomeLeader()
		}
		return
	}
	...
	// 向集群中的所有節點發送資訊,請求投票
	for _, id := range ids {
		// 跳過自身的節點
		if id == r.id {
			continue
		}
		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

總結:

主要是切換到campaign狀態,然後將自己的term資訊發送出去,請求投票。

這裡我們能看到對於Candidate會有一個PreCandidate,PreCandidate這個狀態的作用的是什麼呢?

當系統曾出現分區,分區消失後恢復的時候,可能會造成某個被split的Follower的Term數值很大。

對伺服器進行分區時,它將不會收到heartbeat包,每次electionTimeout後成為Candidate都會遞增Term。

當伺服器在一段時間後恢復連接時,Term的值將會變得很大,然後引入的重新選舉會導致導致臨時的延遲與可用性問題。

PreElection階段並不會真正增加當前節點的Term,它的主要作用是得到當前集群能否成功選舉出一個Leader的答案,避免上面這種情況的發生。

接著Candidate的狀態來分析

3、其他節點收到資訊,進行投票

能夠投票需要滿足下麵條件:

1、當前節點沒有給任何節點投票 或 投票的節點term大於本節點的 或者 是之前已經投票的節點;

2、該節點的消息是最新的;

func (r *raft) Step(m pb.Message) error {
	...
	switch m.Type {
	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// 如果當前沒有給任何節點投票(r.Vote == None)或者投票的節點term大於本節點的(m.Term > r.Term)
			// 或者是之前已經投票的節點(r.Vote == m.From)
			// 同時還滿足該節點的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那麼就接收這個節點的投票
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// 保存下來給哪個節點投票了
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

		...
	}
	return nil
}
4、candidate節點統計投票的結果

candidate節點接收到投票的資訊,然後統計投票的數量

1、如果投票數大於節點數的一半,成為leader;

2、如果達不到,變成follower;

func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case myVoteRespType:
		// 計算當前集群中有多少節點給自己投了票
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		// 大多數投票了
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				// 如果進行投票的節點數量正好是半數以上節點數量
				r.becomeLeader()
				// 向集群中其他節點廣 MsgApp 消息
				r.bcastAppend()
			}
			// 票數不夠
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			// 切換到follower
			r.becomeFollower(r.Term, None)
		}
		...
	}
	return nil
}

每當收到一個MsgVoteResp類型的消息時,就會設置當前節點持有的votes數組,更新其中存儲的節點投票狀態,如果收到大多數的節點票數,切換成leader,向其他的節點發送當前節點當選的消息,通知其餘節點更新Raft結構體中的Term等資訊。

上面涉及到幾種狀態的切換

正常情況只有3種狀態

etcd

為了防止在分區的情況下,某個split的Follower的Term數值變得很大的場景,引入了PreCandidate

etcd

對於不同節點之間的切換,調用的對應的bacome*方法就可以了

這裡需要注意的就是對應的每個bacome*中的step和tick

func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

它的step屬性是一個函數指針,根據當前節點的不同角色,指向不同的消息處理函數:stepLeader/stepFollower/stepCandidate。

tick也是一個函數指針,根據角色的不同,也會在tickHeartbeat和tickElection之間來回切換,分別用來觸發定時心跳和選舉檢測。

日誌同步

WAL日誌

WAL(Write Ahead Log)最大的作用是記錄了整個數據變化的全部歷程。在etcd中,所有數據的修改在提交前,都要先寫入到WAL中。使用WAL進行數據的存儲使得etcd擁有兩個重要功能。

  • 故障快速恢復: 當你的數據遭到破壞時,就可以通過執行所有WAL中記錄的修改操作,快速從最原始的數據恢復到數據損壞前的狀態。

  • 數據回滾(undo)/重做(redo):因為所有的修改操作都被記錄在WAL中,需要回滾或重做,只需要反向或正向執行日誌中的操作即可。

這裡發放一張關於etcd中處理Entry記錄的流程圖(圖片摘自【etcd技術內幕】)

etcd

具體的流程:

  • 1、客戶端向etcd集群發起一次請求,請求中封裝的Entry首先會交給etcd-raft處理,etcd-raft會將Entry記錄保存到raftLog.unstable中;

  • 2、etcd-raft將Entry記錄封裝到Ready實例中,返回給上層模組進行持久化;

  • 3、上層模組收到持久化的Ready記錄之後,會記錄到WAL文件中,然後進行持久化,最後通知etcd-raft模組進行處理;

  • 4、etcd-raft將該Entry記錄從unstable中移到storage中保存;

  • 5、當該Entry記錄被複制到集區中的半數以上節點的時候,該Entry記錄會被Lader節點認為是已經提交了,封裝到Ready實例中通知上層模組;

  • 6、此時上層模組將該Ready實例封裝的Entry記錄應用到狀態機中。

leader同步follower日誌

這裡放一張etcd中leader節點同步數據到follower的流程圖

etcd

etcd日誌的保存總體流程如下:

1、集群某個節點收到client的put請求要求修改數據。節點會生成一個Type為MsgProp的Message,發送給leader。

// 生成MsgProp消息
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		// 設置發送的目標為leader
		// 將資訊發送給leader
		m.To = r.lead
		r.send(m)
	}
	return nil
}

2、leader收到Message以後,會處理Message中的日誌條目,將其append到raftLog的unstable的日誌中,並且調用bcastAppend()廣播append日誌的消息。

func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
		...
	case pb.MsgProp:
		...
		// 將Entry記錄追加到當前節點的raftlog中
		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		// 向其他節點複製Entry記錄
		r.bcastAppend()
		return nil
		...
	}
	return nil
}

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to
	...
	m.Type = pb.MsgApp
	m.Index = pr.Next - 1
	m.LogTerm = term
	m.Entries = ents
	m.Commit = r.raftLog.committed
	if n := len(m.Entries); n != 0 {
		switch pr.State {
		// optimistically increase the next when in StateReplicate
		case tracker.StateReplicate:
			last := m.Entries[n-1].Index
			pr.OptimisticUpdate(last)
			pr.Inflights.Add(last)
		case tracker.StateProbe:
			pr.ProbeSent = true
		default:
			r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
		}
	}
	r.send(m)
	return true
}

3、leader中的消息最終會以MsgApp類型的消息通知follower,follower收到這些資訊之後,同leader一樣,先將快取中的日誌條目持久化到磁碟中並將當前已經持久化的最新日誌index返回給leader。

func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgApp:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
	}
	return nil
}

func (r *raft) handleAppendEntries(m pb.Message) {
	....
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	}
	...
}

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	...
	l.commitTo(min(committed, lastnewi))
	...
	return 0, false
}

func (l *raftLog) commitTo(tocommit uint64) {
	// never decrease commit
	if l.committed < tocommit {
		if l.lastIndex() < tocommit {
			l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
		}
		l.committed = tocommit
	}
}

4、最後leader收到大多數的follower的確認,commit自己的log,同時再次廣播通知follower自己已經提交了。

func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
		...
	case pb.MsgAppResp:
		pr.RecentActive = true
		if r.maybeCommit() {
			releasePendingReadIndexMessages(r)
			// 如果可以commit日誌,那麼廣播append消息
			r.bcastAppend()
		} else if oldPaused {
			// 如果該節點之前狀態是暫停,繼續發送append消息給它
			r.sendAppend(m.From)
		}
		...
	}
	return nil
}

// 嘗試提交索引,如果已經提交返回true
// 然後應該調用bcastAppend通知所有的follower
func (r *raft) maybeCommit() bool {
	mci := r.prs.Committed()
	return r.raftLog.maybeCommit(mci, r.Term)
}

// 提交修改committed就可以了
func (l *raftLog) commitTo(tocommit uint64) {
	// never decrease commit
	if l.committed < tocommit {
		if l.lastIndex() < tocommit {
			l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
		}
		l.committed = tocommit
	}
}

總結

1、etcd中的raft是作為一個library,然後本身作為一個應用使用它。這個庫僅僅實現了對應的raft演算法;

2、etcd-raft這種實現的過程,其中的step和tick被設計成了函數指針,根據不同的角色來防止不同的函數;

3、為了防止出現網路分區Term數值變得很大的場景,引入了PreCandidate;

4、etcd中所有的數據都是通過leader分發到follower,通過日誌的複製確認機制,保證絕大多數的follower都能同步到消息。

參考

【etcd技術內幕】一本關於etcd不錯的書籍
【高可用分散式存儲 etcd 的實現原理】//draveness.me/etcd-introduction/
【Raft 在 etcd 中的實現】//blog.betacat.io/post/raft-implementation-in-etcd/
【etcd Raft庫解析】//www.codedump.info/post/20180922-etcd-raft/
【etcd raft 設計與實現《一》】//zhuanlan.zhihu.com/p/51063866
【raftexample 源碼解讀】//zhuanlan.zhihu.com/p/91314329
【etcd實現-全流程分析】//zhuanlan.zhihu.com/p/135891186
【線性一致性和Raft】//pingcap.com/zh/blog/linearizability-and-raft
【etcd raft 設計與實現《二》】//zhuanlan.zhihu.com/p/51065416
【《深入淺出etcd》part 3 – 解析etcd的日誌同步機制】//mp.weixin.qq.com/s/o_g5z77VZbImgTqjNBSktA

Tags: