ETCD核心機制解析

ETCD整體機制

etcd 是一個分散式的、可靠的 key-value 存儲系統,它適用於存儲分散式系統中的關鍵數據。

etcd 集群中多個節點之間通過Raft演算法完成分散式一致性協同,演算法會選舉出一個主節點作為 leader,由 leader 負責數據的同步與分發。當 leader 出現故障後系統會自動地重新選取另一個節點成為 leader,並重新完成數據的同步。

etcd集群實現高可用主要是基於quorum機制,即:集群中半數以上的節點可用時,集群才可繼續提供服務,quorum機制在分散式一致性演算法中應用非常廣泛,此處不再詳細闡述。

raft數據更新和etcd調用是基於兩階段機制:

第一階段 leader記錄log (uncommited);日誌複製到follower;follower響應,操作成功,響應客戶端;調用者調用leader,leader會將kv數據存儲在日誌中,並利用實時演算法raft進行複製

第二階段 leader commit;通知follower;當複製給了N+1個節點後,本地提交,返回給客戶端,最後leader非同步通知follower完成通知

 

ETCD核心API分析

etcd提供的api主要有kv相關、lease相關及watch,查看其源碼可知:

kv相關介面:

type KV interface {
	// Put puts a key-value pair into etcd.
	// Note that key,value can be plain bytes array and string is
	// an immutable representation of that bytes array.
	// To get a string of bytes, do string([]byte{0x10, 0x20}).
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

	// Get retrieves keys.
	// By default, Get will return the value for "key", if any.
	// When passed WithRange(end), Get will return the keys in the range [key, end).
	// When passed WithFromKey(), Get returns keys greater than or equal to key.
	// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
	// if the required revision is compacted, the request will fail with ErrCompacted .
	// When passed WithLimit(limit), the number of returned keys is bounded by limit.
	// When passed WithSort(), the keys will be sorted.
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

	// Delete deletes a key, or optionally using WithRange(end), [key, end).
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

	// Compact compacts etcd KV history before the given rev.
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

	// Txn creates a transaction.
	Txn(ctx context.Context) Txn
}

主要有Put、Get、Delete、Compact、Do和Txn方法;Put用於向etcd集群中寫入消息,以key value的形式存儲;Get可以根據key查看其對應存儲在etcd中的數據;Delete通過刪除key來刪除etcd中的數據;Compact 方法用於壓縮 etcd 鍵值對存儲中的事件歷史,避免事件歷史無限制的持續增長;Txn 方法在單個事務中處理多個請求,etcd事務模式為:

if compare

then op

else op

commit

lease相關介面:

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive keeps the given lease alive forever. If the keepalive response
	// posted to the channel is not consumed immediately, the lease client will
	// continue sending keep alive requests to the etcd server at least every
	// second until latest response is consumed.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
	// alive stream is interrupted in some way the client cannot handle itself;
	// given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
	// from this closed channel is nil.
	//
	// If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
	// no leader") or canceled by the caller (e.g. context.Canceled), the error
	// is returned. Otherwise, it retries.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see //github.com/coreos/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	//
	// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}

lease 是分散式系統中一個常見的概念,用於代表一個分散式租約。典型情況下,在分散式系統中需要去檢測一個節點是否存活的時,就需要租約機制。

Grant方法用於創建一個租約,當伺服器在給定 time to live 時間內沒有接收到 keepAlive 時租約過期;Revoke撤銷一個租約,所有附加到租約的key將過期並被刪除;TimeToLive 獲取租約資訊;KeepAlive 通過從客戶端到伺服器端的流化的 keep alive 請求和從伺服器端到客戶端的流化的 keep alive 應答來維持租約;檢測分散式系統中一個進程是否存活,可以在進程中去創建一個租約,並在該進程中周期性的調用 KeepAlive 的方法。如果一切正常,該節點的租約會一致保持,如果這個進程掛掉了,最終這個租約就會自動過期, etcd 中,允許將多個 key 關聯在同一個 lease 之上,可以大幅減少 lease 對象刷新帶來的開銷。

watch相關介面:

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and nil "Err()".
	// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and block until event is triggered, except when server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when context passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see //github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}

etcd 的Watch 機制可以實時地訂閱到 etcd 中增量的數據更新,watch 支援指定單個 key,也可以指定一個 key 的前綴。Watch 觀察將要發生或者已經發生的事件,輸入和輸出都是流;輸入流用於創建和取消觀察,輸出流發送事件。一個觀察 RPC 可以在一次性在多個key範圍上觀察,並為多個觀察流化事件,整個事件歷史可以從最後壓縮修訂版本開始觀察。

 

ETCD數據版本機制

etcd數據版本中主要有term表示leader的任期,revision 代表的是全局數據的版本。當集群發生 Leader 切換,term 的值就會 +1,在節點故障,或者 Leader 節點網路出現問題,再或者是將整個集群停止後再次拉起,都會發生 Leader 的切換;當數據發生變更,包括創建、修改、刪除,其 revision 對應的都會 +1,在集群中跨 Leader 任期之間,revision 都會保持全局單調遞增,集群中任意一次的修改都對應著一個唯一的 revision,因此我們可以通過 revision 來支援數據的 MVCC,也可以支援數據的 Watch。

對於每一個 KeyValue 數據節點,etcd 中都記錄了三個版本:

  • 第一個版本叫做 create_revision,是 KeyValue 在創建時對應的 revision;
  • 第二個叫做 mod_revision,是其數據被操作的時候對應的 revision;
  • 第三個 version 就是一個計數器,代表了 KeyValue 被修改了多少次。

在同一個 Leader 任期之內,所有的修改操作,其對應的 term 值始終相等,而 revision 則保持單調遞增。當重啟集群之後,所有的修改操作對應的 term 值都加1了。

 

ETCD之MVCC並發控制

說起mvcc大家都不陌生,mysql的innodb中就使用mvcc實現高並發的數據訪問,對數據進行多版本處理,並通過事務的可見性來保證事務能看到自己應該看到的數據版本,同樣,在etcd中也使用mvcc進行並發控制。

etcd支援對同一個 Key 發起多次數據修改,每次數據修改都對應一個版本號。etcd記錄了每一次修改對應的數據,即一個 key 在 etcd 中存在多個歷史版本。在查詢數據的時候如果不指定版本號,etcd 會返回 Key 對應的最新版本,同時etcd 也支援指定一個版本號來查詢歷史數據。

etcd將每一次修改都記錄下來,使用 watch訂閱數據時,可以支援從任意歷史時刻(指定 revision)開始創建一個 watcher,在客戶端與 etcd 之間建立一個數據管道,etcd 會推送從指定 revision 開始的所有數據變更。etcd 提供的 watch 機制保證,該 Key 的數據後續的被修改之後,通過這個數據管道即時的推送給客戶端。

分析其源碼可知:

type revision struct {
	// main is the main revision of a set of changes that happen atomically.
	main int64

	// sub is the the sub revision of a change in a set of changes that happen
	// atomically. Each change has different increasing sub revision in that
	// set.
	sub int64
}

func (a revision) GreaterThan(b revision) bool {
	if a.main > b.main {
		return true
	}
	if a.main < b.main {
		return false
	}
	return a.sub > b.sub
}

在etcd的mvcc實現中有一個revision結構體,main 表示當前操作的事務 id,全局自增的邏輯時間戳,sub 表示當前操作在事務內部的子 id,事務內自增,從 0 開始;通過GreaterThan方法進行事務版本的比較。

 

ETCD存儲數據結構

etcd 中所有的數據都存儲在一個 btree的數據結構中,該btree保存在磁碟中,並通過mmap的方式映射到記憶體用來支援快速的訪問,treeIndex的定義如下:

type treeIndex struct {
	sync.RWMutex
	tree *btree.BTree
}

func newTreeIndex() index {
	return &treeIndex{
		tree: btree.New(32),
	}
}

index所綁定對btree的操作有Put、Get、Revision、Range及Visit等,以Put方法為例,其源碼如下:

func (ti *treeIndex) Put(key []byte, rev revision) {
	keyi := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()
	item := ti.tree.Get(keyi)
	if item == nil {
		keyi.put(rev.main, rev.sub)
		ti.tree.ReplaceOrInsert(keyi)
		return
	}
	okeyi := item.(*keyIndex)
	okeyi.put(rev.main, rev.sub)
}

通過源碼可知對btree數據的讀寫操作都是在加鎖下完成的,從而來保證並發下數據的一致性。

Tags: