Elasticsearch 模組 – Shard Allocation 機制
1. 背景
shard allocation
意思是分片分配
, 是一個將分片分配到節點的過程; 可能發生該操作的過程包括:
- 初始恢復(
initial recovery
) - 副本分配(
replica allocation
) - 重新平衡(
rebalance
) - 節點的新增和刪除
分片的分配操作, 是由 master
角色的節點來決定什麼時候移動分片, 以及移動到哪個節點上, 以達到集群的均衡;
說明
本文基於 Elasticsearch 7.4.0 版本
2. 機制分析
2.1. Allocation
觸發條件
- 新增或刪除
index
索引 node
節點的新增或刪除- 執行
reroute
命令 - 修改
replica
副本數量 - 集群重啟
具體對應源碼解釋:來源
序號 | 調用函數 | 說明 |
---|---|---|
1 | AllocationService.applyStartedShards | Shard 啟動狀態修改 |
2 | AllocationService.applyFailedShards | Shard 失效狀態修改 |
3 | AllocationService.deassociateDeadNodes | Node 節點離開集群 |
4 | AllocationService.reroute(AllocationCommands) | 執行 relocation 命令 |
5 | TransportClusterUpdateSettingsAction.masterOperation | 集群配置修改操作 |
6 | MetaDataCreateIndexService.onlyCreateIndex | 創建新索引 index 請求 |
7 | MetaDataDeleteIndexService.deleteIndexs | 刪除索引 index 操作 |
8 | MetaDataIndexStateService.closeIndex | 關閉 index 操作 |
9 | MetaDataIndexStateService.openIndex | 打開 index操作 |
10 | NodeJoinController.JoinTaskExecutor | 通過集群發現的節點加入集群 |
11 | GatewayService.GatewayRecoveryListener | 通過 GatewayRecovery 恢復的節點加入集群 |
12 | LocalAllocateDangledIndices.submitStateUpdateTask | 恢復磁碟記憶體而在 MateDate 內不存在的 index |
13 | RestoreService.restoreSnapshot | 從 snapshot 中恢復的 index |
2.2. Rebalance
的觸發條件
在 rebalance
之前會經過 2.3.2
中介紹的所有策略里實現的 canRebalance
方法, 全部通過後才會執行下面的 Rebalance
過程;
Rebalance
過程是通過調用 balanceByWeights()
方法, 計算 shard 所在的每個 node 的 weight
值,
weightIndex = node.numShards(index) + numAdditionalShards – balancer.avgShardsPerNode(index) \\
weight = theta0 * weightShard + theta1 * weightIndex \\
\]
其中:
numAdditionalShards
一般為 0, 調用weightShardAdded
,weightShardRemoved
方法時分別取值為1
和-1
;- theta0 =
cluster.routing.allocation.balance.shard
系統動態配置項, 默認值為0.45f
; - theta1 =
cluster.routing.allocation.balance.index
系統動態配置項, 默認值為0.55f
;
源碼如下:
private static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
}
float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
2.3. 源碼分析
分片分配就是把一個分片分配到集群中某個節點的過程, 其中分配決策包含了兩個方面:
- 哪些分片應該分配到哪些節點上
- 哪個分片作為主分片, 哪個作為副本分片
Elasticsearch 主要通過兩個基礎組件來完成分片分配這個過程的: allocator
和 deciders
;
allocator
尋找最優的節點來分配分片;deciders
負責判斷並決定是否要進行分配;
- 新建的索引
allocator
負責找出擁有分片數量最少的節點列表, 按分片數量遞增排序, 分片數量較少的會被優先選擇; 對於新建索引, allocator
的目標是以更為均衡的方式把新索引的分片分配到集群的節點中;
deciders
依次遍歷 allocator
給出的節點列表, 判斷是否要把分片分配給該節點, 比如是否滿足分配過濾規則, 分片是否將超出節點磁碟容量閾值等等;
- 已有的索引
allocator
對於主分片, 只允許把主分片指定在已經擁有該分片完整數據的節點上; 對於副本分片, 則是先判斷其他節點上是否已有該分片的數據的拷貝, 如果有這樣的節點, allocator
則優先把分片分配到這其中一個節點上;
2.3.1. Allocator
PrimaryShardAllocator
找到擁有某Shard
最新數據(主分片)的節點;ReplicaShardAllocator
找到磁碟上擁有這個Shard
數據(副本分片)的節點;BalancedShardsAllocator
找到擁有最少Shard
個數的節點;
public class BalancedShardsAllocator implements ShardsAllocator {
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope);
private volatile WeightFunction weightFunction;
private volatile float threshold;
}
2.3.2. Deciders
Deciders
決策期基礎組件的抽象類為 AllocationDecider
:
public abstract class AllocationDecider {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRebalance(RoutingAllocation allocation) {
return Decision.ALWAYS;
}
}
ES 7.4.0 中的 Decider
決策器包括以下所示, 他們均實現上面的 AllocationDecider
抽象類, 並重寫 canRebalance
, canAllocate
, canRemain
, canForceAllocatePrimary
等方法;
決策器比較多, 大致分類如下, 並列舉決策器對應的配置項:
2.3.2.1. 負載均衡類
-
SameShardAllocationDecider
: 避免主副分片分配到同一個節點; -
AwarenessAllocationDecider
: 感知分配器, 感知伺服器, 機架等, 盡量分散存儲 Shard;對應的配置參數有:
cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.attributes: zone
-
ShardsLimitAllocationDecider
: 同一個節點上允許存在同一個index
的shard
數目;index.routing.allocation.total_shards_per_node
: 表示該索引每個節點上允許最多的shard
數量; 默認值=-1, 表示無限制;cluster.routing.allocation.total_shards_per_node
:cluster
級別, 表示集群範圍內每個節點上允許最多的shard
數量, 默認值=-1, 表示無限制;index
級別會覆蓋cluster
級別;
2.3.2.2. 並發控制類
-
ThrottlingAllocationDecider
:recovery
階段的限速配置, 避免過多的recovering allocation
導致該節點的負載過高;cluster.routing.allocation.node_initial_primaries_recoveries
: 當前節點在進行主分片恢復時的數量, 默認值=4;cluster.routing.allocation.node_concurrent_incoming_recoveries
: 默認值=2, 通常是其他節點上的副本 shard 恢復到該節點上;cluster.routing.allocation.node_concurrent_outgoing_recoveries
: 默認值=2, 通常是當前節點上的主分片 shard 恢復副本分片到其他節點上;cluster.routing.allocation.node_concurrent_recoveries
: 統一配置上面兩個配置項; -
ConcurrentRebalanceAllocationDecider
:rebalace
並發控制, 表示集群同時允許進行rebalance
操作的並發數量;cluster.routing.allocation.cluster_concurrent_rebalance
, 默認值=2通過檢查
RoutingNodes
類中維護的reloadingShard
計數器, 看是否超過配置的並發數; -
DiskThresholdDecider
: 根據節點的磁碟剩餘量來決定是否分配到該節點上;cluster.routing.allocation.disk.threshold_enabled
, 默認值=true;cluster.routing.allocation.disk.watermark.low
: 默認值=85%, 達到這個值後, 新索引的分片不會分配到該節點上;cluster.routing.allocation.disk.watermark.high
: 默認值=90%, 達到這個值後, 會觸發已分配到該節點上的Shard
會rebalance
到其他節點上去;
2.3.2.3. 條件限制類
-
RebalanceOnlyWhenActiveAllocationDecider
: 所有Shard
都處於active
狀態下才可以執行rebalance
操作; -
FilterAllocationDecider
: 通過介面動態設置的過濾器;cluster
級別會覆蓋index
級別;index.routing.allocation.require.{attribute}
index.routing.allocation.include.{attribute}
index.routing.allocation.exclude.{attribute}
cluster.routing.allocation.require.{attribute}
cluster.routing.allocation.include.{attribute}
cluster.routing.allocation.exclude.{attribute}
- require 表示必須滿足, include 表示可以分配到指定節點, exclude 表示不允許分配到指定節點;
- {attribute} 還有 ES 內置的幾個選擇, _name, _ip, _host;
-
ReplicaAfterPrimaryActiveAllocationDecider
: 保證只在主分片分配完成後(active
狀態)才開始分配副本分片; -
ClusterRebalanceAllocationDecider
: 通過集群中active
的shard
狀態來決定是否可以執行rebalance
;cluster.routing.allocation.allow_rebalance
indices_all_active
(默認): 當集群所有的節點分配完成, 才可以執行rebalance
操作;
indices_primaries_active
: 只要所有主分片分配完成, 才可以執行rebalance
操作;
always
: 任何情況下都允許 rebalance 操作; -
MaxRetryAllocationDecider
: 防止 shard 在失敗次數達到上限後繼續分配;index.allocation.max_retries
: 設置分配的最大失敗重試次數, 默認值=5;
2.3.2.4. 其他決策類
-
EnableAllocationDecider
: 設置允許分配的分片類型;index
級別配置會覆蓋cluster
級別配置;all
(默認): 允許所有類型的分片;primaries
: 僅允許主分片;new_primaries
: 僅允許新建索引的主分片;none
: 禁止分片分配操作; -
NodeVersionAllocationDecider
: 檢查分片所在 Node 的版本是否高於目標 Node 的 ES 版本; -
SnapshotInProgressAllocationDecider
: 決定snapshot
期間是否允許allocation
, 因為snapshot
只會發生在主分片上, 所以該配置只會限制主分片的allocation
;cluster.routing.allocation.snapshot.relocation_enabled
接下來介紹一下在 Elasticsearch 中涉及到 Allocation
和 Rebalance
的相關配置項;
3. cluster-level 配置
3.1. Shard allocation 配置
控制分片的分配和恢復;
配置 | 默認值 | 說明 |
---|---|---|
cluster.routing.allocation.enable | all | 啟用或禁用針對特定類型分片的分配; 1. all : 允許分配所有類型的分片; 2. primaries : 只允許分配主分片(primary shard ); 3. new_primaries : 只允許分配新索引的主分片(primary shard );4. none : 禁用分片分配;該設置不會影響重啟節點時本地主分片的恢復; |
cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一個節點允許並發的傳入分片(incoming shard )數量 |
cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一個節點允許並發的傳出分片(incoming shard )數量 |
cluster.routing.allocation.node_concurrent_recoveries | 上面兩者的合併配置 | |
cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 單個節點上同時初始化的主分片數量 |
cluster.routing.allocation.same_shard.host | false | 是否執行檢查, 以防止基於host name 和host address , 在單個主機上分配同一分片的多個實例; 該設置僅用於在同一台電腦上啟動多個節點的情況; |
3.2. Shard rebalancing 配置
控制集群之間的分片平衡;
配置 | 默認值 | 說明 |
---|---|---|
cluster.routing.rebalance.enable | all | 啟用或禁用針對特定類型分片的rebalancing ;1. all : 允許rebalancing 所有類型的分片;2. primaries : 只允許rebalancing 主分片;3. replicas : 只允許rebalancing 副本分片;4. none : 禁用rebalancing ; |
cluster.routing.allocation.allow_rebalance | indices_all_active | 指定何時允許執行rebalancing ;1. always : 總是允許;2. indices_primaries_active : 當集群中所有主分片已分配時才允許rebalancing ;3. indices_all_active : 當集群中所有分片(包括主分片和副本分片)都已分配時才允許rebalancing ; |
cluster.routing.allocation.cluster_concurrent_rebalance | 2 | 指定整個集群中允許同時在節點間移動的分片數量; 該配置僅控制由於集群不平衡引起的並發分片分配數量, 對分配過濾(allocation filtering )或強制感知(forced awareness )的分片分配不做限制; |
3.3. 分片平衡啟發式
以下配置用於決定每個分片的存放位置; 當rebalancing
操作不再使任何節點的權重超過balance.threshold
時, 集群即達到平衡;
配置 | 默認值 | 說明 |
---|---|---|
cluster.routing.allocation.balance.shard | 0.45f | 定義節點上分配的分片總數的權重因子; 提升該值會導致集群中所有節點趨向於分片數量相等; |
cluster.routing.allocation.balance.index | 0.55f | 定義節點上分配的每個索引的分片數量的權重因子; 提升該值會導致集群中所有節點上每個索引的分片數量趨向於相等; |
cluster.routing.allocation.balance.threshold | 1.0f | 定義應當執行操作的最小優化值(非負浮點數); 提升該值會導致集群在優化分片平衡方面不太積極; |
4. Index-level 配置
以下配置控制每個索引中的分片分配;
4.1. index-level 分片分配過濾(來源)
配置需要分兩步:
- 在每個 Elasticsearch 節點的
elasticsearch.yml
配置文件中添加自定義節點屬性, 比如以small
,medium
,big
區分節點類型, 則配置文件中可添加:
node.attr.size: medium
或者在啟動 Elasticsearch 服務時, 在命令行里添加 ./bin/elasticsearch -Enode.attr.size=medium
;
- 在新建索引的
mapping
時, 添加index.routing.allocation.include/exclude/require.size: medium
的過濾配置即可;
PUT <index_name>/_settings
{
"index.routing.allocation.include.size": "medium"
}
可以配置多個自定義節點屬性, 並且必須同時滿足索引里配置的多個過濾條件;
- index.routing.allocation.include.{attribute}: {values}
- index.routing.allocation.require.{attribute}: {values}
- index.routing.allocation.exclude.{attribute}: {values}
其中 {attribute}
可以是上面提到的自定義節點屬性, ES 自己也有一些內置的節點屬性:
attribute | 說明 |
---|---|
_name | 通過節點名稱進行匹配 |
_host_ip | 通過節點 IP 地址進行匹配 |
_publish_ip | 通過節點的發布 IP 地址進行匹配 |
_ip | 通過 _host_ip 或 _publish_ip 進行匹配 |
_host | 通過節點的hostname 進行匹配 |
_id | 通過節點的 id 進行匹配 |
其中 {values}
可以是單個值, 也可以是逗號分隔的多個值, 也可以使用通配符 *
進行模糊匹配;
4.2. 設置延遲分配, 當節點離開時(來源)
當某個節點由於突發原因, 比如網路中斷, 人為操作重啟等, 需要暫時離開集群時, 集群會立刻新建副本分片以替換丟失的副本, 然後在剩餘的所有節點之間進行rebalancing
, 這樣導致在短時間內該突發節點又恢復過來後, 原先的副本就無法再使用, 集群會將剛才新建的副本分片再拷貝回到該節點上; 這樣就會造成不必要的資源浪費, 以及節點分片rebalancing
帶來的波動;
可以使用 index.unassigned.node_left.delayed_timeout
動態設置來延遲由於節點離開而導致未分配的副本分片的分配問題; 該配置默認值 1m
;
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
修改成以上配置後, 如果在 5m
內, 該節點可以恢復重新加入集群, 則集群會自動恢復該節點的副本分片分配, 恢復速度很快;
注意
- 此設置不影響將副本分片升級為主分片;
- 此設置不影響之前未分配的副本分片;
- 在整個集群重新啟動後, 該延遲分配不會生效;
4.3. 索引恢復的優先順序(來源)
索引分片恢復的優先順序按照:
- 可選的
index.priority
配置, 值越大優先順序越高; index
索引的創建日期, 越新的索引優先順序越高;index
索引的名稱;
4.4. 每個節點的分片總數(來源)
配置 | 默認值 | 說明 |
---|---|---|
index.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(具體某個索引) |
cluster.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(與索引無關, 全局設置) |
這些配置是硬性配置, 可能會導致一些分片無法分配, 需要慎重配置;
5. 閱讀來源
- Shard allocation and cluster-level routing
- Elasticsearch底層系列之 Shard Allocation 機制
- ELASTICSEARCH ALLOCATION 分析
- Elasticsearch Shard Allocation 機制
- Elasticsearch Allocation&Rebalance