本专栏系列基于 elasticsearch-7.8.0 版本分析 陆续打算推出的博文列表如下:
1、创建索引底层源码分析 2、更新索引底层源码分析 3、删除索引底层源码分析 4、elasticsearch中的线程池实现 5、elasticsearch启动过程源码流程分析 6、high level rest client请求流程分析 7、elastic search数据副本模型、读写模型 8、lucene学习总结系列
- 全文检索的基本原理
- lucene的总体结构
- lucene的索引文件格式
- lucene的打分机制
- lucene的搜索过程解析
本文记录ElasticSearch-7.8.0创建索引源码执行流程,从源码角度看一下创建索引的底层实现和涉及到的服务,比如AllocationService、MasterService、IndexService等,以及这些服务的相关操作,若有不合理的地方,希望各位看官能够不吝赐教。
- 场景1:创建索引,没有写入文档。
curl -X PUT "localhost:9200/cool3" -d '{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}'
总体流程图
总述:客户端提交创建索引的基本信息(索引名称、分区数、副本数等),提交到服务端,服务端将CreateIndexRequest封装成CreateIndexClusterStateUpdateRequest。根据actionName获得具体响应的action,具体入口是
org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction
拿到响应action后调用实例属性MetadataCreateIndexService#createIndex()进入到onlyCreateIndex(),该实例属性的作用是:负责提交创建索引请求的服务;所以创建索引大致可以分为以下几个主要流程:
1)master节点发起集群状态更新任务创建索引会改变 当前集群状态——ClusterState,集群状态只能在主节点上更新,所以onlyCreateIndex方法进来后,就会由ClusterService调起MasterService,并在master节点上发起:提交一批集群状态更新任务——MasterService#submitStateUpdateTasks;经过一系列调用,在master节点运行创建索引任务MasterService#runTasks,再进入到具体的创建索引逻辑,具体调用栈如下
1、MetadataCreateIndexService#onlyCreateIndex clusterService.submitStateUpdateTask({...}) // 发起提交集群状态更新任务 2、ClusterService#submitStateUpdateTask submitStateUpdateTask(source, updateTask, ...); 3、ClusterService#submitStateUpdateTask submitStateUpdateTasks(source, ...); 4、ClusterService#submitStateUpdateTask masterService.submitStateUpdateTasks(source, tasks, config, executor); 5、MasterService#submitStateUpdateTasks taskBatcher.submitTasks(safeTasks, config.timeout()); 6、TaskBatcher#submitTasks threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout)); 7、PrioritizedEsThreadPoolExecutor#execute execute(command); // 提交创建索引任务 8、EsThreadPoolExecutor#execute super.execute(command); 9、java.util.concurrent.ThreadPoolExecutor#execute java.util.concurrent.ThreadPoolExecutor.Worker#run runWorker(Worker w) 10、java.util.concurrent.ThreadPoolExecutor#runWorker task.run(); 10、TaskBatcher.BatchedTask#run // 创建索引、更新集群的状态信息是在Runnable#run()中,也就是TaskBatcher.BatchedTask#run方法中,这个方法继承了java类的Runnable接口 runIfNotProcessed(this); 11、TaskBatcher#runIfNotProcessed run(updateTask.batchingKey, toExecute, tasksSummary); // 最终回到run方法 // MasterService.Batcher#run实现了上面TaskBatcher#run 12、MasterService.Batcher#run runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary)); 13、MasterService#runTasks final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState); 14、MasterService#calculateTaskOutputs // 输入创建索引任务,输出集群状态变化结果 ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, previousClusterState); 15、MasterService#executeTasks clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); 16、ClusterStateTaskExecutor#execute ClusterState result = execute(currentState); // 抽象类ClusterStateUpdateTask实现了ClusterStateTaskExecutor接口 17、ClusterStateUpdateTask#execute // 该方法:根据当前状态更新群集状态。如果不应更改任何状态,则返回*相同实例*。 18、clusterService.submitStateUpdateTask({ new AckedClusterStateUpdateTask(...) { @Override public ClusterState execute(ClusterState currentState) throws Exception { return applyCreateIndexRequest(currentState, request, false); } } }) // AckedClusterStateUpdateTask继承ClusterStateUpdateTask类且实现AckedClusterStateTaskListener接口 // 回到步骤1、形成闭环;进入创建索引具体流程:applyCreateIndexRequest,详见调用栈-2 ......
调用栈-1
调用栈-1——类继承关系
2)匹配索引模板启动IndexService创建索引需要通过IndexModule初始化一个IndexService,IndexService主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。下面是调用栈
1、MetadataCreateIndexService#onlyCreateIndex return applyCreateIndexRequest(currentState, request, false); // 进入创建索引逻辑 2、MetadataCreateIndexService#applyCreateIndexRequest return applyCreateIndexRequest(currentState, request, silent, null); 3、MetadataCreateIndexService#applyCreateIndexRequest validate(request, currentState); // 索引相关检验 return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer); 4、MetadataCreateIndexService#applyCreateIndexRequestWithV1Templates return applyCreateIndexWithTemporaryService(...) 5、MetadataCreateIndexService#applyCreateIndexWithTemporaryService return indicesService.withTempIndexService(...) 6、IndicesService#withTempIndexService // final IndexService indexService = createIndexService(...) // 创建IndexService入口 7、IndicesService#createIndexService final IndexModule indexModule = new IndexModule(...) // 创建IndexModule ... // 为创建索引添加各种监听器 return indexModule.newIndexService(...) // 新建IndexService 8、IndexModule#newIndexService final IndexService indexService = new IndexService(...) 9、IndicesService#withTempIndexService // 回到步骤6、创建IndexService入口,形成闭环 return indexServiceConsumer.apply(indexService); 10、MetadataCreateIndexService#applyCreateIndexWithTemporaryService return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer); // 步骤6,进入具体创建索引操作 11、AllocationService#reroute reroute(allocation); --> shardsAllocator.allocate(allocation); 12、BalancedShardsAllocator#allocate balancer.allocateUnassigned(); // 分配未分配的分片 balancer.moveShards(); // 重新定位无法再留在节点上的分片 balancer.balance(); // 重新分配分片,以便分片在集群中平衡 13、回到调用栈-1的步骤13、,形成闭环
调用栈-2
3)创建索引将索引创建到集群状态中即生成新的cluster state,下面是创建成功后的日志
[es-source-node] [cool3] creating index, cause [api], templates [], shards [3]/[2], mappings []
下面是创建索引的具体实现
static ClusterState clusterStateCreateIndex(ClusterState currentState, Set clusterBlocks, IndexMetadata indexMetadata,
BiFunction rerouteRoutingTable,
BiConsumer metadataTransformer) {
Metadata.Builder builder = Metadata.builder(currentState.metadata())
.put(indexMetadata, false);
if (metadataTransformer != null) {
metadataTransformer.accept(builder, indexMetadata);
}
Metadata newMetadata = builder.build();
String indexName = indexMetadata.getIndex().getName();
ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks);
blocks.updateBlocks(indexMetadata);
ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metadata().index(indexName));
updatedState = ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build();
// 如果index状态open,执行allocationService.reroute 将分片分配到其他节点
return rerouteRoutingTable.apply(updatedState, "index [" + indexName + "] created");
}
这里详细说下cluster state:cluster state是全局性信息,包含了整个群集中所有分片的元信息(规则, 位置, 大小等信息),并保持每个每节的信息同步;下面的说明4、中,需要关注如何实现发布整体还是发布差异到具体的节点上,后续再分析
1、表示集群的当前状态。 2、除了{@link RoutingNodes}结构之外,cluster state对象是不可变的,该结构是根据{@link RoutingTable}的需求构建的。 3、集群状态只能在主节点上更新。所有更新都在单个线程上执行,并由{@link ClusterService}控制。在每次更新之后,{@link Discovery#publish}方法将集群状态的新版本发布给集群中的所有其他节点。实际的发布机制被委托给{@link Discovery#publish}方法,并取决于发现的类型。 4、集群状态实现{@link Diffable}接口,以支持发布集群状态差异的部分,而不是每次更改时的整个状态。如果某个节点处于集群状态的早期版本中,则发布机制只应将差异发送给该节点。如果某个节点在集群状态的早期版本中不存在,比如新加入的节点,则此节点不太可能具有早期集群状态版本,这时候就应发送完整版本。为了确保差异应用于集群状态的正确版本,每个集群状态版本更新都会生成{@link#stateUUID},唯一地标识该版本的状态。该uuid由{@link ClusterStateDiff#apply}方法验证,以确保应用了正确的差异。如果UUID不匹配,{@link ClusterStateDiff#apply}方法抛出{@link CompatibleClusterStateVersionException},这会导致发布机制将集群状态的完整版本发送到引发此异常的节点。
上面创建完索引后,也就会生成一个新的cluster state,集群状态 cluster state包含更新的路由。接着就会回调到AllocationService#reroute,执行分片分配。
4)AllocationService负责分片路由,将创建成功后的索引分配到分片上分片分配的核心逻辑是根据分片规则&分片权重(index、cluster)进行位置判断,然后进行数据移动、移动结束初始化启动、最后调整clusterstate完成分配。
AllocationService这个服务负责管理集群的节点分配,包括选择节点用于shard allocation——分片分配,管理新加入集群的新节点和分片的重新路由,他的触发条件有:
- 新增或删除
index
索引 node
节点的新增或删除- 执行
reroute
命令 - 修改
replica
副本数量 - 集群重启
此时我们的场景就是 新建索引 ;分片路由的具体实现逻辑入口如下:AllocationService#reroute
/**
* 在存活的节点中重新路由 路由表
* 如果返回了相同的ClusterState实例,则不会进行任何更改。
*/
public ClusterState reroute(ClusterState clusterState, String reason) {
// 1、检查复制副本是否具有需要调整的自动扩展功能。如果需要更改,则返回更新的群集状态;如果不需要更改,则返回相同的群集状态。
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
// 2、创建一个{@link RoutingNodes}。这是一个开销很大的操作,因此只能调用一次!
RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
// 3、洗牌未分配的节点
routingNodes.unassigned().shuffle();
/**
* Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;
* allocator 寻找最优的节点来分配分片;
* allocator 负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator 的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;
* deciders 负责判断并决定是否要进行分配;
* deciders 依次遍历 allocator 给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
*/
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), currentNanoTime());
// 关注这个方法
reroute(allocation);
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}
shard allocation, 是一个将分片分配到节点的过程;可能发生该操作的过程包括:
- 初始恢复(
initial recovery
) - 副本分配(
replica allocation
) - 重新平衡(
rebalance
) - 节点的新增和删除
分片的分配操作, 是由 master 节点来决定什么时候移动分片, 以及移动到哪个节点上, 以达到集群的均衡,详见——官方文档
哪些分片应该分配到哪些节点上?哪个分片作为主分片, 哪个作为副本分片? Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;
- allocator 寻找最优的节点来分配分片
- deciders 负责判断并决定是否要进行分配
① 对于新建的索引:
- allocator的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;deciders依次遍历allocator给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
② 已有的索引
- allocator 对于主分片, 只允许把主分片指定在已经拥有该分片完整数据的节点上; 对于副本分片, 则是先判断其他节点上是否已有该分片的数据的拷贝, 如果有这样的节点, allocator 则优先把分片分配到这其中一个节点上;
执行真正reroute逻辑,如果有节点没有分配shard,则执行gatewayAllocator.allocateUnassigned。关于gatewayAllocator的分配主要分为primaryShardAllocator和replicaShardAllocator:
primaryShardAllocator.allocateUnassigned(allocation);
replicaShardAllocator.processExistingRecoveries(allocation);
replicaShardAllocator.allocateUnassigned(allocation);
执行数据分片分配BalancedShardsAllocator.allocate(allocation)。该类基于WeightFunction重新分配集群节点node持有shard的分配关系。allocate方法主要分三步:
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
①:allocateUnassigned,根据WeightFunction算法和所有AllocationDecider把所有给定的shard分配一个最小化匹配的node ②:moveShards,根据第一步的结果对需要移动的节点进行移动,移动过程中为RELOCATING,移动过去初始化INITIALIZING ③:负载均衡,rebalance其实是从负载高的node向负载低的做转移。
5)分发集群状态:publish()分片分配结束后,再由主节点调用相关服务将集群状态cluster state分发到集群的其他节点上,完成集群状态同步;主节点将集群状态分发到集群中的其他节点上,具体调用栈详见该入口方法,在调用栈-1的步骤13、后的代码中
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info("{}, term: {}, version: {}, delta: {}",
summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
}
}
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(summary, publicationStartTime, newClusterState, e);
}
6)监听器监听创建结果
等待集群中Active shards恢复到指定数目或者超时返回,将结果返回客户端
* Creates an index in the cluster state and waits for the specified number of shard copies to * become active (as specified in {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()}) * before sending the response on the listener.
默认情况下:只要Primary Shard是Active的,也就是wait_for_active_shards指定的分片数量(默认为1),就可以创建索引。这里有两个获取结果的方式,即isAcknowledged()和shardsAcknowledged(),如果cluster state创建成功,isAcknowledged()会返回true(然后等待shardsAcknowledged,如果超时,shardsAcknowledged返回false),否则返回false(不会等待已经started的分片,isShardsAcknowledged也会返回false)。如果Active shards未达到指定的数目,则创建索引请求会阻塞,直到集群中Active shards恢复到指定数目或者超时返回。可参考:ActiveShardsObserver#waitForActiveShards(...)方法
clusterService.submitStateUpdateTask(
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
// 2、clusterState更新返回
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// 1、适配创建索引请求
return applyCreateIndexRequest(currentState, request, false);
}
@Override
public void onFailure(String source, Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
logger.trace(() -> new ParameterizedMessage("[{}] failed to create", request.index()), e);
} else {
logger.debug(() -> new ParameterizedMessage("[{}] failed to create", request.index()), e);
}
super.onFailure(source, e);
}
});
onlyCreateIndex(request, ActionListener.wrap(response -> {
// 检查isAcknowledged
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
shardsAcknowledged -> {
// 检查shardsAcknowledged;
if (shardsAcknowledged == false) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", request.index());
}
listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
}, listener::onFailure);
} else {
listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
}
}, listener::onFailure));
索引创建成功后,客户端响应结果如下,至此,索引创建流程结束。
{ "acknowledged": true, "shards_acknowledged": true, "index": "cool3" }
7)针对创建索引调用栈中的实现细节做一些解释说明创建索引过程中,如果索引名不规范、或者创建配置不合理,都会在创建过程中进行校验,并且返回对应的报错信息
比如 调用栈-2 中的 索引相关校验
/**
* Validate the name for an index or alias against some static rules.
*/
public static void validateIndexOrAliasName(String index, BiFunction exceptionCtor) {
if (!Strings.validFileName(index)) {
// ('\\', '/', '*', '?', '"', '', '|', ' ', ','))
throw exceptionCtor.apply(index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS);
}
if (index.contains("#")) {
throw exceptionCtor.apply(index, "must not contain '#'");
}
if (index.contains(":")) {
throw exceptionCtor.apply(index, "must not contain ':'");
}
if (index.charAt(0) == '_' || index.charAt(0) == '-' || index.charAt(0) == '+') {
throw exceptionCtor.apply(index, "must not start with '_', '-', or '+'");
}
int byteCount = 0;
try {
byteCount = index.getBytes("UTF-8").length;
} catch (UnsupportedEncodingException e) {
// UTF-8 should always be supported, but rethrow this if it is not for some reason
throw new ElasticsearchException("Unable to determine length of index name", e);
}
if (byteCount > MAX_INDEX_NAME_BYTES) {
throw exceptionCtor.apply(index, "index name is too long, (" + byteCount + " > " + MAX_INDEX_NAME_BYTES + ")");
}
if (index.equals(".") || index.equals("..")) {
throw exceptionCtor.apply(index, "must not be '.' or '..'");
}
}
- 校验索引名--validateIndexName(),
- )索引名必须小写 -- must be lowercase
- )路由表是否已经存在该索引;!抛出索引已经存在异常 -- index {} already exists
- )元数据表是否已经存在该索引;同上
- )别名是否存在;!抛出别名已经存在异常 -- already exists as alias
/** * Validate the name for an index against some static rules and a cluster state. */ public void validateIndexName(String index, ClusterState state) { validateIndexOrAliasName(index, InvalidIndexNameException::new); if (!index.toLowerCase(Locale.ROOT).equals(index)) { throw new InvalidIndexNameException(index, "must be lowercase"); } // NOTE: dot-prefixed index names are validated after template application, not here if (state.routingTable().hasIndex(index)) { throw new ResourceAlreadyExistsException(state.routingTable().index(index).getIndex()); } if (state.metadata().hasIndex(index)) { throw new ResourceAlreadyExistsException(state.metadata().index(index).getIndex()); } if (state.metadata().hasAlias(index)) { throw new InvalidIndexNameException(index, "already exists as alias"); } }
- 校验settings是否正常--validateIndexSettings()
- )index.data_path及path.shared_data的设置是否合理(验证配置的索引数据路径(如果有)是否是配置的共享数据路径(如果有)的子路径)
- )参数值(非ES自身参数,例如index.merge.enabled、index.uuid)是否有key、value
public void validateIndexSettings(String indexName, final Settings settings, final boolean forbidPrivateIndexSettings)
throws IndexCreationException {
List validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
if (validationErrors.isEmpty() == false) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw new IndexCreationException(indexName, validationException);
}
}
3、Ending
至此,整个索引创建逻辑大体上分析结束,可以通过流程图,对照调用栈做分析,相关部分,比如重新路由和状态同步,后续再详细补充,包括创建索引后,具体的数据路径生成原理。
详见Elasticsearch-7.8.0集群方式运行源码,并集成IK分词器