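[elasticsearch] The Search Process in Detail

This article is based on Elasticsearch 8.1. In Elasticsearch we often search with an index name plus a wildcard, combined with a timestamp range, for example aaaa-*. How does Elasticsearch handle this kind of request? Does it search every matching index, or does it first pick out the relevant indices by time and only then search them? Before looking at how it works, let us cover some basics.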

SearchType

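QUERY_THEN_FETCH (default): in the first step, the request is sent to every shard. Each shard returns only the information needed for sorting and ranking (note: not the documents themselves). The coordinating node re-sorts and re-ranks the hits using the scores returned by the shards and keeps the top size documents. In the second step, it fetches those documents from the relevant shards. With this approach, the number of documents returned is at most the size the user requested.
DFS_QUERY_THEN_FETCH: the same as the first approach plus an initial scatter step. In that step, term statistics are gathered from every shard so that all shards score with the same global statistics instead of their local ones.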
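The sketch below shows why the extra DFS round trip can matter. It is a minimal illustration, not Elasticsearch code. It compares an IDF computed from one shard's local statistics with one computed from statistics summed across all shards. The formula is the BM25 IDF used by Lucene's BM25Similarity, idf = ln(1 + (N - df + 0.5) / (df + 0.5)). The class name, the ShardStats record, and the shard numbers are all hypothetical.

```java
import java.util.List;

// Hypothetical illustration of what DFS_QUERY_THEN_FETCH buys: global vs per-shard IDF.
public class DfsIdfSketch {

    // Per-shard statistics for a single term (hypothetical record).
    record ShardStats(long docCount, long docFreq) {}

    // BM25-style IDF, as computed by Lucene's BM25Similarity.
    static double idf(long docCount, long docFreq) {
        return Math.log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5));
    }

    // QUERY_THEN_FETCH: each shard scores with its own local statistics only.
    static double localIdf(ShardStats s) {
        return idf(s.docCount(), s.docFreq());
    }

    // DFS_QUERY_THEN_FETCH: the initial scatter collects statistics from every
    // shard, so every shard scores with the same summed statistics.
    static double globalIdf(List<ShardStats> shards) {
        long docCount = 0;
        long docFreq = 0;
        for (ShardStats s : shards) {
            docCount += s.docCount();
            docFreq += s.docFreq();
        }
        return idf(docCount, docFreq);
    }

    public static void main(String[] args) {
        // The term is rare on shard 0 but common on shard 1.
        List<ShardStats> shards = List.of(new ShardStats(1000, 1), new ShardStats(1000, 500));
        System.out.printf("shard 0 local idf: %.3f%n", localIdf(shards.get(0)));
        System.out.printf("shard 1 local idf: %.3f%n", localIdf(shards.get(1)));
        System.out.printf("global idf:        %.3f%n", globalIdf(shards));
    }
}
```

With these numbers, per-shard statistics give the same term a much higher weight on shard 0 than on shard 1. DFS removes that skew at the cost of one extra round trip to every shard.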

To understand the search process in depth, first create 3 indices, each holding one document for a single day.

POST aaaa-16/_doc
{
  "@timestamp": "2022-02-16T16:21:15.000Z",
  "word":"16"
}


POST aaaa-17/_doc
{
  "@timestamp": "2022-02-17T16:21:15.000Z",
  "word":"17"
}

POST aaaa-18/_doc
{
  "@timestamp": "2022-02-18T16:21:15.000Z",
  "word":"18"
}

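The 3 documents are now visible in Kibana:

[Figure: the 3 documents in Kibana]

Now suppose we search with an index name plus a wildcard. What does Elasticsearch do internally?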

GET aaaa*/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2022-02-18T10:21:15.000Z",
        "lte": "2022-02-18T17:21:15.000Z"
      }
    }
  }
}

Exactly one document matches and is returned:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "aaaa-18",
        "_id" : "0zB2O38BoMIMP8QzHgdq",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2022-02-18T16:21:15.000Z",
          "word" : "18"
        }
      }
    ]
  }
}

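1. Distributed search in Elasticsearch

A search request must query one copy of every shard of the requested indices. Suppose an index has 5 primary shards, each with 1 replica, for 10 shard copies in total. A search request is then served by 5 shard copies, each of which may be a primary or a replica. In other words, a search request hits exactly one copy of each shard. The overall flow of a search on a distributed system is shown in the figure below (figure source: Elasticsearch源碼解析與優化實戰).

[Figure: the distributed search flow]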
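The "one copy per shard" rule can be sketched as follows. This is a minimal sketch, not Elasticsearch's routing code. It picks one copy per shard with a simple round-robin offset, whereas Elasticsearch 8 ranks copies with adaptive replica selection by default. The records, node names, and the requestOffset parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one search request queries exactly one copy of each shard.
public class ShardCopySelectionSketch {

    // A shard group: the shard number plus the nodes holding its copies.
    record ShardGroup(int shardId, List<String> copies) {}

    // The copy chosen to serve one shard for this request.
    record Target(int shardId, String node) {}

    // Round-robin over the copies using a per-request offset (a simplification:
    // Elasticsearch 8 uses adaptive replica selection by default).
    static List<Target> selectCopies(List<ShardGroup> groups, int requestOffset) {
        List<Target> targets = new ArrayList<>();
        for (ShardGroup g : groups) {
            String node = g.copies().get(Math.floorMod(requestOffset, g.copies().size()));
            targets.add(new Target(g.shardId(), node));
        }
        return targets;
    }

    public static void main(String[] args) {
        // 5 primaries with 1 replica each: 10 copies in total, spread over 3 nodes.
        List<ShardGroup> groups = new ArrayList<>();
        for (int s = 0; s < 5; s++) {
            groups.add(new ShardGroup(s, List.of("node-" + (s % 3), "node-" + ((s + 1) % 3))));
        }
        List<Target> targets = selectCopies(groups, 7);
        System.out.println(targets.size() + " of 10 copies queried: " + targets);
    }
}
```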

Search entry point:

The entry point for every HTTP request is mainly Netty4HttpRequestHandler.

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
        final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
        boolean success = false;
        try {
            serverTransport.incomingRequest(httpRequest, channel);
            success = true;
        } finally {
            if (success == false) {
                httpRequest.release();
            }
        }
    }
}

2. Initial call flow

Call chain: Netty4HttpRequestHandler.channelRead0->AbstractHttpServerTransport.incomingRequest->AbstractHttpServerTransport.handleIncomingRequest->AbstractHttpServerTransport.dispatchRequest->RestController.dispatchRequest (implements HttpServerTransport.Dispatcher)->SecurityRestFilter.handleRequest->BaseRestHandler.handleRequest->action.accept(channel)->RestCancellableNodeClient.doExecute->NodeClient.executeLocally->RequestFilterChain.proceed->TransportAction.proceed->TransportSearchAction.doExecute->TransportSearchAction.executeRequest (decides whether to execute locally or remotely)->TransportSearchAction.searchAsyncAction

The coordinating node receives the request, parses it, and builds the list of target shards. It then sends asynchronous requests to the data nodes to run the query. I won't go into the details here; you can step through them in a debugger.

Pay particular attention to RestCancellableNodeClient.doExecute. executeLocally runs the entire search, and the listener registered with it calls listener.onResponse(response) to send the response back.

public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(...) {
  ...
    TaskHolder taskHolder = new TaskHolder();
    Task task = client.executeLocally(action, request, new ActionListener<>() {
        @Override
        public void onResponse(Response response) {
            try {
                closeListener.unregisterTask(taskHolder);
            } finally {
                listener.onResponse(response);
            }
        }
    });
  ...
}
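The wrapping in doExecute is a plain try/finally decorator around a listener. Below is a minimal sketch using a hypothetical Listener interface as a stand-in for ActionListener. The cleanup step (here, unregistering the task) always runs first. The wrapped listener still receives the response even if cleanup throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a listener decorator that cleans up before forwarding the response.
public class ListenerWrapSketch {

    // Minimal stand-in for Elasticsearch's ActionListener (response path only).
    interface Listener<R> {
        void onResponse(R response);
    }

    // Returns a listener that runs `cleanup` first and always forwards to `delegate`,
    // even if cleanup throws (that exception still propagates afterwards).
    static <R> Listener<R> wrap(Runnable cleanup, Listener<R> delegate) {
        return response -> {
            try {
                cleanup.run();                 // e.g. unregister the task from the HTTP channel
            } finally {
                delegate.onResponse(response); // e.g. send the REST response
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Listener<String> listener = wrap(() -> events.add("unregisterTask"), r -> events.add("send " + r));
        listener.onResponse("SearchResponse");
        System.out.println(events);
    }
}
```

Putting the delegate call in finally guarantees that a failure in bookkeeping can never swallow the client's response.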

Second, note that the actual search starts in TransportSearchAction.searchAsyncAction:

private SearchPhase searchAsyncAction(...) {
  ...
    final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults();
  AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction = switch (searchRequest.searchType()) {
    case DFS_QUERY_THEN_FETCH -> new SearchDfsQueryThenFetchAsyncAction(...);
    case QUERY_THEN_FETCH -> new SearchQueryThenFetchAsyncAction(...);
  };
  return searchAsyncAction;
  ...
}

Next, AbstractSearchAsyncAction.start runs and starts the query through AbstractSearchAsyncAction.executePhase.

Here the SearchPhase implementation is SearchQueryThenFetchAsyncAction:
private void executePhase(SearchPhase phase) {
    try {
        phase.run();
    } catch (Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
        }
        onPhaseFailure(phase, "", e);
    }
}

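3. The coordinating node

The two phases are implemented in the following classes. The query phase is in search.SearchQueryThenFetchAsyncAction and the fetch phase is in search.FetchSearchPhase. Both extend SearchPhase, as shown in the figure below.

[Figure: the SearchPhase class hierarchy]

3.1 Query phase

The figure is from the official documentation. It is old but still accurate.

[Figure: the query phase (official documentation)]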

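(1) The client sends a search request to node3, which creates an empty priority queue of size from + size.
(2) node3 forwards the search request to a primary or replica copy of every shard in the index. Each shard executes the query locally and adds its results to its own sorted priority queue of size from + size.
(3) Each shard returns the document IDs and sort values of the hits in its queue to the coordinating node, node3. node3 merges them into a globally sorted result.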
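The three steps above can be sketched in Java. This is a simplified model, not Elasticsearch's SearchPhaseController. Hits are plain (shardIndex, docId, score) records ordered by descending score. Each shard keeps its best from + size hits in a bounded heap, and the coordinator merges all shard lists and slices out the requested page. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the query phase: per-shard top (from + size), then a global merge.
public class QueryPhaseMergeSketch {

    // What a shard returns in the query phase: IDs and sort values, no _source.
    record ShardHit(int shardIndex, int docId, float score) {}

    // Descending score; ties broken by shard index, then doc ID, for a stable order.
    static final Comparator<ShardHit> BY_SCORE_DESC = Comparator
            .comparingDouble((ShardHit h) -> h.score()).reversed()
            .thenComparingInt(ShardHit::shardIndex)
            .thenComparingInt(ShardHit::docId);

    // Shard side: keep only the best (from + size) hits, evicting the worst from a bounded heap.
    static List<ShardHit> shardTopN(List<ShardHit> matches, int from, int size) {
        int n = from + size;
        PriorityQueue<ShardHit> heap = new PriorityQueue<>(BY_SCORE_DESC.reversed()); // head = worst hit
        for (ShardHit h : matches) {
            heap.offer(h);
            if (heap.size() > n) {
                heap.poll();
            }
        }
        List<ShardHit> top = new ArrayList<>(heap);
        top.sort(BY_SCORE_DESC);
        return top;
    }

    // Coordinator side: merge every shard's list, sort globally, keep [from, from + size).
    static List<ShardHit> coordinatorMerge(List<List<ShardHit>> perShard, int from, int size) {
        List<ShardHit> all = new ArrayList<>();
        perShard.forEach(all::addAll);
        all.sort(BY_SCORE_DESC);
        int end = Math.min(all.size(), from + size);
        if (from >= end) {
            return List.of();
        }
        return new ArrayList<>(all.subList(from, end));
    }

    public static void main(String[] args) {
        List<ShardHit> shard0 = List.of(new ShardHit(0, 1, 3.0f), new ShardHit(0, 2, 1.0f), new ShardHit(0, 3, 2.5f));
        List<ShardHit> shard1 = List.of(new ShardHit(1, 7, 2.8f), new ShardHit(1, 8, 0.5f));
        int from = 0;
        int size = 2;
        List<ShardHit> page = coordinatorMerge(
                List.of(shardTopN(shard0, from, size), shardTopN(shard1, from, size)), from, size);
        System.out.println(page);
    }
}
```

Each shard must keep from + size hits, not just size. The coordinator cannot know in advance which shard will own the hits on the requested page, which is why deep pagination gets expensive.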

RestSearchAction#prepareRequest parses the request body into a SearchRequest:
public RestChannelConsumer prepareRequest(...) {
    SearchRequest searchRequest = new SearchRequest();
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser, setSize));
    ...
}

3.1.1 Building the list of target shards

The shard list of the local cluster is merged with the shard lists of remote clusters (remote clusters are used for cross-cluster search):

private void executeSearch(...) {
    ...
    // One SearchShardIterator per local shard, holding all copies of that shard
    localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false).map(it -> {
        OriginalIndices finalIndices = finalIndicesMap.get(it.shardId().getIndex().getUUID());
        assert finalIndices != null;
        return new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), finalIndices);
    }).collect(Collectors.toList());
    ...
    // Merge the local and remote (cross-cluster) shard iterators
    final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
    ...
}


3.1.2 Searching all shards

AbstractSearchAsyncAction.run runs the query on every shard:
for (int i = 0; i < shardsIts.size(); i++) {
    final SearchShardIterator shardRoutings = shardsIts.get(i);
    assert shardRoutings.skip() == false;
    assert shardIndexMap.containsKey(shardRoutings);
    int shardIndex = shardIndexMap.get(shardRoutings);
    performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
}

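Here shardsIts holds one iterator for each shard of every index matching aaaa*. shardRoutings.nextOrNull() picks the one copy of each shard that is queried.

[Figure: the contents of shardsIts]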
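How aaaa* turns into per-shard work can be approximated as below. This is a hypothetical sketch that supports only the * wildcard, matched against a map from index name to shard count. It ignores aliases, hidden and closed indices, and remote clusters, all of which Elasticsearch's real index resolution handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical sketch: expand an index wildcard, then produce one target per shard.
public class WildcardShardsSketch {

    // Turn a simple "*" wildcard into a regex, quoting every other character literally.
    static Pattern toRegex(String wildcard) {
        StringBuilder sb = new StringBuilder();
        String[] parts = wildcard.split("\\*", -1);
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sb.append(".*");
            }
            sb.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(sb.toString());
    }

    // Returns "index[shard]" labels for every shard of every matching index, sorted by index name.
    static List<String> shardTargets(String wildcard, Map<String, Integer> shardsPerIndex) {
        Pattern p = toRegex(wildcard);
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(shardsPerIndex).entrySet()) {
            if (p.matcher(e.getKey()).matches()) {
                for (int shard = 0; shard < e.getValue(); shard++) {
                    targets.add(e.getKey() + "[" + shard + "]");
                }
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Integer> indices = Map.of("aaaa-16", 1, "aaaa-17", 1, "aaaa-18", 1, "bbbb-1", 2);
        System.out.println(shardTargets("aaaa*", indices));
    }
}
```

With the three single-shard aaaa-* indices from the example, this yields three targets. That matches "total" : 3 in the _shards section of the response shown earlier.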

3.1.3 Searching on a single shard

AbstractSearchAsyncAction.performPhaseOnShard:
private void performPhaseOnShard(...) {
    executePhaseOnShard(..., new SearchActionListener<FirstResult>(...) {
        // Called when the shard replies successfully
        @Override
        public void innerOnResponse(FirstResult result) {
            maybeFork(thread, () -> onShardResult(result, shardIt));
        }

        // Called when the shard replies with a failure
        @Override
        public void onFailure(Exception t) {
            maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
        }
    });
}

Handling a shard result on the current thread:

//AbstractSearchAsyncAction.onShardResult
...
private void onShardResult(FirstResult result, SearchShardIterator shardIt) {
    onShardSuccess(result);
    successfulShardExecution(shardIt);
}
...
//AbstractSearchAsyncAction.successfulShardExecution
private void successfulShardExecution(SearchShardIterator shardsIt) {
    ... // remainingOpsOnIterator is derived from shardsIt.remaining() (omitted)
    // Accumulate the number of completed operations
    final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
    // Check whether all replies have arrived
    if (xTotalOps == expectedTotalOps) {
        onPhaseDone();
    } else if (xTotalOps > expectedTotalOps) {
        throw new AssertionError(...);
    }
}

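[Figure: successfulShardExecution]

Here results whose totalHits is 0 are ignored and the completed operations are accumulated. Once xTotalOps equals expectedTotalOps, AbstractSearchAsyncAction.onPhaseDone is called. AbstractSearchAsyncAction.executeNextPhase then starts the fetch phase.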
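The completion check in successfulShardExecution follows a common concurrency pattern. Every reply increments an atomic counter, and only the thread that observes the expected total moves the search to the next phase. The sketch below is simplified and assumes one operation per shard; the real code also accounts for the remaining copies on each SearchShardIterator. Apart from onPhaseDone, the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified version of the "count replies, then move on" pattern.
public class PhaseCompletionSketch {

    private final int expectedTotalOps;
    private final AtomicInteger totalOps = new AtomicInteger();
    private final AtomicInteger phaseDoneCalls = new AtomicInteger();

    PhaseCompletionSketch(int expectedTotalOps) {
        this.expectedTotalOps = expectedTotalOps;
    }

    // Called once per shard reply, possibly from many threads at the same time.
    void onShardReply() {
        int xTotalOps = totalOps.incrementAndGet();
        if (xTotalOps == expectedTotalOps) {
            onPhaseDone(); // exactly one thread observes the final count
        } else if (xTotalOps > expectedTotalOps) {
            throw new AssertionError("more replies than expected: " + xTotalOps);
        }
    }

    // In Elasticsearch this is where the search would move on to the fetch phase.
    void onPhaseDone() {
        phaseDoneCalls.incrementAndGet();
    }

    int phaseDoneCalls() {
        return phaseDoneCalls.get();
    }

    // Delivers `shards` replies from a small thread pool and waits for them.
    static PhaseCompletionSketch simulate(int shards) {
        PhaseCompletionSketch phase = new PhaseCompletionSketch(shards);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < shards; i++) {
            pool.execute(phase::onShardReply);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return phase;
    }

    public static void main(String[] args) {
        System.out.println("onPhaseDone calls: " + simulate(100).phaseDoneCalls());
    }
}
```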

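3.2 Fetch phase

The fetch phase, with a figure from the official documentation:

[Figure: the fetch phase (official documentation)]

(1) Each shard returns only document IDs and sort values. After the coordinating node builds its priority queue from them, it sends mget requests for the documents it needs to the relevant shards.

(2) Each shard returns its documents to the coordinating node.

(3) The coordinating node returns the merged documents to the client.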

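3.2.1 FetchSearchPhase (step 1 above)

The query phase's executeNextPhase method triggers the fetch phase, which starts at FetchSearchPhase#innerRun. innerRun walks the shard list from the query phase and skips shards whose query result is empty. For each remaining shard it calls executeFetch to retrieve the data, including pagination information. Scroll requests are also handled in FetchSearchPhase#innerRun.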

private void innerRun() throws Exception {
    final int numShards = context.getNumShards();
    final boolean isScrollSearch = context.getRequest().scroll() != null;
    final List<SearchPhaseResult> phaseResults = queryResults.asList();
    final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
    final boolean queryAndFetchOptimization = queryResults.length() == 1;
    final Runnable finishPhase = () -> moveToNextPhase(
        searchPhaseController,
        queryResults,
        reducedQueryPhase,
        queryAndFetchOptimization ? queryResults : fetchResults.getAtomicArray()
    );
    ...
    // docIdsToLoad[i] holds the doc IDs to fetch from shard i (null if there are none)
    for (int i = 0; i < docIdsToLoad.length; i++) {
        IntArrayList entry = docIdsToLoad[i];
        SearchPhaseResult queryResult = queryResults.get(i);
        if (entry == null) {
            // Nothing to fetch from this shard: release its search context and count it as done
            if (queryResult != null) {
                releaseIrrelevantSearchContext(queryResult.queryResult());
                progressListener.notifyFetchResult(i);
            }
            counter.countDown();
        } else {
            ...
            executeFetch(
                queryResult.getShardIndex(),
                shardTarget,
                counter,
                fetchSearchRequest,
                queryResult.queryResult(),
                connection
            );
        }
    }
    ...
}
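The docIdsToLoad array that drives this loop comes from grouping the globally sorted top hits by shard. Below is a hypothetical sketch of that grouping, using List<Integer> in place of Lucene's ScoreDoc and HPPC's IntArrayList. Entry i is null when shard i contributed nothing to the requested page, which is the case the entry == null branch skips. The example data assumes only shard 2 holds a hit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: group the merged top hits by shard before the fetch phase.
public class DocIdsToLoadSketch {

    // One hit on the requested page: which shard it lives on and its shard-local doc ID.
    record TopHit(int shardIndex, int docId) {}

    // Entry i holds the doc IDs to fetch from shard i, or null if shard i has nothing on the page.
    static List<Integer>[] fillDocIdsToLoad(int numShards, List<TopHit> page) {
        @SuppressWarnings("unchecked")
        List<Integer>[] docIdsToLoad = new List[numShards];
        for (TopHit hit : page) {
            if (docIdsToLoad[hit.shardIndex()] == null) {
                docIdsToLoad[hit.shardIndex()] = new ArrayList<>();
            }
            docIdsToLoad[hit.shardIndex()].add(hit.docId());
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        // 3 shards, and only shard 2 holds a hit on the requested page.
        List<Integer>[] toLoad = fillDocIdsToLoad(3, List.of(new TopHit(2, 0)));
        for (int i = 0; i < toLoad.length; i++) {
            String action = toLoad[i] == null ? "nothing to fetch, count down" : "fetch docs " + toLoad[i];
            System.out.println("shard " + i + ": " + action);
        }
    }
}
```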

Back to the source:

A thread is started to perform the fetch. The call chain is:
AbstractSearchAsyncAction.executePhase->FetchSearchPhase.run->FetchSearchPhase.innerRun->FetchSearchPhase.executeFetch

private void executeFetch(...) {
    context.getSearchTransport()
        .sendExecuteFetch(
            connection,
            fetchSearchRequest,
            context.getTask(),
            new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
                @Override
                public void innerOnResponse(FetchSearchResult result) {
                  progressListener.notifyFetchResult(shardIndex);
                  counter.onResult(result);
                }

                @Override
                public void onFailure(Exception e) {
                  progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
                  counter.onFailure(shardIndex, shardTarget, e);
                }
            }
        );
}
  

[Figure: executeFetch]

counter is a CountedCollector. onResult(result) stores the data received from a shard and performs one countDown. Once the data from all shards has been collected, finishPhase is triggered once.

# CountedCollector.class
void countDown() {
    assert counter.isCountedDown() == false : "more operations executed than specified";
    if (counter.countDown()) {
        onFinish.run();
    }
}
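Below is a stripped-down stand-in for CountedCollector, written to show the pattern rather than to reproduce Elasticsearch's class. Results are stored by shard index, and every shard, fetched or skipped, counts down exactly once. The onFinish callback, which plays the role of finishPhase, runs when the count reaches zero. The class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical stand-in for CountedCollector: store per-shard results, run onFinish once.
public class CountedCollectorSketch<R> {

    private final AtomicReferenceArray<R> results;
    private final AtomicInteger remaining;
    private final Runnable onFinish;

    CountedCollectorSketch(int expectedOps, Runnable onFinish) {
        this.results = new AtomicReferenceArray<>(expectedOps);
        this.remaining = new AtomicInteger(expectedOps);
        this.onFinish = onFinish;
    }

    // Store one shard's fetch result, then count it down.
    void onResult(int shardIndex, R result) {
        results.set(shardIndex, result);
        countDown();
    }

    // Shards with nothing to fetch call this directly; the last call runs onFinish.
    void countDown() {
        int left = remaining.decrementAndGet();
        if (left < 0) {
            throw new AssertionError("more operations executed than specified");
        }
        if (left == 0) {
            onFinish.run();
        }
    }

    R result(int shardIndex) {
        return results.get(shardIndex);
    }

    public static void main(String[] args) {
        AtomicInteger finished = new AtomicInteger();
        CountedCollectorSketch<String> counter = new CountedCollectorSketch<>(3, finished::incrementAndGet);
        counter.countDown();                    // shard 0: nothing to fetch
        counter.countDown();                    // shard 1: nothing to fetch
        counter.onResult(2, "hits of aaaa-18"); // shard 2: fetched
        System.out.println("finishPhase ran " + finished.get() + " time(s)");
    }
}
```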

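The moveToNextPhase method runs the next phase. The task for that phase is defined in the FetchSearchPhase constructor, and it mainly triggers ExpandSearchPhase.

3.2.2 ExpandSearchPhase (step 2 in the figure above)

Call chain: AbstractSearchAsyncAction.executePhase->ExpandSearchPhase.run. ExpandSearchPhase#run executes after the fetch phase completes. It checks whether field collapsing is enabled and performs the collapse expansion if needed. If field collapsing is not used, the results go straight back to the client.

[Figure: ExpandSearchPhase#run]

After ExpandSearchPhase finishes, the response is sent to the client in AbstractSearchAsyncAction.sendSearchResponse:

[Figure: AbstractSearchAsyncAction.sendSearchResponse]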

4. Data nodes

4.1 Executing the query and fetch flows

Thread pool that runs this flow: search.

The handlers for the different query requests are registered in SearchTransportService.registerRequestHandler.

public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
  ...
  transportService.registerRequestHandler(
    QUERY_ACTION_NAME,
    ThreadPool.Names.SAME,
    ShardSearchRequest::new,
    (request, channel, task) -> searchService.executeQueryPhase(
      request,
      (SearchShardTask) task,
      new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request)
    )
  );
  ...
}

4.1.1 Executing the query request

# SearchService
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
    assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
        : "empty responses require more than one shard";
    final IndexShard shard = getShard(request);
    rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> {
        ensureAfterSeqNoRefreshed(shard, orig, () -> executeQueryPhase(orig, task), l);
    }));
}

Here ensureAfterSeqNoRefreshed hands the request off to the thread pool named search, whose queue holds 1000 tasks.

[Figure: the search thread pool]
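A fixed search pool with a bounded queue can be modelled with the JDK's ThreadPoolExecutor. This is an approximation, not Elasticsearch's EsThreadPoolExecutor. The pool size follows the documented default for the search pool, (allocated processors * 3) / 2 + 1. The queue is an ArrayBlockingQueue sized as given, and a full queue rejects new tasks. Elasticsearch reports such rejections to clients as HTTP 429. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a fixed "search" pool with a bounded queue.
public class SearchPoolSketch {

    // Documented default size of Elasticsearch's search pool.
    static int searchPoolSize(int allocatedProcessors) {
        return (allocatedProcessors * 3) / 2 + 1;
    }

    static ThreadPoolExecutor newSearchPool(int processors, int queueSize) {
        int size = searchPoolSize(processors);
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.AbortPolicy()); // full queue -> RejectedExecutionException
    }

    // Submits `tasks` tasks that block until released; returns how many were rejected.
    static int countRejections(int processors, int queueSize, int tasks) {
        ThreadPoolExecutor pool = newSearchPool(processors, queueSize);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy so the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the queued tasks drain
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // 2 processors -> 4 threads; with a queue of 1000, at most 1004 tasks are accepted at once.
        System.out.println("pool size: " + searchPoolSize(2));
        System.out.println("rejected: " + countRejections(2, 1000, 1010));
    }
}
```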

The search pool runs SearchService.executeQueryPhase->SearchService.loadOrExecuteQueryPhase->QueryPhase.execute. The core query logic is in queryPhase.execute(context), which calls Lucene to run the search and also computes aggregations:

public void execute(SearchContext searchContext) {
    aggregationPhase.preProcess(searchContext);
    boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled, indexSort);
    if (rescore) {
        // Rescoring only runs after a regular search
        rescorePhase.execute(searchContext);
    }
    // Suggestions and aggregations run whether or not rescoring happened
    suggestPhase.execute(searchContext);
    aggregationPhase.execute(searchContext);
}

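It contains several core pieces:

  • execute(): calls Lucene's searcher.search() to run the search
  • rescorePhase: rescores the top hits when full-text search needs (re)scoring
  • suggestPhase: autocompletion and spelling correction
  • aggregationPhase: computes aggregations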

4.1.2 Fetch flow

The handlers for the different fetch requests are also registered in SearchTransportService.registerRequestHandler.

transportService.registerRequestHandler(
    QUERY_FETCH_SCROLL_ACTION_NAME,
    ThreadPool.Names.SAME,
    InternalScrollSearchRequest::new,
    (request, channel, task) -> {
        searchService.executeFetchPhase(
            request,
            (SearchShardTask) task,
            new ChannelActionListener<>(channel, QUERY_FETCH_SCROLL_ACTION_NAME, request)
        );
    }
);

The fetch response is implemented in SearchService.executeFetchPhase, whose core is a call to fetchPhase.execute(context). It loads the data for each hit document, fills it into SearchHits, and finally wraps the hits in a FetchSearchResult.

# FetchPhase
public void execute(SearchContext context) {
    Profiler profiler = context.getProfilers() == null ? Profiler.NOOP : context.getProfilers().startProfilingFetchPhase();
    SearchHits hits = null;
    try {
        // Build the search hits for the matched documents from Lucene
        hits = buildSearchHits(context, profiler);
    } finally {
        ProfileResult profileResult = profiler.finish();
        // Only set the shardResults if building search hits was successful
        if (hits != null) {
            context.fetchResult().shardResult(hits, profileResult);
        }
    }
}

5. Returning the data

Entry point: RestCancellableNodeClient.doExecute.
Task task = client.executeLocally(...) runs the search, and an ActionListener receives the result:
[Figure: RestCancellableNodeClient.doExecute]

The onResponse call chain is: RestActionListener.onResponse->RestResponseListener.processResponse->RestController.sendResponse->DefaultRestChannel.sendResponse->Netty4HttpChannel.sendResponse

public void sendResponse(RestResponse restResponse) {
  ...
   httpChannel.sendResponse(httpResponse, listener);
  ...
}

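Finally, Netty4HttpChannel.sendResponse sends the response.

6. Summary

When we search with a pattern like aaaa*, Elasticsearch actually queries every index whose name starts with aaaa. It runs the query phase on one copy of every shard of those indices, then runs the fetch phase only on the shards that returned hits. Only then can it return the results. You may wonder whether searching every shard, and thus walking every Lucene segment of every shard, consumes too many resources. This is one reason merging Lucene segments benefits search performance, which I will cover in the next article. Note also what the QueryPhase.execute excerpt in section 4.1.1 shows: aggregations are computed by Elasticsearch's aggregationPhase during the query phase on each shard. They are not computed during fetch, and not by Lucene's search itself.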

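References

  1. Elasticsearch源碼解析與優化實戰 (Elasticsearch Source Code Analysis and Optimization in Practice)
  2. Elasticsearch源碼分析-搜索分析(一) (Elasticsearch source code analysis: search, part 1)
  3. Elasticsearch源碼分析-搜索分析(二) (Elasticsearch source code analysis: search, part 2)
  4. Elasticsearch源碼分析-搜索分析(三) (Elasticsearch source code analysis: search, part 3)
  5. Elasticsearch 通信模塊的分析 (An analysis of the Elasticsearch communication module)
  6. Elasticsearch 網絡通信線程分析 (An analysis of Elasticsearch network communication threads)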