如何通過Elasticsearch Scroll快速取出數據,構造pandas dataframe — Python多進程實現

首先,python 多執行緒不能充分利用多核CPU的計算資源(只能共用一個CPU),所以得用多進程。筆者從3.7億數據的索引,取200多萬的數據,從取數據到構造pandas dataframe總共大概用時14秒左右。每個分片用一個進程查詢數據,最後拼接出完整的結果。

由於返回的json數據量較大,每次100多萬到200多萬,如何快速根據json構造pandas 的dataframe是個問題 — 筆者測試過read_json()、json_normalize()、DataFrame(eval(pandas_json))及DataFrame.from_dict(),from_dict()速度最快

轉載請註明出處//www.cnblogs.com/NaughtyCat/p/how-to-get-all-results-from-es-by-scroll-python-version.html

  • Elasticsearch scroll取數據— python版

源碼如下:

def es_scroll(index, min_timestamp, max_timestamp, slice_no):
    es = Elasticsearch('//localhost:9200', timeout = 30, max_retries=10, retry_on_timeout=True)
    page = es.search(
            index = index,
            doc_type = "tls_book",
            scroll = '1m',
            body={
            "slice": {
                "id": slice_no,
                "max": SLICES
            },
            "_source": [
            "SrcIP" 
            ],
            "sort": [
            "_doc"
            ],
            "query": {
                    "range" : {
                        "@timestamp" : {
                            "gte" : min_timestamp,
                            "lte" : max_timestamp,
                            "boost" : 2.0
                        }
                    }
                }
            },
            version = False,
            size = 10000)
    sid = page['_scroll_id']
    scroll_size = page['hits']['total']

    # Start scrolling
    df = pd.DataFrame()
    appended_data = []

    while (scroll_size > 0):
        frame = pd.DataFrame.from_dict([document['_source'] for document in page["hits"]["hits"]])
        appended_data.append(frame)
        page = es.scroll(scroll_id = sid, scroll = '1m', request_timeout = 30)
        # Update the scroll ID
        sid = page['_scroll_id']
        # Get the number of results that we returned in the last scroll
        scroll_size = len(page['hits']['hits'])
    if len(appended_data) > 0: 
        df = pd.concat(appended_data, ignore_index=True, sort = False)
    del appended_data
    gc.collect() 
    es.clear_scroll(body={'scroll_id': sid})
    return df            

 註:

 (1)通過 “_source” 關鍵字,指定要取的欄位,可減少不必要的欄位,提高查詢速度

(2)官方文檔指出,通過 “sort”: [ “_doc”]按照_doc排序可提高查詢效率

(3)根據自己的環境,測試合理的 size ,效率會有數倍的差距。筆者環境(128G, 32核)一次取10000性能最好,網上大多測試,size取2000或者1000似乎較佳

(4)clear_scroll及時清理用完的scroll_id

(5)如果數據量較大,設置超時和重試次數(默認是10秒,否則超時會取不到數據),具體如下

 timeout = 30, max_retries=10, retry_on_timeout=True

(6)Sliced scroll

如果返回的數據量特別大,可通過slice讓多個分片獨自來處理請求,如下(id從0開始):

            "slice": {
                "id": slice_no,
                "max": SLICES
            },
參考: //www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html#sliced-scroll
  • python 多進程如何個函數傳多個參數

python多進程或者多執行緒要向調用的函數傳遞多個參數,需要構造參數元組集合,程式碼如下(本示例每個進程不同的只有es的slice_id):

def build_parameters(index, min_timestamp, max_timestamp):
    parmeters =[]
    for num in range(0, SLICES): 
        tuple_paremeter = (index, min_timestamp, max_timestamp, num)
        parmeters.append(tuple_paremeter)
    return parmeters
  • python多進程實例

 示例使用進程池,及starmap  傳遞調用的函數及參數 (with相當於try, excepion, finallly的集合,會自動做資源的釋放或關閉等)

            with multiprocessing.Pool(processes = SLICES) as pool:
                result = pool.starmap(es_scroll, parameters)

 

然後,拼接返回的dataframe 集合即可構造一個完整的dataframe,如下:

frame = pd.concat(result, ignore_index=True, sort = False)

 

*******************************************************************************************

精力有限,想法太多,專註做好一件事就行

  • 我只是一個程式猿。5年內把程式碼寫好,技術部落格字字推敲,堅持零拷貝和原創
  • 寫部落格的意義在於打磨文筆,訓練邏輯條理性,加深對知識的系統性理解;如果恰好又對別人有點幫助,那真是一件令人開心的事

*******************************************************************************************