一日一技:Elasticsearch批量插入時,存在就不插入

  • 2019 年 10 月 8 日
  • 筆記

攝影:產品經理

買單:kingname

當我們使用 Elasticsearch-py 批量插入數據到 ES 的時候,我們常常使用它的 helpers模塊裏面的bulk函數。其使用方法如下:

from elasticsearch import helpers, Elasticsearch    es = Elasticsearch(xxx)    def generator():      datas = [1, 2, 3]      for data in datas:          yield {              '_id': "xxx",              '_source': {                  'age': data              }          }    helpers.bulk(es,  index='xxx',  generator(),  doc_type='doc',)  

但這種方式有一個問題,它默認相當於upsert操作。如果_id 對應的文檔已經在 ES 裏面了,那麼數據會被更新。如果_id 對應的文檔不在 ES 中,那麼就插入。

如果我想實現,不存在就插入,存在就跳過怎麼辦?此時就需要在文檔裏面添加_op_type指定操作類型為create:

from elasticsearch import helpers, Elasticsearch    es = Elasticsearch(xxx)    def generator():      datas = [1, 2, 3]      for data in datas:          yield {              '_op_type': 'create',              '_id': "xxx",              '_source': {                  'age': data              }          }    helpers.bulk(es,  generator(),  index='xxx',  doc_type='doc')  

此時,如果_id 對應的文檔不在 ES 中,那麼就會正常插入,如果ES裏面已經有_id對應的數據了,那麼就會報錯。由於bulk一次性默認插入500條數據,假設其中有2條數據已經存在了,那麼剩下的498條會被正常插入。然後程序報錯退出,告訴你有兩條寫入失敗,因為已經存在。

如果你不想讓程序報錯終止,那麼可以增加2個參數:

helpers.bulk(es,      generator(),      index='xxx',      doc_type='doc',      raise_on_exception=False,               raise_on_error=False)  

其中raise_on_exception=False表示在插入數據失敗時,不需要拋出異常。raise_on_error=False表示不拋出BulkIndexError