- 一、简介
- 1.1、文档操作的定义
- 1.2、可选参数
- 二、Index/Bulk 基本流程
- 三、Index/Bulk详细流程
- 3.1、协调节点流程
- 3.1.1、参数检查
- 3.1.2、处理pipeline请求
- 3.1.3、自动创建索引
- 3.1.4、对请求的预先处理
- 3.1.5、检测集群状态
- 3.1.6、内容路由,构建基于shard的请求
- 3.1.7、路由算法
- 3.1.8、转发请求并等待响应
- 3.2、主分片节点流程
- 3.2.1、检查请求
- 3.2.2、是否延迟执行
- 3.2.3、判断主分片是否已经发生迁移
- 3.2.4、检测写一致性
- 3.2.5、写 Lucene 和事务日志
- 3.2.6、flush translog
- 3.2.7、写副分片
- 3.2.8、处理副分片写失败情况
- 3.3、副分片节点流程
- 四、I/O 异常处理
- 4.1、Engine 关闭过程
- 4.2、Master的对应处理
- 4.3、异常流程总结
- 五、系统特性
- 六、思考
- 关注我的公众号【宝哥大数据】,更多干货
本章分析ES写入单个和批量文档写请求的处理流程,仅限于ES内部实现,并不涉及Lucene内部处理。在 ES 中,**写入单个文档的请求称为 Index 请求,批量写入的请求称为 Bulk 请求。**写单个和多个文档使用相同的处理逻辑,请求被统一封装为BulkRequest。
在分析写流程时,我们把流程按不同节点执行的操作进行划分。写请求的例子可以参考上一章。
1.1、文档操作的定义=======
在ES中,对文档的操作有下面几种类型:
enum OpType {
INDEX(0),
CREATE(1),
UPDATE(2),
DELETE(3);
}
INDEX
:向索引中“put”一个文档的操作称为“索引”一个文档。此处“索引”为动词。CREATE
:put请求可以通过op_type 参数设置操作类型为create, 在这种操作下,如果文档已存在,则请求将失败。UPDATE
:默认情况下,“put”一个文档时,如果文档已存在,则更新它。DELETE
:删除文档。
在put API中,通过op_type参数来指定操作类型。
1.2、可选参数====
Index API和Bulk API有一些可选参数,这些参数在请求的URI中指定,例如:
PUT my_index/my_type/my_id?pipeline=my\_pipeline\_id
"foo": "bar"
}
下面简单介绍各个参数的作用,这些参数在接下来的流程分析中都会遇到,如下表所示。
新建、索引(这里的索引是动词,指写入操作,将文档添加到Lucene的过程称为索引一个文档)和删除请求都是写操作。写操作必须先在主分片执行成功后才能复制到相关的副分片。
写单个文档的流程(图片来自官网)如下图所示。.
以下是写单个文档所需的步骤:
- 客户端向 NODE1 发送写请求。
- NODE1 使用文档ID来确定文档属于分片0,通过集群状态中的 内容路由表 信息获知 分片0 的主分片位于NODE3,因此请求被转发到NODE3上。
- NODE3上的主分片执行写操作。如果写入成功,则它将请求并行转发到 NODE1 和 NODE2 的副分片上,等待返回结果。当所有的副分片都报告成功,NODE3 将向协调节点报告成功,协调节点再向客户端报告成功。
在客户端收到成功响应时,意味着写操作已经在主分片和所有副分片都执行完成。
写一致性的默认策略是 quorum
,即多数的分片(其中分片副本可以是主分片或副分片)在写入操作时处于可用状态。
quorum = int( (primary + number_of_replicas) / 2 ) + 1
三、Index/Bulk详细流程
以不同角色节点执行的任务整理流程如下图所示。
下面分别讨论各个节点上执行的流程。
3.1、协调节点流程**协调节点负责创建索引、转发请求到主分片节点、等待响应、回复客户端。**实现位于TransportBulkAction
。执行本流程的线程池:http_server_worker
。
如同我们平常设计的任何一个对外服务的接口处理一样,收到用户请求后首先检测请求的合法性,把检查操作放在处理流程的第一步,有问题就直接拒绝,对异常请求的处理代价是最小的。
检查操作进行以下参数检查,如下表所示。
参数检查index不可为空type不可为空 (ES7.x废弃)source不可为空contentType不可为空opТуре当前操作类型如果是创建索引,则校验VersionType 必须为internal, 且Version 不可为MATCH_DELETEDresolvedVersion校验解析的Version是否合法versionType不可为FORCE类型,此类型已废弃id | 非空时,长度不可大于512,以及为空时对versionType和resolvedVersion的检查
每项检查遇到异常都会拒绝当前请求。
3.1.2、处理pipeline请求数据预处理(ingest)工作通过定义pipeline和processors实现。pipeline 是一系 列processors的定义,processors 按照声明的顺序执行。添加一个pipeline的简单例子如下:
PUT _ingest/pipeline/my_pipeline_id
{
"description" : "describe pipeline",
"processors" : [
"set" : {
"field": "foo",
"value": "bar"
}
]
}
my_pipeline_id是自定义的pipeline名称,processors 中定义了一系列的处理器,本例中只有set。
如果Index或Bulk请求中指定了pipeline 参数,则先使用相应的pipeline 进行处理。如果本节点不具备预处理资格,则将请求随机转发到其他具备预处理资格的节点。 预处理节点资格的配置参考第1章中的节点角色。
3.1.3、自动创建索引如果配置为允许自动创建索引( 默认允许),则计算请求中涉及的索引,可能有多个,其中有哪些索引是不存在的,然后创建它。 如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败。其他索引正常执行写流程。
创建索引请求被发送到Master节点,待收到全部创建请求的Response(无论成功还是失败的) 之后,才进入下一个流程。
Master节点什么时候返回Response? 在 Master 节点执行完创建索引流程,将新的 clusterState 发布完毕才会返回。
那什么才算发布完毕呢? 默认情况下,Master 发布 clusterState 的 Reques t收到半数以上的节点Response,认为发布成功。** 负责写数据的节点会先执行一遍内容路由的过程以处理没有收到最新clusterState的情况。
简化的实现如下:
//遍历所有需要创建的索引
for (String index : autoCreateIndices) {
//发送创建索引请求
createIndex (index,bulkRequest.timeout(),new ActionListener() {
//下面是listener的定义
//收到执行成功响应
public void onResponse (CreateIndexResponse result) {
//将计数器递减,计数器的值为需要创建的索引数量
if (counter.decrementAndGet()== 0) {
//全部创建完毕时执行后面的流程,参数省略
executeBulk(. ..) ;
}
//收到失败的响应
public void onFailure (Exception e) {
//将创建失败索引对应的请求置空
for (int i = 0; i = 1;
} else {
return activeShardCount >= value;
}
}
3.2.5、写 Lucene 和事务日志
遍历请求,处理动态更新字段映射,然后调用InternalEngine#index
逐条对doc进行索引。
Engine 封装了 Lucene 和 translog 的调用,对外提供读写接口。在写入Lucene
之前,先生成Sequence Number
和Version
。这些都是在 InternalEngine 类中实现的。Sequence Number
每次递增1,Version
根据当前doc
的最大版本加1。
索引过程为先写Lucene
,后写translog
。因为Lucene
写入时对数据有检查,写操作可能会失败。如果先写translog
, 写入Lucene
时失败,则还需要对translog
进行回滚处理。
根据配置的translog flush
策略进行刷盘控制,定时或立即刷盘。
private void maybeSyncTranslog (final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard. getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint () decPendingAndFinishIfNeeded());
向Master发送 shardFailed 请求:
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener);
然后Master
会更新集群状态,在新的集群状态中,这个shard
将:
- 从
in_sync_allocations
列表中删除(同步分片标识); - 在
routing_table
的shard
列表中將state
由STARTED
更改カUNASSIGNED
; - 添加到
routingNodes
的unassignedShards
列表;
执行本流程的线程池: bulk。执行与主分片基本相同的写doc过程,写完毕后回复主分片节点。
protected void doRun() throws Exception {
setPhase (task, "replica");
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
//检查AllocationId
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new Sha rdNotFoundException() ;
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor);
}
在副分片的写入过程中,参数检查的实现与主分片略有不同,最终都调用IndexShard-OperationPermits#acquire
判断是否需要delay,继续后面的写流程。
在一个shard上执行的一些操作可能会产生I/O异常之类的情况。一个shard上的CRUD等操作在ES里由一个Engine对象封装,在Engine处理过程中,部分操作产生的部分异常ES会认为有必要关闭此Engine,上报Master。例如,系统I/O层面的写入失败,这可能意味着磁盘损坏。
对Engine异常的捕获目前主要通过IOException实现。例如,索引文档过程中的异常处理:
try {
//索引文档到Lucene
indexResult = indexIntoLucene (index, plan) ;
} catch (RuntimeException| IOException e){
try {
maybeFailEngine ("index", e) ;
} catch (Exception inner) {
e. addSuppressed (inner) ;
}
throw e;
}
Engine类中的maybeFailEngine()
负责检查是否应当关闭引擎failEngine()
。可能会触发maybeFailEngine()的操作如下表所示。
注意:其中不包含get操作,也就是说,读取doc失败不会触发shard迁移。
4.1、Engine 关闭过程将Lucene标记为异常,简化的实现如下:
public void failEngine (String reason, @Nullable Exception failure) {
failedEngine.set( (failure != null) ? failure : new IllegalSta teException (reason));
store.markStoreCorrupted (new IOException("failed engine ( reason:\["+ reason +"\]) ", failure));
}
关闭shard,然后汇报给Master:
private void fai lAndRemoveShard(...) {
// 关闭shard
indexService.removeShard (shardRouting.shardId().id(), message);
//向Master节点发送SHARD_ FAILED_ ACTION_ NAME请求
sendFailShard(shardRouting, message, failure, state);
}
4.2、Master的对应处理
收到节点的SHARD_FAILED_ACTION_NAME
消息后,Master 通过 reroute 将失败的 shard 通过 reroute 迁移到新的节点,并更新集群状态。
- 如果请求在协调节点的路由阶段失败,则会等待集群状态更新,拿到更新后,进行重试,如果再次失败,则仍旧等集群状态更新,直到超时1分钟为止。超时后仍失败则进行整体请求失败处理。
- 在主分片写入过程中,写入是阻塞的。只有写入成功,才会发起写副本请求。如果主shard写失败,则整个请求被认为处理失败。如果有部分副本写失败,则整个请求被认为处理成功。
- 无论主分片还是副分片,当写一个doc失败时,集群不会重试,而是关闭本地shard,然后向Master汇报,删除是以shard为单位的。
ES本身也是一个分布式存储系统,如同其他分布式系统-样,我们经常关注的一些特性如下。
- 数据可靠性: 通过分片副本和事务日志机制保障数据安全。
- 服务可用性: 在可用性和一致性的取舍方面,默认情况下ES更倾向于可用性,只要主分片可用即可执行写入操作。
- 一致性: 笔者认为是弱一致性。只要主分片写成功,数据就可能被读取。因此读取操作在主分片和副分片上可能会得到不同结果。
- 原子性: 索引的读写、别名更新是原子操作,不会出现中间状态。但bulk不是原子操作,不能用来实现事务。
- 扩展性: 主副分片都可以承担读请求,分担系统负载。
分析完写入流程后,也许读者已经意识到了这个过程的一些缺点:
- 副分片写入过程需要重新生成索引,不能单纯复制数据,浪费计算能力,影响入库速度。
- 磁盘管理能力较差,对坏盘检查和容忍性比HDFS差不少。例如,在配置多磁盘路径的情况下,有一块坏盘就无法启动节点。