記一次在線跨集群遷移ES數據

  • 2019 年 12 月 23 日
  • 筆記

背景

業務所有的伺服器日誌都是通過filebeat進行採集,然後寫入到一個公共的ES集群中。因為當前使用的集群無法繼續擴容了並且版本也較低(5.6.4), 所以需要把集群遷移到一個新的規模更大的集群,並且升級一下ES的版本,升級到6.4.3.

方案制定

遷移的需求是:

  1. 日誌數據不能停止寫入到ES
  2. 日誌查詢不受影響,延遲不能超過1分鐘

參考之前寫的關於數據遷移的文章Elasticsearch數據遷移與集群容災,制定出的遷移方案為:

  1. 先使用logstash或者snapshot全量同步一次數據到新集群中
  2. 使用logstash追平當天的日誌索引後,查詢入口切換到新的ES集群
  3. 日誌寫入入口切換到新的ES集群

實施步驟

1. 新建6.4.3版本ES集群

如果舊的日誌集群處理日誌時使用了ingest pipeline, 新的集群也需要同樣配置pipeline。

2.使用logstash/snapshot全同步數據

如果數據規模較小,比如幾十GB, 則可以使用logstash進行全量同步, logstash配置文件如下:

input {      elasticsearch {          hosts => "1.1.1.1:9200"          index => "*"          docinfo => true          size => 5000          scroll => "5m"        }  }  filter {       mutate {     remove_field => ["source", "@version"]   }  }  output {      elasticsearch {          hosts => ["http://2.2.2.2:9200"]          index => "%{[@metadata][_index]}"          pipeline => "timezone-pipeline"      }  }

實施過程中遇到的兩個問題:

  • 源集群(5.6.4)的.kibana索引也被同步到6.4.3版本的新集群了,造成不兼容,需要在新集群中刪除掉.kibana索引
  • 源集群中的日誌時間戳欄位@timestamp是增加了+08:00時區後綴的,經過上述遷移後,同步到新集群中的日誌數據中@timestamp沒有了時區後綴,這個問題在logstash側進行了嘗試沒有解決,所以通過在es側增加ingest pipeline進行解決:
    "description": "timezone-pipeline",      "processors": [        {          "date": {            "field": "@timestamp",            "formats": [              "ISO8601"            ],            "timezone": "Asia/Shanghai",            "ignore_failure": true          }        }      ],      "on_failure": [        {          "set": {            "field": "error",            "value": "{{ _ingest.on_failure_message }}"          }        }      ]

如果數據規模較大,幾百GB以上,則可以使用snapshot進行一次全量的遷移,速度較快。

3. 記錄新集群中當天索引中數據的最新時間戳

存量的舊的索引不會再寫入了,而當天的索引還在持續寫入,在步驟2的全量同步數據完成之後(logstash執行完畢後會自動終止進程), 需要查詢出當天索引的數據中已經同步完成的最新的時間戳,在新的集群中執行查詢:

GET es-runlog-2019-11-20/_search  {    "query": {      "match_all": {}    },    "size": 1,    "sort": [      {        "@timestamp": "desc"      }    ]  }

記執行上述查詢獲取到的時間戳為start.

4.增量遷移當天的索引

使用logstash增量遷移當天的索引,logstash配置如下:

input {      elasticsearch {          hosts => "1.1.1.1:9200"          index => "es-runlog-2019.11.20"          query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'          size => 5000          scroll => "5m"          docinfo => true          schedule => "* * * * *" #定時任務,每分鐘執行一次        }  }  filter {       mutate {     remove_field => ["source", "@version"]   }  }  output {      elasticsearch {          hosts => ["http://2.2.2.2:9200"]          index => "%{[@metadata][_index]}"          document_type => "%{[@metadata][_type]}"          document_id => "%{[@metadata][_id]}"          pipeline => "timezone-pipeline"      }  }

上述配置,每分鐘執行一次,從源集群中拉取5分鐘前到當前分鐘的所有數據,同步到新的集群中;因為查詢的粒度為分鐘,所以每次執行定時任務查詢時會有一部分重疊的數據,所以需要在output中配置document_id參數避免重複寫入到新集群中。

實施過程中遇到的問題有:

  1. 用於運行logstash的機器的規格要比較大,因為logstash比較消耗記憶體和cpu,機器性能不夠,很可能出現數據同步延遲增大
  2. 可以通過比較新舊集群當天的索引每分鐘doc數據量,判斷同步的延遲情況,如果延遲較大,可以通過調整logstash配置或者使用更大的機器運行logstash確保同步過程順利進行

5. 記錄開始遷移的時間

在新的集群中執行以下查詢,記錄開始進行增量遷移的時間戳:

GET es-runlog-2019-11-20/_search    {    "query": {      "range": {        "@timestamp": {          "gt": "{start}"        }      }    },    "size": 1,    "sort": [      {        "@timestamp": "asc"      }    ]  }

記獲取到的時間戳為end.

6. 追平start和end之間的數據

使用logstash從源集群中獲取start和end之間的日誌數據,同步到新集群中,配置文件如下:

input {      elasticsearch {          hosts => "1.1.1.1:9200"          index => "es-runlog-2019.11.20"          query => '{"query":{"range":{"@timestamp":{"gte":"start","lt":"end"}}}}'          docinfo => true        }  }  filter {       mutate {     remove_field => ["source", "@version"]   }  }  output {      elasticsearch {          hosts => ["http://2.2.2.2:9200"]          index => "%{[@metadata][_index]}"      	 document_type => "%{[@metadata][_type]}"          document_id => "%{[@metadata][_id]}"      	 pipeline => "timezone-pipeline"      }  }

7. 持續觀察數據同步過程是否穩定

待步驟6的數據追平過程結束之後,需要持續觀察步驟5的增量遷移數據的情況是否穩定,待一段時間,比如幾個小時之後,仍然可以穩定的進行同步,此時可以把日誌的查詢入口切換到新集群中,之後再把數據寫入入切換到新集群中,至此,一次在線跨集群遷移數據實施過程完畢。