一日一技:Elasticsearch批量插入時,存在就不插入
- 2019 年 10 月 8 日
- 筆記
攝影:產品經理
買單:kingname
當我們使用 Elasticsearch-py 批量插入數據到 ES 的時候,我們常常使用它的 helpers
模塊裏面的bulk
函數。其使用方法如下:
from elasticsearch import helpers, Elasticsearch es = Elasticsearch(xxx) def generator(): datas = [1, 2, 3] for data in datas: yield { '_id': "xxx", '_source': { 'age': data } } helpers.bulk(es, index='xxx', generator(), doc_type='doc',)
但這種方式有一個問題,它默認相當於upsert
操作。如果_id
對應的文檔已經在 ES 裏面了,那麼數據會被更新。如果_id
對應的文檔不在 ES 中,那麼就插入。
如果我想實現,不存在就插入,存在就跳過怎麼辦?此時就需要在文檔裏面添加_op_type
指定操作類型為create
:
from elasticsearch import helpers, Elasticsearch es = Elasticsearch(xxx) def generator(): datas = [1, 2, 3] for data in datas: yield { '_op_type': 'create', '_id': "xxx", '_source': { 'age': data } } helpers.bulk(es, generator(), index='xxx', doc_type='doc')
此時,如果_id
對應的文檔不在 ES 中,那麼就會正常插入,如果ES
裏面已經有_id
對應的數據了,那麼就會報錯。由於bulk
一次性默認插入500條數據,假設其中有2條數據已經存在了,那麼剩下的498條會被正常插入。然後程序報錯退出,告訴你有兩條寫入失敗,因為已經存在。
如果你不想讓程序報錯終止,那麼可以增加2個參數:
helpers.bulk(es, generator(), index='xxx', doc_type='doc', raise_on_exception=False, raise_on_error=False)
其中raise_on_exception=False
表示在插入數據失敗時,不需要拋出異常。raise_on_error=False
表示不拋出BulkIndexError
。