写点什么

Elasticsearch 文档版本冲突原理与解决

用户头像
Skysper
关注
发布于: 2020 年 04 月 22 日

一般我们在更新文档时,主要的操作流程时:读取文档->修改->提交保存。数据中心等保存的都是最新一次提交的内容。

正常来说没什么问题,但是如果两个或更多的请求并发修改同一个文档时,很容易产生冲突。如果按照先后顺序,则最后被处理的请求可能覆盖首先被处理的请求作出的操作和变更,从而导致其数据变更丢失(最后被处理的请求也不一定是最后发起的,取决于其网络传输等因素影响)。

在发生并发冲突的时候,我们有常用的两种策略:

  1. 悲观锁并发策略 在关系性数据库中,通过阻塞并排队的方式,来避免发生冲突,例如在读取数据行时阻塞,来保证正在修改行数据的请求完成正常操作后,以读取到最新的数据。这种方式的前提假设是数据冲突更有可能发生。

  2. 乐观锁并发策略 Elasticsearch中采用的是乐观锁的并发策略,这种方式的前期假设是数据冲突一般不会发生,从而避免阻塞数据请求。然而,在读和写之间,如果数据发生改变,更新就失败了,然后由程序决定如果进行后续的处理。

Elastic中的乐观锁策略

Elasticsearch是分布式的,文档的创建/变更等都会同步到其他节点。由于其异步性和并发的特点,这些同步请求都是并行的,因此并不能保证数据的是按照修改顺序依次到达的。Elasticsearch保证了一个老版本的数据永远无法重写或覆盖更新版本的数据。

在 index get 和 delete请求中,都存在一个 _version 字段。数据的变更均会导致_version 的值增大。Elasticsearch通过该字段来保证小于该值的数据会被忽略掉。

通过数字版本的方式也可以避免ABA的数据问题,即数据A修改为B而后又修改为A,对于应用端来说,数据是没有任何变化的

创建文档:

PUT /website/blog/1/_create
{
"title": "My first blog entry",
"text": "Just trying this out..."
}

获取文档:

GET /website/blog/1
结果:
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"title": "My first blog entry",
"text": "Just trying this out..."
}
}

此时 _version 为1

修改数据:

PUT /website/blog/1?version=1
{
"title": "My first blog entry",
"text": "Starting to get the hang of this..."
}



{
"_index": "website",
"_type": "blog",
"_id": "1",
"_version": 2
"created": false
}

此时操作成功。

如果再版本1的基础我们再次提交修改:

{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
}
],
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
},
"status": 409
}

此时,显示存在版本冲突。此时应用可以据此作出相应的处理,获取最新数据Merge处理亦或其他处理方式。

问题处理

一般的系统中,这种冲突出现的情况比较少。但当我们在实际中存在多个系统可能会对某一数据做更新的情况时,会偶尔出现并发修改的冲突。

org.elasticsearch.index.engine.VersionConflictEngineException: [project][1140860]: version conflict, current version [5] is different than the one provided [4]
at org.elasticsearch.index.engine.InternalEngine.planIndexingAsPrimary(InternalEngine.java:559)
at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:472)
at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:560)
at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:549)
at org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary(TransportShardBulkAction.java:484)
at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:143)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:113)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:69)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:939)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:908)
at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:113)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:322)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:264)
at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:888)
at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:885)
at org.elasticsearch.index.shard.IndexShardOperationsLock.acquire(IndexShardOperationsLock.java:147)
at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationLock(IndexShard.java:1658)
at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryShardReference(TransportReplicationAction.java:897)
at org.elasticsearch.action.support.replication.TransportReplicationAction.access$400(TransportReplicationAction.java:93)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:281)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:260)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:252)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69)
at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:627)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:638)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


此时我们应该在请求上加上retry_on_conflict的参数,在冲突发生时,重试提交数据:

POST /website/pageviews/1/_update?retry_on_conflict=5
{
//some update
}

也可以尝试更多次数,以保证提交能够最终成功,java/python等都有具体的实现,一般时UpdateRequest对象上进行设置(重试情况的前提是两种数据都可以成功写入为最新数据,如文章写作)

发布于: 2020 年 04 月 22 日阅读数: 147
用户头像

Skysper

关注

还未添加个人签名 2016.03.29 加入

还未添加个人简介

评论

发布
暂无评论
Elasticsearch文档版本冲突原理与解决