带着疑问学源码,第二篇:Elasticsearch 搜索 代码分析基于:https://github.com/jiankunking/elasticsearch Elasticsearch 7.10.2+
目的:在看源码之前,先梳理一下自己对检索流程的疑惑点:当索引按照日期拆分之后,使用 `-*` 这类通配符检索时,会不会通过索引层面的时间配置直接跳过无关索引?使用 `*` 会对性能造成多大的影响?
源码分析:第二部分是代码分析的过程,不想看的朋友可以跳过,直接看第三部分的总结。
分析的话,咱们就以_search操作为主线。
在RestSearchAction可以看到:
- 路由注册
- 请求参数转换
真正执行的是TransportSearchAction,类图如下:
![TransportSearchAction 类图](/images/elasticsearch-search-source-code-analysis/TransportSearchAction.png)
TransportSearchAction doExecute =>
// executeRequest中会判断是local请求还是remote请求
// local请求会执行executeLocalSearch,在executeLocalSearch中会将remote相关参数置空,然后再调用executeSearch
// remote请求会执行executeSearch
TransportSearchAction executeRequest =>
TransportSearchAction executeLocalSearch|executeSearch =>
// executeSearch会合并remoteShardIterators(跨集群访问)与localShardIterators得到shardIterators
// 校验shard数是否超限
TransportSearchAction executeSearch =>
下面先看一下:
/**
 * Rewrites the request source and then dispatches the search: a purely local search when no remote
 * clusters are involved, otherwise one of the two cross-cluster paths (remote reduce, or collecting the
 * remote shards and running a single merged search).
 *
 * @param task the task this search runs under
 * @param searchRequest the search request; its source is replaced when the rewrite produced a different one
 * @param searchAsyncActionProvider passed through unchanged to executeLocalSearch / executeSearch
 * @param listener handed to the downstream execution paths; also notified of failures raised while
 *                 rewriting the source or collecting remote shards
 */
private void executeRequest(Task task, SearchRequest searchRequest,
SearchAsyncActionProvider searchAsyncActionProvider, ActionListener listener) {
// Capture the relative start time and combine it with the request's absolute start time.
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
// Invoked with the rewritten source (or directly with the null source, see the end of this method).
ActionListener rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final SearchContextId searchContext;
final Map remoteClusterIndices;
// With a point-in-time builder the indices are taken from its search context; otherwise they are
// grouped from the index expressions of the request.
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices());
}
// Take the local-cluster entry out of the map; whatever remains belongs to remote clusters.
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final ClusterState clusterState = clusterService.state();
if (remoteClusterIndices.isEmpty()) {
// No remote clusters involved: run a purely local search.
executeLocalSearch(
task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
} else {
// Corresponds to the ccs_minimize_roundtrips request option, see
// https://www.elastic.co/guide/en/elasticsearch/reference/7.9/modules-cross-cluster-search.html
if (shouldMinimizeRoundtrips(searchRequest)) {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
// The trailing lambda is the hook ccsRemoteReduce is given for running a local search.
ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
// Original note: send the search request to each cluster; the target cluster's TransportSearchAction
// handles it in its doExecute method.
// NOTE(review): the callback below consumes searchShardsResponses and builds remote shard iterators
// from them, so this call appears to collect each remote cluster's shards rather than run the
// search itself - confirm against collectSearchShards.
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
final BiFunction clusterNodeLookup =
getRemoteClusterNodeLookup(searchShardsResponses);
final Map remoteAliasFilters;
final List remoteShardIterators;
// Point-in-time searches take alias filters from the search context; regular searches
// derive them from the search-shards responses.
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
remoteAliasFilters);
}
// The local cluster counts as one cluster only when the request targets local indices.
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
// Run one search over the local indices plus the collected remote shard iterators.
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext, searchAsyncActionProvider);
},
listener::onFailure));
}
}
}, listener::onFailure);
// Without a source there is nothing to rewrite, so the listener is invoked directly with the null source.
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}
下面再看一下executeSearch:
/**
 * Builds the local shard iterators, merges them with the supplied remote shard iterators, applies
 * request defaults and search-type adjustments, and starts the asynchronous search action.
 *
 * @param task the search task
 * @param timeProvider time provider used when resolving the local indices
 * @param searchRequest the search request; search type, allowPartialSearchResults and requestCache may be
 *                      modified here
 * @param localIndices the local indices to search, or null when there are none
 * @param remoteShardIterators shard iterators for remote clusters, merged with the local ones
 * @param remoteConnections lookup used when building the connection lookup for remote clusters
 * @param clusterState cluster state used for block checks, index resolution and routing
 * @param remoteAliasMap remote alias filters, passed to buildPerIndexAliasFilter
 * @param listener passed to the asynchronous search action
 * @param clusters cluster counters passed to the asynchronous search action
 * @param searchContext non-null when the request carries a point-in-time builder (asserted below)
 * @param searchAsyncActionProvider creates the asynchronous search action that is started at the end
 */
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
OriginalIndices localIndices, List remoteShardIterators,
BiFunction remoteConnections, ClusterState clusterState,
Map remoteAliasMap, ActionListener listener,
SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider) {
// Only a global READ-level block is checked here.
// Original note: searching is still possible while the cluster is red.
// NOTE(review): cluster health is not inspected in this method; confirm against ClusterBlocks.
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final List localShardIterators;
final Map aliasFilter;
final String[] concreteLocalIndices;
if (searchContext != null) {
// Point-in-time search: alias filters and local shard iterators come from the search context.
assert searchRequest.pointInTimeBuilder() != null;
aliasFilter = searchContext.aliasFilter();
concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
} else {
// Regular search: resolve the local indices and the search routing against the cluster state.
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
Map routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
// Normalize a null routing map to an empty map and make a non-null one unmodifiable.
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
concreteLocalIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteLocalIndices[i] = indices[i].getName();
}
// Pending search request counts, passed to searchShards below together with the response collector service.
Map nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator localShardRoutings = clusterService.operationRouting().searchShards(clusterState,
concreteLocalIndices, routingMap, searchRequest.preference(),
searchService.getResponseCollectorService(), nodeSearchCounts);
// Wrap every local shard routing group in a SearchShardIterator carrying the local cluster alias.
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
.collect(Collectors.toList());
aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
}
// Merge local and remote shard iterators, then check the merged count against the shard count limit.
final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
// optimize search type for cases where there is only one shard group to search on
if (shardIterators.size() == 1) {
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
// convert to Q_T_F if we have only suggest
searchRequest.searchType(QUERY_THEN_FETCH);
break;
}
}
final DiscoveryNodes nodes = clusterState.nodes();
// Connection lookup built from the local cluster alias, the local nodes and the remote connection lookup.
BiFunction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
// Decide whether the target shards should be pre-filtered before the query runs
// (request parameter pre_filter_shard_size), see
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
// Author's notes on shouldPreFilterSearchShards (its body is not shown here): it returns true only
// when all three conditions hold:
// 1. the search type is QUERY_THEN_FETCH
// 2. the query rewrite can predict an empty result, or the request sorts on a field
// 3. the number of shards actually searched is greater than preFilterShardSize (default 128)
// Also note:
// the main benefit of pre-filtering is not lower query latency; the pre-filter phase does not occupy
// the search threadpool, which reduces the load on that pool.
// The shard count passed in is the number of local plus remote shard iterators.
final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size());
// Create the asynchronous search action; the search itself is kicked off by the action's start() method.
searchAsyncActionProvider.asyncSearchAction(
task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener,
preFilterSearchShards, threadPool, clusters).start();
}
在看searchAsyncAction之前,先看一下AbstractSearchAsyncAction的继承关系及实现类:
![AbstractSearchAsyncAction 类图](/images/elasticsearch-search-source-code-analysis/AbstractSearchAsyncAction.png)
searchAsyncAction主要是生成查询的请求,也就是AbstractSearchAsyncAction的实例:
private AbstractSearchAsyncAction
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?