Elasticsearch 模組 – Shard Allocation 機制

原文

1. 背景

shard allocation 意思是分片分配, 是一個將分片分配到節點的過程; 可能發生該操作的過程包括:

  • 初始恢復(initial recovery)
  • 副本分配(replica allocation)
  • 重新平衡(rebalance)
  • 節點的新增和刪除

來源

分片的分配操作, 是由 master 角色的節點來決定什麼時候移動分片, 以及移動到哪個節點上, 以達到集群的均衡;

說明

本文基於 Elasticsearch 7.4.0 版本

2. 機制分析

2.1. Allocation 觸發條件

  • 新增或刪除 index 索引
  • node 節點的新增或刪除
  • 執行 reroute 命令
  • 修改 replica 副本數量
  • 集群重啟

具體對應源碼解釋:來源

序號 調用函數 說明
1 AllocationService.applyStartedShards Shard 啟動狀態修改
2 AllocationService.applyFailedShards Shard 失效狀態修改
3 AllocationService.deassociateDeadNodes Node 節點離開集群
4 AllocationService.reroute(AllocationCommands) 執行 relocation 命令
5 TransportClusterUpdateSettingsAction.masterOperation 集群配置修改操作
6 MetaDataCreateIndexService.onlyCreateIndex 創建新索引 index 請求
7 MetaDataDeleteIndexService.deleteIndexs 刪除索引 index 操作
8 MetaDataIndexStateService.closeIndex 關閉 index 操作
9 MetaDataIndexStateService.openIndex 打開 index操作
10 NodeJoinController.JoinTaskExecutor 通過集群發現的節點加入集群
11 GatewayService.GatewayRecoveryListener 通過 GatewayRecovery 恢復的節點加入集群
12 LocalAllocateDangledIndices.submitStateUpdateTask 恢復磁碟記憶體而在 MateDate 內不存在的 index
13 RestoreService.restoreSnapshot 從 snapshot 中恢復的 index

2.2. Rebalance 的觸發條件

rebalance 之前會經過 2.3.2 中介紹的所有策略里實現的 canRebalance 方法, 全部通過後才會執行下面的 Rebalance 過程;

Rebalance 過程是通過調用 balanceByWeights() 方法, 計算 shard 所在的每個 node 的 weight 值,

\[weightShard = node.numShards() + numAdditionalShards – balancer.avgShardsPerNode() \\
weightIndex = node.numShards(index) + numAdditionalShards – balancer.avgShardsPerNode(index) \\
weight = theta0 * weightShard + theta1 * weightIndex \\
\]

其中:

  • numAdditionalShards 一般為 0, 調用 weightShardAdded, weightShardRemoved 方法時分別取值為 1-1;
  • theta0 = cluster.routing.allocation.balance.shard 系統動態配置項, 默認值為 0.45f;
  • theta1 = cluster.routing.allocation.balance.index 系統動態配置項, 默認值為 0.55f;

權重計算公式

源碼如下:

private static class WeightFunction {
    private final float indexBalance;
    private final float shardBalance;
    private final float theta0;
    private final float theta1;
    WeightFunction(float indexBalance, float shardBalance) {
        float sum = indexBalance + shardBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        theta0 = shardBalance / sum;
        theta1 = indexBalance / sum;
        this.indexBalance = indexBalance;
        this.shardBalance = shardBalance;
    }
    float weight(Balancer balancer, ModelNode node, String index) {
        final float weightShard = node.numShards() - balancer.avgShardsPerNode();
        final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
        return theta0 * weightShard + theta1 * weightIndex;
    }
}

2.3. 源碼分析

分片分配就是把一個分片分配到集群中某個節點的過程, 其中分配決策包含了兩個方面:

  • 哪些分片應該分配到哪些節點上
  • 哪個分片作為主分片, 哪個作為副本分片

Elasticsearch 主要通過兩個基礎組件來完成分片分配這個過程的: allocatordeciders;

  • allocator 尋找最優的節點來分配分片;
  • deciders 負責判斷並決定是否要進行分配;
  1. 新建的索引

allocator 負責找出擁有分片數量最少的節點列表, 按分片數量遞增排序, 分片數量較少的會被優先選擇; 對於新建索引, allocator 的目標是以更為均衡的方式把新索引的分片分配到集群的節點中;

deciders 依次遍歷 allocator 給出的節點列表, 判斷是否要把分片分配給該節點, 比如是否滿足分配過濾規則, 分片是否將超出節點磁碟容量閾值等等;

  1. 已有的索引

allocator 對於主分片, 只允許把主分片指定在已經擁有該分片完整數據的節點上; 對於副本分片, 則是先判斷其他節點上是否已有該分片的數據的拷貝, 如果有這樣的節點, allocator 則優先把分片分配到這其中一個節點上;

2.3.1. Allocator

Allocator

  • PrimaryShardAllocator 找到擁有某 Shard 最新數據(主分片)的節點;
  • ReplicaShardAllocator 找到磁碟上擁有這個 Shard 數據(副本分片)的節點;
  • BalancedShardsAllocator 找到擁有最少 Shard 個數的節點;
public class BalancedShardsAllocator implements ShardsAllocator {
    public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
    public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
    public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope);

    private volatile WeightFunction weightFunction;
    private volatile float threshold;
}

2.3.2. Deciders

Deciders 決策期基礎組件的抽象類為 AllocationDecider:

public abstract class AllocationDecider {
    public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    public Decision canRebalance(RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
}

ES 7.4.0 中的 Decider 決策器包括以下所示, 他們均實現上面的 AllocationDecider 抽象類, 並重寫 canRebalance, canAllocate, canRemain, canForceAllocatePrimary 等方法;

AllocationDecider

決策器比較多, 大致分類如下, 並列舉決策器對應的配置項:

2.3.2.1. 負載均衡類
  • SameShardAllocationDecider: 避免主副分片分配到同一個節點;

  • AwarenessAllocationDecider: 感知分配器, 感知伺服器, 機架等, 盡量分散存儲 Shard;

    對應的配置參數有:

    cluster.routing.allocation.awareness.attributes: rack_id

    cluster.routing.allocation.awareness.attributes: zone

  • ShardsLimitAllocationDecider: 同一個節點上允許存在同一個 indexshard 數目;

    index.routing.allocation.total_shards_per_node: 表示該索引每個節點上允許最多的 shard 數量; 默認值=-1, 表示無限制;

    cluster.routing.allocation.total_shards_per_node: cluster 級別, 表示集群範圍內每個節點上允許最多的 shard 數量, 默認值=-1, 表示無限制;

    index 級別會覆蓋 cluster 級別;

2.3.2.2. 並發控制類
  • ThrottlingAllocationDecider: recovery 階段的限速配置, 避免過多的 recovering allocation 導致該節點的負載過高;

    cluster.routing.allocation.node_initial_primaries_recoveries: 當前節點在進行主分片恢復時的數量, 默認值=4;

    cluster.routing.allocation.node_concurrent_incoming_recoveries: 默認值=2, 通常是其他節點上的副本 shard 恢復到該節點上;

    cluster.routing.allocation.node_concurrent_outgoing_recoveries: 默認值=2, 通常是當前節點上的主分片 shard 恢復副本分片到其他節點上;

    cluster.routing.allocation.node_concurrent_recoveries: 統一配置上面兩個配置項;

  • ConcurrentRebalanceAllocationDecider: rebalace 並發控制, 表示集群同時允許進行 rebalance 操作的並發數量;

    cluster.routing.allocation.cluster_concurrent_rebalance, 默認值=2

    通過檢查 RoutingNodes 類中維護的 reloadingShard 計數器, 看是否超過配置的並發數;

  • DiskThresholdDecider: 根據節點的磁碟剩餘量來決定是否分配到該節點上;

    cluster.routing.allocation.disk.threshold_enabled, 默認值=true;

    cluster.routing.allocation.disk.watermark.low: 默認值=85%, 達到這個值後, 新索引的分片不會分配到該節點上;

    cluster.routing.allocation.disk.watermark.high: 默認值=90%, 達到這個值後, 會觸發已分配到該節點上的 Shardrebalance 到其他節點上去;

2.3.2.3. 條件限制類
  • RebalanceOnlyWhenActiveAllocationDecider: 所有 Shard 都處於 active 狀態下才可以執行 rebalance 操作;

  • FilterAllocationDecider: 通過介面動態設置的過濾器; cluster 級別會覆蓋 index 級別;

    index.routing.allocation.require.{attribute}
    index.routing.allocation.include.{attribute}
    index.routing.allocation.exclude.{attribute}
    cluster.routing.allocation.require.{attribute}
    cluster.routing.allocation.include.{attribute}
    cluster.routing.allocation.exclude.{attribute}

    • require 表示必須滿足, include 表示可以分配到指定節點, exclude 表示不允許分配到指定節點;
    • {attribute} 還有 ES 內置的幾個選擇, _name, _ip, _host;
  • ReplicaAfterPrimaryActiveAllocationDecider: 保證只在主分片分配完成後(active 狀態)才開始分配副本分片;

  • ClusterRebalanceAllocationDecider: 通過集群中 activeshard 狀態來決定是否可以執行 rebalance;

    cluster.routing.allocation.allow_rebalance

    indices_all_active(默認): 當集群所有的節點分配完成, 才可以執行 rebalance 操作;
    indices_primaries_active: 只要所有主分片分配完成, 才可以執行 rebalance 操作;
    always: 任何情況下都允許 rebalance 操作;

  • MaxRetryAllocationDecider: 防止 shard 在失敗次數達到上限後繼續分配;

    index.allocation.max_retries: 設置分配的最大失敗重試次數, 默認值=5;

2.3.2.4. 其他決策類
  • EnableAllocationDecider: 設置允許分配的分片類型; index 級別配置會覆蓋 cluster 級別配置;

    all(默認): 允許所有類型的分片;

    primaries: 僅允許主分片;

    new_primaries: 僅允許新建索引的主分片;

    none: 禁止分片分配操作;

  • NodeVersionAllocationDecider: 檢查分片所在 Node 的版本是否高於目標 Node 的 ES 版本;

  • SnapshotInProgressAllocationDecider: 決定 snapshot 期間是否允許 allocation, 因為 snapshot 只會發生在主分片上, 所以該配置只會限制主分片的 allocation;

    cluster.routing.allocation.snapshot.relocation_enabled

接下來介紹一下在 Elasticsearch 中涉及到 AllocationRebalance 的相關配置項;

3. cluster-level 配置

3.1. Shard allocation 配置

控制分片的分配和恢復;

配置 默認值 說明
cluster.routing.allocation.enable all 啟用或禁用針對特定類型分片的分配;
1. all: 允許分配所有類型的分片;
2. primaries: 只允許分配主分片(primary shard);
3. new_primaries: 只允許分配新索引的主分片(primary shard);
4. none: 禁用分片分配;
該設置不會影響重啟節點時本地主分片的恢復;
cluster.routing.allocation.node_concurrent_incoming_recoveries 2 一個節點允許並發的傳入分片(incoming shard)數量
cluster.routing.allocation.node_concurrent_outgoing_recoveries 2 一個節點允許並發的傳出分片(incoming shard)數量
cluster.routing.allocation.node_concurrent_recoveries 上面兩者的合併配置
cluster.routing.allocation.node_initial_primaries_recoveries 4 單個節點上同時初始化的主分片數量
cluster.routing.allocation.same_shard.host false 是否執行檢查, 以防止基於host namehost address, 在單個主機上分配同一分片的多個實例; 該設置僅用於在同一台電腦上啟動多個節點的情況;

3.2. Shard rebalancing 配置

控制集群之間的分片平衡;

配置 默認值 說明
cluster.routing.rebalance.enable all 啟用或禁用針對特定類型分片的rebalancing;
1. all: 允許rebalancing所有類型的分片;
2. primaries: 只允許rebalancing主分片;
3. replicas: 只允許rebalancing副本分片;
4. none: 禁用rebalancing;
cluster.routing.allocation.allow_rebalance indices_all_active 指定何時允許執行rebalancing;
1. always: 總是允許;
2. indices_primaries_active: 當集群中所有主分片已分配時才允許rebalancing;
3. indices_all_active: 當集群中所有分片(包括主分片和副本分片)都已分配時才允許rebalancing;
cluster.routing.allocation.cluster_concurrent_rebalance 2 指定整個集群中允許同時在節點間移動的分片數量; 該配置僅控制由於集群不平衡引起的並發分片分配數量, 對分配過濾(allocation filtering)或強制感知(forced awareness)的分片分配不做限制;

3.3. 分片平衡啟發式

以下配置用於決定每個分片的存放位置; 當rebalancing操作不再使任何節點的權重超過balance.threshold時, 集群即達到平衡;

配置 默認值 說明
cluster.routing.allocation.balance.shard 0.45f 定義節點上分配的分片總數的權重因子; 提升該值會導致集群中所有節點趨向於分片數量相等;
cluster.routing.allocation.balance.index 0.55f 定義節點上分配的每個索引的分片數量的權重因子; 提升該值會導致集群中所有節點上每個索引的分片數量趨向於相等;
cluster.routing.allocation.balance.threshold 1.0f 定義應當執行操作的最小優化值(非負浮點數); 提升該值會導致集群在優化分片平衡方面不太積極;

4. Index-level 配置

以下配置控制每個索引中的分片分配;

4.1. index-level 分片分配過濾(來源)

配置需要分兩步:

  1. 在每個 Elasticsearch 節點的 elasticsearch.yml 配置文件中添加自定義節點屬性, 比如以 small, medium, big區分節點類型, 則配置文件中可添加:
node.attr.size: medium

或者在啟動 Elasticsearch 服務時, 在命令行里添加 ./bin/elasticsearch -Enode.attr.size=medium;

  1. 在新建索引的 mapping 時, 添加index.routing.allocation.include/exclude/require.size: medium 的過濾配置即可;
PUT <index_name>/_settings
{
  "index.routing.allocation.include.size": "medium"
}

可以配置多個自定義節點屬性, 並且必須同時滿足索引里配置的多個過濾條件;

  • index.routing.allocation.include.{attribute}: {values}
  • index.routing.allocation.require.{attribute}: {values}
  • index.routing.allocation.exclude.{attribute}: {values}

其中 {attribute} 可以是上面提到的自定義節點屬性, ES 自己也有一些內置的節點屬性:

attribute 說明
_name 通過節點名稱進行匹配
_host_ip 通過節點 IP 地址進行匹配
_publish_ip 通過節點的發布 IP 地址進行匹配
_ip 通過 _host_ip_publish_ip 進行匹配
_host 通過節點的hostname進行匹配
_id 通過節點的 id 進行匹配

其中 {values} 可以是單個值, 也可以是逗號分隔的多個值, 也可以使用通配符 * 進行模糊匹配;

4.2. 設置延遲分配, 當節點離開時(來源)

當某個節點由於突發原因, 比如網路中斷, 人為操作重啟等, 需要暫時離開集群時, 集群會立刻新建副本分片以替換丟失的副本, 然後在剩餘的所有節點之間進行rebalancing, 這樣導致在短時間內該突發節點又恢復過來後, 原先的副本就無法再使用, 集群會將剛才新建的副本分片再拷貝回到該節點上; 這樣就會造成不必要的資源浪費, 以及節點分片rebalancing帶來的波動;

可以使用 index.unassigned.node_left.delayed_timeout 動態設置來延遲由於節點離開而導致未分配的副本分片的分配問題; 該配置默認值 1m;

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

修改成以上配置後, 如果在 5m 內, 該節點可以恢復重新加入集群, 則集群會自動恢復該節點的副本分片分配, 恢復速度很快;

注意

  1. 此設置不影響將副本分片升級為主分片;
  2. 此設置不影響之前未分配的副本分片;
  3. 在整個集群重新啟動後, 該延遲分配不會生效;

4.3. 索引恢復的優先順序(來源)

索引分片恢復的優先順序按照:

  • 可選的 index.priority 配置, 值越大優先順序越高;
  • index 索引的創建日期, 越新的索引優先順序越高;
  • index 索引的名稱;

4.4. 每個節點的分片總數(來源)

配置 默認值 說明
index.routing.allocation.total_shards_per_node unbounded(-1) 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(具體某個索引)
cluster.routing.allocation.total_shards_per_node unbounded(-1) 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(與索引無關, 全局設置)

這些配置是硬性配置, 可能會導致一些分片無法分配, 需要慎重配置;

5. 閱讀來源

  1. Shard allocation and cluster-level routing
  2. Elasticsearch底層系列之 Shard Allocation 機制
  3. ELASTICSEARCH ALLOCATION 分析
  4. Elasticsearch Shard Allocation 機制
  5. Elasticsearch Allocation&Rebalance