title: ElasticSearch之深度应用及原理剖析author: Xonitags:
- 搜索引擎
- Elasticsearchcategories:
- 搜索引擎
- Elasticsearchabbrlink: 5a1f6e0b
在电商场景下,工作流程为:
- 读取商品信息,包括库存数量
- 用户下单购买
- 更新商品信息,将库存数减一如果是多线程操作,就可能有多个线程并发的去执行上述的3步骤流程,假如此时有两个人都来读取商品数据,两个线程并发的服务于两个人,同时在进行商品库存数据的修改。假设库存为100件 正确的情况:线程A将库存-1,设置为99件,线程B接着读取99件,再-1,变为98件。如果A,B线程都读取的为100件,A处理完之后修改为99件,B处理完之后再次修改为99件,此时结果就出错了。
顾名思义,就是很悲观,每次去拿数据的时候都认为被人会修改,所以每次拿数据的时候都会加锁,以防别人修改,直到操作完成后,才会被别人执行。常见的关系型数据库,就用到了很多这样的机制,如行锁,表锁,写锁,都是在操作之前加锁。悲观锁的优点:方便,直接加锁,对外透明,不需要额外的操作。悲观锁的缺点:并发能力低,同一时间只能有一个操作。
乐观锁不加锁,每个线程都可以任意操作。比如每条文档中有一个version字段,新建文档后为1,修改一次累加,线程A,B同时读取到数据,version=1,A处理完之后库存为99,在写入es的时候会跟es中的版本号比较,都是1,则写入成功,version=2,B处理完之后也为99,存入es时与es中的数据的版本号。与version=2相比,明显不同,此时不会用99去更新,而是重新读取最新的数据,再减一,变为98,执行上述操作写入。
Elasticsearch的后台都是多线程异步的,多个请求之间是乱序的,可能后修改的先到,先修改的后到。Elasticsearch的多线程异步并发修改是基于自己的**_version版本号**进行乐观锁并发控制的。在后修改的先到时,比较版本号,版本号相同修改可以成功,而当先修改的后到时,也会比较一下_version版本号,如果不相等就再次读取新的数据修改。这样结果会就会保存为一个正确状态。删除操作也会对这条数据的版本号加1。在删除一个document之后,可以从一个侧面证明,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1。
3.2.4 es的乐观锁并发控制示例先新建一条数据
PUT /test_index/_doc/4
{
"test_field": "test"
}
模拟两个客户端,都获取到了同一条数据
GET /test_index/_doc/4
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "4",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"test_field" : "test"
}
}
其中一个客户端,先更新了一下这个数据, 同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号(_seq_no)是相同的,才能修改
PUT /test_index/_doc/4
{
"test_field": "client1 changed"
}
返回结果
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "4",
"_version" : 2,
"result" :"updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1
}
另外一个客户端,尝试基于version=1的数据去进行修改,同样带上(if_seq_no和if_primary_term)version版本号,进行乐观锁的并发控制
PUT /test_index/_doc/4?if_seq_no=0&if_primary_term=1
{
"test_field": "client2 changed"
}
会出错,返回
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[4]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
"index_uuid": "RLebBAGvR9iWdUoi3t5bXw",
"shard": "0",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[4]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",
"index_uuid": "RLebBAGvR9iWdUoi3t5bXw",
"shard": "0",
"index": "test_index"
},
"status": 409
}
乐观锁就成功阻止并发问题在乐观锁成功阻止并发问题之后,尝试正确的完成更新重新进行GET请求,得到 version
GET /test_index/_doc/4
结果
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "4",
"_version" : 2,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"test_field" : "client1 changed"
}
}
基于最新的数据和版本号(以前是version 现在是if_seq_no和if_primary_term ),去进行修改,修改后,带上最新的版本号,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下
PUT /test_index/_doc/4?if_seq_no=1&if_primary_term=1
{
"test_field": "client2 changed"
}
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "4",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 2,
"_primary_term" : 1
}
3.2.5 基于external version进行乐观锁并发控制
es提供了一个feature,就是说,你可以不用它提供的内部_version
版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制。
?version=1&version_type=external
区别在于,_version_方式,只有当你提供的_version_与_es_中的version一模一样的时候,才可以进行修改,只要不一样,就报错;当version_type=external的时候,只有当你提供的version比es中的``_version大的时候,才能完成修改。
es,if_seq_no=0&if_primary_term=1和 文档中的值相等 才能更新成功。
es,_version=1,?version>1&version_type=external,才能成功,比如说
?version=2&version_type=external`。
代码示例:先创建一条数据
PUT /test_index/_doc/5
{
"test_field": "external test"
}
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "5",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 3,
"_primary_term" : 1
}
模拟两个客户端同时查询到这条数据
GET /test_index/_doc/5
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "5",
"_version" : 1,
"_seq_no" : 3,
"_primary_term" : 1,
"found" : true,
"_source" : {
"test_field" : "external test"
}
}
第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2
PUT /test_index/_doc/5?version=2&version_type=external
{
"test_field": "external client1 changed"
}
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "5",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 4,
"_primary_term" : 1
}
模拟第二个客户端,同时拿到了自己数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
PUT /test_index/_doc/5?version=2&version_type=external
{
"test_field": "external client2 changed"
}
会出错,返回
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[5]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "RLebBAGvR9iWdUoi3t5bXw",
"shard": "0",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[5]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "RLebBAGvR9iWdUoi3t5bXw",
"shard": "0",
"index": "test_index"
},
"status": 409
}
在并发控制成功后,重新基于最新的版本号发起更新
GET /test_index/_doc/5
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "5",
"_version" : 2,
"_seq_no" : 4,
"_primary_term" : 1,
"found" : true,
"_source" : {
"test_field" : "external client1 changed"
}
}
PUT /test_index/_doc/5?version=3&version_type=external
{
"test_field": "external client2 changed"
}
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "5",
"_version" : 3,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 5,
"_primary_term" : 1
}