ElasticSearch版本控制–java實現
一、前言
最近工作中有這樣一個ElasticSearch(以下簡稱ES)寫入的場景,Flink處理完數據實時寫入ES。現在需要將一批歷史數據通過Flink載入到到ES,有兩個點需要保證:
- 對於歷史數據,ES已有文檔,則捨棄舊數據,ES沒有則插入歷史數據。
- 對於新數據,能對現有的ES數據進行更新。
參考ElasticSearch進階篇(一)–版本控制,可以使用ES的版本實現該需求的開發。
二、程式碼實現及驗證
程式碼實現
請求寫數據時加入version和version_type參數,主要程式碼如下:
IndexRequest indexRequest = Requests.indexRequest()
.index(indexName)
.id("1")
// 指定版本比較的業務欄位,具體業務具體分析,一般取時間戳較為合適
.version(Long.parseLong(dataMap1.get("create").toString()))
// 指定使用外部版本號
.versionType(VersionType.EXTERNAL)
.source(dataMap);
驗證
驗證demo可使用當前時間的時間戳作為版本比較依據。驗證思路如下:
- 運行demo程式,在當前時間戳下,插入一條數據,通過kibana等工具檢驗數據是否插入成功。並記錄當前的時間戳。
- 更改某些欄位值對數據進行更新,再次運行程式,檢驗數據是否更新成功。
- 將時間版本比較的欄位值固定為第一次執行程式的時間戳,檢驗數據是否更新成功。
驗證結果如下圖:
三、總結
由截圖可看到,第一步和第二步都能執行成功,第三步執行會出現版本衝突的異常,根據提示很方便能識別出原因,即文章中得出的結論,使用version和version_type=EXTERNAL進行版本控制時,只有要寫入文檔的版本號大於已有文檔的版本號才能更新成功。
案例程式碼參考:elasticsearch_demo