您当前的位置: 首页 >  ar

衣舞晨风

暂无认证

  • 2浏览

    0关注

    1156博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【Elasticsearch源码】 检索分析

衣舞晨风 发布时间:2022-03-16 17:09:41 ,浏览量:2

带着疑问学源码,第二篇:Elasticsearch 搜索 代码分析基于:https://github.com/jiankunking/elasticsearch Elasticsearch 7.10.2+

目的

在看源码之前先梳理一下,自己对于检索流程疑惑的点: 当索引是按照日期拆分之后,在使用-* 检索,会不会通过索引层面的时间配置直接跳过无关索引?使用*会对性能造成多大的影响?

源码分析

第二部分是代码分析的过程,不想看的朋友可以跳过直接看第三部分总结。

分析的话,咱们就以_search操作为主线。

在RestSearchAction可以看到:

  • 路由注册
  • 请求参数转换

真正执行的是TransportSearchAction,类图如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JdxNHrba-1647421734887)(/images/elasticsearch-search-source-code-analysis/TransportSearchAction.png)]

TransportSearchAction doExecute =>
// executeRequest中会判断是local请求还是remote请求
// local请求会执行executeLocalSearch,在executeLocalSearch中会将remote相关参数置空,然后在调用executeSearch
// remote请求会执行executeSearch
TransportSearchAction executeRequest =>
TransportSearchAction executeLocalSearch|executeSearch =>
// executeSearch会合并remoteShardIterators(跨集群访问)与localShardIterators得到shardIterators
// 校验shard数是否超限
TransportSearchAction executeSearch =>

下面先看一下:

    private void executeRequest(Task task, SearchRequest searchRequest,
                                SearchAsyncActionProvider searchAsyncActionProvider, ActionListener listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
            new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener rewriteListener = ActionListener.wrap(source -> {
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            final SearchContextId searchContext;
            final Map remoteClusterIndices;
            if (searchRequest.pointInTimeBuilder() != null) {
                searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
            } else {
                searchContext = null;
                remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices());
            }
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            final ClusterState clusterState = clusterService.state();
            if (remoteClusterIndices.isEmpty()) {
                executeLocalSearch(
                    task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
            } else {
                // 对应 ccs_minimize_roundtrips
                // https://www.elastic.co/guide/en/elasticsearch/reference/7.9/modules-cross-cluster-search.html
                if (shouldMinimizeRoundtrips(searchRequest)) {
                    final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
                    ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
                        searchService.aggReduceContextBuilder(searchRequest),
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(
                            task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    // 针对每个集群将搜索请求发送出去,
                    // 目标集群TransportSearchAction收到请求调用doExecute方法处理
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                final BiFunction clusterNodeLookup =
                                    getRemoteClusterNodeLookup(searchShardsResponses);
                                final Map remoteAliasFilters;
                                final List remoteShardIterators;
                                if (searchContext != null) {
                                    remoteAliasFilters = searchContext.aliasFilter();
                                    remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
                                        searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
                                } else {
                                    remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
                                    remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
                                        remoteAliasFilters);
                                }
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
                                    clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
                                    searchContext, searchAsyncActionProvider);
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }

下面再看一下executeSearch:

 private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, List remoteShardIterators,
                               BiFunction remoteConnections, ClusterState clusterState,
                               Map remoteAliasMap, ActionListener listener,
                               SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext,
                               SearchAsyncActionProvider searchAsyncActionProvider) {
        // red状态也可以查询
        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

        // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
        // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
        // of just for the _search api
        final List localShardIterators;
        final Map aliasFilter;

        final String[] concreteLocalIndices;
        if (searchContext != null) {
            assert searchRequest.pointInTimeBuilder() != null;
            aliasFilter = searchContext.aliasFilter();
            concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
            localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
                searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
        } else {
            final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
            Map routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
                searchRequest.indices());
            routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
            concreteLocalIndices = new String[indices.length];
            for (int i = 0; i < indices.length; i++) {
                concreteLocalIndices[i] = indices[i].getName();
            }
            Map nodeSearchCounts = searchTransportService.getPendingSearchRequests();
            GroupShardsIterator localShardRoutings = clusterService.operationRouting().searchShards(clusterState,
                concreteLocalIndices, routingMap, searchRequest.preference(),
                searchService.getResponseCollectorService(), nodeSearchCounts);
            localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
                .map(it -> new SearchShardIterator(
                    searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
                .collect(Collectors.toList());
            aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
        }
        final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);

        failIfOverShardCountLimit(clusterService, shardIterators.size());

        Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

        // optimize search type for cases where there is only one shard group to search on
        if (shardIterators.size() == 1) {
            // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
            searchRequest.searchType(QUERY_THEN_FETCH);
        }
        if (searchRequest.allowPartialSearchResults() == null) {
            // No user preference defined in search request - apply cluster service default
            searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
        }
        if (searchRequest.isSuggestOnly()) {
            // disable request cache if we have only suggest
            searchRequest.requestCache(false);
            switch (searchRequest.searchType()) {
                case DFS_QUERY_THEN_FETCH:
                    // convert to Q_T_F if we have only suggest
                    searchRequest.searchType(QUERY_THEN_FETCH);
                    break;
            }
        }
        final DiscoveryNodes nodes = clusterState.nodes();
        BiFunction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
            nodes::get, remoteConnections, searchTransportService::getConnection);
        final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
        // 判断是否需要在查询前做目标分片过滤
        // pre_filter_shard_size
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
        // shouldPreFilterSearchShards,判断为true需要同时满足3个条件:
		// 1、查询类型为QUERY_THEN_FETCH
        // 2、是否能通过查询重写预判出查询结果为空或者有字段排序
        // 3、实际的查询分片数量> preFilterShardSize(默认128)
        // 需要注意的是:
        // pre-filter 最主要的作用不是降低查询延迟,而是 pre-filter 阶段可以不占用search theadpool,减少了这个线程池的占用情况。
        final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
            localShardIterators.size() + remoteShardIterators.size());
        // 调用searchAsyncAction进行异步搜索,search操作是由action的start方法来处理的。
        searchAsyncActionProvider.asyncSearchAction(
            task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
            Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener,
            preFilterSearchShards, threadPool, clusters).start();
    }

在看searchAsyncAction之前先看一下AbstractSearchAsyncAction的继承及实现类: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f2EbF9Qb-1647421734890)(/images/elasticsearch-search-source-code-analysis/AbstractSearchAsyncAction.png)]

searchAsyncAction主要是生成查询的请求,也就是AbstractSearchAsyncAction的实例:

private AbstractSearchAsyncAction            
关注
打赏
1647422595
查看更多评论
0.0546s