写点什么

大数据 -183 Elasticsearch - 并发冲突与乐观锁、分布式数据一致性剖析

作者:武子康
  • 2025-12-14
    山东
  • 本文字数:3612 字

    阅读完需:约 12 分钟

大数据-183 Elasticsearch - 并发冲突与乐观锁、分布式数据一致性剖析

TL;DR

  • 场景:电商扣库存并发更新,读-改-写在多线程/多客户端下发生写覆盖

  • 结论:ES 用乐观并发控制(seq_no + primary_term)拒绝乱序/并发写,冲突返回 409

  • 产出:最小复现请求、409 速查与修复路径、ES5 前后写一致性参数替代对照


版本矩阵


详解并发冲突

在电商场景下,工作流程为:


  • 读取商品信息,包括库存数量

  • 用户下单购买

  • 更新商品信息,将库存数减一


如果是多线程操作,就可能有多个线程并发的去执行上述的 3 步骤流程,假如此时有两个人都来读取商品数据,两个线程并发的服务于两个人,同时在进行商品库存数据的修改,假设库存为 100 件,正确的情况:线程 A 将库存-1,设置为 99 件,线程 B 读取 99 再-1,设置为 98 件。但是如果 A 和 B 都是读取的 99 件,那么后续就会出现数量错误的问题。

解决方案

悲观锁

每次去拿数据的时候认为都会被人修改,所有每次拿数据的时候都会加锁,以防止别人修改,直到操作完成后,再释放锁,才会被别人拿去执行。常见的关系型数据库,就用到了如行锁、表锁、写锁,都是在操作之前的锁。


  • 悲观锁优点:方便直接加锁,对外透明,不需要额外的操作。

  • 悲观锁缺点:并发能力低,同一时间只能有一个操作。


乐观锁

乐观锁不加锁,每个线程都可以任意操作。比如每条文档中都有一个 version 字段,新建文档后为 1,每次修改都进行累加,线程 A 和 B 都拿到 version 为 1,等写入时会和 ES 中的版本号进行比较,如果相等则写入成功,失败则重新读取数据再-1,再进行对比,如果相等则写入成功。


Elasticsearch 的乐观锁

Elasticsearch 的后台处理机制采用多线程异步架构,这种设计虽然提高了系统吞吐量,但也带来了请求处理的乱序问题。在实际操作中,可能会出现以下场景:用户先发送了修改请求 A,紧接着又发送了修改请求 B,但由于网络延迟或线程调度原因,请求 B 可能会先于请求 A 到达服务端进行处理。


为了应对这种并发修改带来的数据一致性问题,Elasticsearch 实现了一套基于版本号(_version)的乐观锁并发控制机制。具体工作原理如下:


  1. 版本号比较机制:

  2. 每个文档都有一个递增的版本号字段

  3. 当后发请求先到达时(请求 B),系统会比较请求携带的版本号与当前文档版本号

  4. 如果版本号匹配(说明没有其他修改),则执行更新并将版本号+1

  5. 当先发请求后到达时(请求 A),由于文档版本号已被请求 B 更新,此时版本号不匹配,系统会拒绝本次修改

  6. 冲突解决流程:

  7. 当修改因版本冲突被拒绝时,系统会自动:

  8. 重新读取文档最新数据(包含最新版本号)

  9. 基于最新数据重新应用修改

  10. 再次尝试提交更新

  11. 这个过程会循环执行,直到修改成功或达到重试上限

  12. 删除操作的特殊处理:

  13. 删除操作也会触发版本号递增(+1)

  14. 删除后文档并非立即物理删除,而是进入"逻辑删除"状态

  15. 这种设计带来两个重要特性:

  16. 版本号信息会被保留一段时间

  17. 重新创建同名文档时,新文档版本号会在删除版本基础上继续递增


应用场景示例:假设有一个商品库存文档(ID:123,version:5,stock:100):


  1. 请求 A(减库存 10):期望 version=5→6,stock=90

  2. 请求 B(减库存 20):期望 version=5→6,stock=80 如果请求 B 先到达:


  • 请求 B 成功(version=6,stock=80)

  • 请求 A 到达时发现 version 已变为 6(与携带的 5 不匹配)

  • 系统重新读取数据(version=6,stock=80)

  • 基于新数据执行减 10 操作(version=7,stock=70)


这种机制确保了在高并发环境下,数据修改的最终一致性,同时避免了传统锁机制带来的性能损耗。

Elasticsearch 的乐观锁测试

新建一条数据:


PUT /wzk_lock_index/_doc/1{  "test_field": "wzkicu"}
复制代码


执行结果如下图所示,可以看到 _version 是 1。



假设我们现在有 A 和 B 两个客户端同时拿到了数据,想要进行更新:


# A 更新PUT /wzk_lock_index/_doc/1{  "test_field": "client1 update"}
复制代码


可以看到执行结果,顺利更新了,_version 变成了 2:



此时 B 在同一时间要进行更新:


# B 更新PUT /wzk_lock_index/_doc/1?if_seq_no=0&if_primary_term=1{  "test_field": "client2 update"}
复制代码


可以看到,此时报错了:


{  "error": {    "root_cause": [      {        "type": "version_conflict_engine_exception",        "reason": "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",        "index_uuid": "BBxoVVVqSw2TxtU-vPd-NA",        "shard": "0",        "index": "wzk_lock_index"      }    ],    "type": "version_conflict_engine_exception",    "reason": "[1]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [1] and primary term [1]",    "index_uuid": "BBxoVVVqSw2TxtU-vPd-NA",    "shard": "0",    "index": "wzk_lock_index"  },  "status": 409}
复制代码


对应的截图如下所示:



这说明我们的乐观锁是生效的,阻止了并发的问题。我们需要重新 GET 请求获取当前的版本信息:


GET /wzk_lock_index/_doc/1
复制代码


获取到当前的 version 是 2、seq_no 是 1,primary_term 是 1:



B 客户端重新发起更新:


# B 再次更新PUT /wzk_lock_index/_doc/1?if_seq_no=1&if_primary_term=1{  "test_field": "client2 update"}
复制代码


我们可以看到执行成功了:


分布式数据一致性

在分布式环境下,一致性指的是多个数据副本是否能保持一致的特性。在一致性条件下,系统在执行数据更新操作之后能够从一致性状态到另一个一致性状态,对系统的一个数据更新成功之后,如果所有用户都能够读取到最新的值,该系统就被认为具有强一致性。

ES5.0 以前的一致性

  • consistency

  • one(primary shard)

  • all(all shard)

  • quorum(default)


我们在发送任何一个增删改查的时候,比如 PUT 时,都可以带上一个 consistency 参数,指明我们想要的写一致性是什么?比如:


PUT /index/indextype/id?consistency=quorum
复制代码

quorum 机制

基本概念

quorum 机制是分布式系统中常用的一致性保障机制,主要用于确保数据写入操作的正确性。在 Elasticsearch 中,quorum 机制用于控制写操作前必须满足的最小可用分片数量要求。

工作原理

写操作执行前,系统需要确认足够数量的分片副本处于可用状态,计算公式如下:


# 当num_of_replicas > 1时才生效int((primary shard + number_of_replicas) / 2) + 1
复制代码

详细说明

  1. 参数解释

  2. primary shard:主分片数量,固定为 1

  3. number_of_replicas:副本分片数量

  4. int():向下取整函数

  5. 生效条件:该机制仅在配置了副本(即number_of_replicas > 1)时才会生效。如果只有主分片(number_of_replicas = 0),则不需要 quorum 检查。

  6. 计算示例

  7. 当有 1 个主分片和 2 个副本时:int((1 + 2)/2) + 1 = 2,即需要至少 2 个分片可用

  8. 当有 1 个主分片和 3 个副本时:int((1 + 3)/2) + 1 = 3,即需要至少 3 个分片可用

应用场景

该机制主要用于以下情况:


  • 写入新文档时

  • 更新现有文档时

  • 执行批量操作时

注意事项

  1. 该机制确保了大多数分片副本可用,从而防止脑裂问题

  2. 如果可用分片数不满足 quorum 要求,写操作将被拒绝并返回错误

  3. 可以通过设置write_consistency参数来调整 quorum 要求


比如:1 个 primary shard,3 个 replica,那么 quorum=((1 + 3 ) / 2) + 1 = 3。如果这时只有两台机器的话:


Timeout 机制

quorum 不齐全时,会 wait(等待)1 分钟默认是 1 分钟,但是可以通过 timeout 去手动调整,默认单位是毫秒。等待期间,期望活跃的 shard 数量可以增加,最后无法满足 shard 数量就 timeout。我们在写操作的时候,可以加一个 timeout 参数,比如:


# 当quorum不齐全的时候 ES的timeout时长PUT /index/_doc/id?timeout=30s
复制代码

ES5.0 以后的一致性

在 ES5.0 以后,原先执行 PUT 带 consistency=all/quorum 参数的,都会报错,提示语法错误。原因是 consistency 检查是在 PUT 之前做的,然而,虽然检查的时候,shard 满足 quorum,但是真正 primary shard 写到 replica 之前,仍然会出现 shard 挂掉,但 Update API 也会返回 Successd,因此,这个检查不能保证 replica 成功写入,甚至这个 primary shard 是否能成功写入也未必能保证。因此,修改了语法,用了 wait_for_active_shards,这个更加清楚一些:


PUT /index/_doc/1?wait_for_active_shards=2&timeout=10s{  "xxx": "xxx"}
复制代码

错误速查

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI 研究-132 Java 生态前沿 2025:Spring、Quarkus、GraalVM、CRaC 与云原生落地🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-180 Java 接入 FastDFS:自编译客户端与 Maven/Spring Boot 实战 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务已完结,Dubbo 已完结,MySQL 已完结,MongoDB 已完结,Neo4j 已完结,FastDFS 已完结,OSS 正在更新... 深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-183 Elasticsearch - 并发冲突与乐观锁、分布式数据一致性剖析_Java_武子康_InfoQ写作社区