写点什么

ElasticSearch 写入流程详解

作者:IT巅峰技术
  • 2022 年 4 月 02 日
  • 本文字数:5504 字

    阅读完需:约 18 分钟

ElasticSearch写入流程详解

一、前言

介绍我们在前面已经知道 ElasticSearch 底层的写入是基于 lucence 依进行 doc 写入的。elasticsearch 作为一款分布式系统,在写入数据时还需要考虑很多重要的事项,比如:可靠性、原子性、一致性、实时性、隔离性、性能等多个指标。

elasticsearch 是如何做到的呢?下面我们针对 ElasticSearch 的写入进行分析。

二、lucence 写

2.1 增删改

ElasticSearch 拿到一个 doc 后调用 lucence 的 api 进行写入的。

public long addDocument(); public long updateDocuments(); public long deleteDocuments();
复制代码

如上面的代码所示,我们使用 lucence 的上面的接口就可以完成文档的增删改操作。在 lucence 中有一个核心的类 IndexWriter 负责数据写入和索引相关的工作。


//1. 初始化indexwriter对象IndexWriter writer = new IndexWriter(new Directory(Paths.get("/index")), new IndexWriterConfig());
//2. 创建文档Document doc = new Document();doc.add(new StringField("empName", "王某某", Field.Store.YES));doc.add(new TextField("content", "操作了某菜单", Field.Store.YES));
//3. 添加文档writer.addDocument(doc);
//4. 提交writer.commit();
复制代码

以上代码演示了最基础的 lucence 的写入操作,主要涉及到几个关键点:

初始化:

Directory 是负责持久化的,他的具体实现有很多,有本地文件系统、数据库、分布式文件系统等待,ElasticSearch 默认的实现是本地文件系统。

Document:

Document 就是 es 中的文档,FiledType 定义了很多索引类型。这里列举几个常见的类型:

  1. stored:字段原始内容存储 

  2. indexOptions:

(NONE/DOCS/DOCS_AND_FREQS/DOCS_AND_FREQS_AND_POSITIONS/DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS),倒排索引的选项,存储词频、位置信息等。

  1. docValuesType:正排索引,建立一个 docid 到 field 的的一个列存储。

  2. 一些其它的类型

IndexWriter:

IndexWriter 在 doc 进行 commit 后,才会被持久化并且是可搜索的。

IndexWriterConfig:

IndexWriterConfig 负责了一些整体的配置参数,并提供了方便使用者进行功能定制的参数: 

  1. Similarity:这个是搜索的核心参数,实现了这个接口就能够进行自定义算分。lucence 默认实现了前面文章提到的 TF-IDF、BM25 算法。 

  2. MergePolicy:合并的策略。我们知道 ElasticSearch 会进行合并,从而减少段的数量。 

  3. IndexerThreadPool:线程池的管理。

  4. FlushPolicy:flush 的策略。

  5. Analyzer:定制分词器。

  6. IndexDeletionPolicy:提交管理。

PS:在 ElasticSearch 中,为了支持分布式的功能,新增了一些系统默认字段:

  1. _uid,主键,在写入的时候,可以指定该 Doc 的 ID 值,如果不指定,则系统自动生成一个唯一的 UUID 值。

  2. _version,版本字段,version 来保证对文档的变更正确的执行,更新文档时有用。 

  3. _source,原始信息,如果后面维护不需要 reindex 索引可以关闭该字段,从而节省空间 

  4. _routiong,路由字段。 

  5. 其它的字段

2.2. 并发模型

上面我们知道 indexwriter 负责了 ElasticSearch 索引增删改查。那它具体是如何管理的呢?



(图:IndexWritter 模型)

2.2.1. 基本操作

关键点:  

  • DocumentsWriter 处理写请求,并分配具体的线程 DocumentsWriterPerThread

  • DocumentsWriterPerThread 具有独立内存空间,对文档进行处理 DocumentsWriter 触发一些 flush 的操作。

  • DocumentsWriterPerThread 中的内存 In-memory buffer 会被 flush 成独立的 segement 文件。 

  • 对于这种设计,多线程的写入,针对纯新增文档的场景,所有数据都不会有冲突,非常适合隔离的数据写入方式

2.2.2 更新

Lucene 的 update 和数据库的 update 不太一样,Lucene 的更新是查询后删除再新增。  

  • 分配一个操作线程 

  • 在线程里执行删除 

  • 在线程里执行新增

2.2.3 删除



(图:IndexWritter 删除模型)

上面已经说了,在 update 中会删除,普通的也会删除,lucence 维护了一个全局的删除表,每个线程也会维护一个删除表,他们双向同步数据

  • update 的删除会先在内部记录删除的数据,然后同步到全局表中。

  • delete 的删除会作用在 Global 级别,后异步同步到线程中。

  • Lucene Segment 内部,数据实际上其实并不会被真正删除,Segment 内部会维持一个文件记录,哪些是 docid 是删除的,在 merge 时,相应的 doc 文档会被真正的删除。

2.2.4 flush 和 commit

每一个 WriterPerThread 线程会根据 flush 策略将文档形成 segment 文件,此时 segment 的文件还是不可见的,需要 indexWriter 进行 commit 后才能被搜索。 

这里需要注意:

ElasticSearch 的 refresh 对应于 lucene 的 flush,ElasticSearch 的 flush 对应于 lucene 的 commit,ElasticSearch 在 refresh 时通过其它方式使得 segment 变得可读。

2.2.5 merge

merge 是对 segment 文件合并的动作,这样可以提升查询的效率并且可以真正的删除的文档。

2.2.6 小结

在这里我们稍微总结一下,一个 ElasticSearch 索引的一个分片对应一个完整的 lucene 索引, 而一个 lucene 索引对应多个 segment。

我们在构建同一个 lucene 索引的时候, 可能有多个线程在并发构建同一个 lucene 索引, 这个时候每个线程会对应一个 DocumentsWriterPerThread, 而每个 DocumentsWriterPerThread 会对应一个 index buffer. 

在执行了 flush 以后, 一个 DocumentsWriterPerThread 会生成一个 segment。

三、 ElasticSearch 的写

3.1. 宏观看 ElasticSearch 请求


在前面的文章已经讨论了写入的流程 ElasticSearch



(图:布式整体写入流程图) 

图片来自官网 

当写入文档的时候,根据 routing 规则,会将文档发送至特定的 Shard 中建立 lucence。

  • 介绍在 Primary Shard 上执行成功后,再从 Primary Shard 上将请求同时发送给多个 Replica Shardgit 

  • 请求在多个 Replica Shard 上执行成功并返回给 Primary Shard 后,写入请求执行成功,返回结果给客户端

注意上面的写入延时=主分片延时+max(Replicas Write),即写入性能如果有副本分片在,就至少是写入两个分片的延时延时之和。

3.2. 详细流程



(图:整体写入详情流程)  

3.2.1 协调节点内部流程

如上图所示:

  • 协调节点会对请求检查放在第一位,如果如果有问题就直接拒绝。主要有长度校验、必传参数、类型、版本、id 等等。

  • pipeline,用户可以自定义设置处理器,比如可以对字段切割或者新增字段,还支持一些脚本语言,可以查看官方文档编写。

  • 如果允许自动创建索引(默认是允许的),会先创建索引,创建索引会发送到主节点上,必须等待 master 成功响应后,才会进入下一流程。

  • 请求预处理,比如是否会自动生成 id、路由,获取到整个集群的信息了,并检查集群状态,比如集群 master 不存在,都会被拒绝。

  • 构建 sharding 请求,比如这一批有 5 个文档, 如果都是属于同一个分片的,那么就会合并到一个请求里,会根据路由算法将文档分类放到一个 map 里 Map> requestsByShard = new HashMap<>();路由算法默认是文档 id%分片数。

  • 转发请求,有了分片会根据前面的集群状态来确定具体的 ElasticSearch 节点 ip,然后并行去请求它们。

3.2.2 主分片节点流程

3.2.2.1 写入(index)



(图:refresh 段) 

该部分是 elasticsarch 的核心写入流程,在前面的文章也介绍了,请求到该节点会最终调用 lucence 的方法,建立 lucence 索引。其中主要的关键点:

  • ElasticSearch 节点接收 index 请求,存入 index buffer,同步存入磁盘 translog 后返回索引结果

  • Refresh 定时将 lucence 数据生成 segment,存入到操作系统缓存,此时没有 fsync,清空 lucence,此时就可以被 ElasticSearch 查询了,如果 index buffer 占满时,也会触发 refresh,默认为 jvm 的 10%。

  • Flush 定时将缓存中的 segments 写入到磁盘,删除 translog。如果 translog 满时(512m),也会触发 flush。

  • 如果数据很多,segment 的也很多,同时也可能由删除的文档,ElasticSearch 会定期将它们合并。

3.2.2.2 update



(图:update)  

  1. 读取同 id 的完整 Doc, 记录版本为 version1。

  2. 将 version1 的 doc 和 update 请求的 Doc 合并成一个 Doc,更新内存中的 VersionMap。获取到完整 Doc 后。进入后续的操作。

  3. 后面的操作会加锁。

  4. 第二次从 versionMap 中读取该 doc 的的最大版本号 version2,这里基本都会从 versionMap 中获取到。

  5. 检查版本是否冲突,判断版本是否一致(冲突),如果发生冲突,则回到第一步,重新执行查询 doc 合并操作。如果不冲突,则执行最新的添加 doc 请求。

  6. 介绍在 add Doc 时,首先将 Version + 1 得到 V3,再将 Doc 加入到 Lucene 中去,Lucene 中会先删同 id 下的已存在 doc id,然后再增加新 Doc。写入 Lucene 成功后,将当前 V3 更新到 versionMap 中。

  7. 释放锁,更新流程就结束了。

介绍其实就是乐观锁的机制,每次更新一次版本号加 1 ,不像关系式数据库有事物,你在更新数据,可能别人也在更新的话,就把你的给覆盖了。

你要更新的时候,先查询出来,记住版本号,在更新的时候最新的版本号和你查询的时候不一样,说明别人先更新了。你应该读取最新的数据之后再更新。

写成功后,会转发写副本分片,等待响应,并最后返回数据给协调节点。具体的流程:

  • 校验,校验写的分片是否存在、索引的状态是否正常等等。

  • 是否需要延迟执行,如果是则会放入到队列里等待。

  • 校验活跃的分片数是否存在,不足则拒绝写入。


public boolean enoughShardsActive(final int activeShardCount) { if (this.value < 0) { throw new IllegalStateException("not enough information to resolve to shard count"); } if (activeShardCount < 0) { throw new IllegalArgumentException("activeShardCount cannot be negative"); } return this.value <= activeShardCount;}
复制代码

为什么会要校验这个活跃的分片数呢?

  • ElasticSearch 的索引层有个一 waitforactiveshards 参数代表写入的时候必须的分片数,默认是 1。如果一个索引是每个分片 3 个副本的话,那么一共有 4 个分片,请求时至少需要校验存活的分片数至少为 1,相当于提前校验了。如果对数据的可靠性要求很高,就可以调高这个值,必须要达到这个数量才会写入。

  • 调用 lucence 写入 doc.

  • 写入 translog 日志。

  • 写入副本分片,循环处理副本请求,会传递一些信息。在这里需要注意,它们是异步发送到副本分片上的,并且需要全部等待响应结果,直至超时。

  • 接着上一步,如果有副本分片失败的情况,会把这个失败的分片发送给 master,master 会更新集群状态,这个副本分片会从可分配列表中移除。 

发送请求至副本


@Overridepublic void tryAction(ActionListener<ReplicaResponse> listener) { replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);}
复制代码

等待结果


privatevoid decPendingAndFinishIfNeeded() { assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); }}
复制代码

在以前的版本中,其实是异步请求副本分片的,后来觉得丢失数据的风险很大,就改成同步发送了,即 Primary 等 Replica 返回后再返回给客户端。如果副本有写入失败的,ElasticSearch 会进行一些重试,但最终并不强求一定要在多少个节点写入成功。

在返回的结果中,会包含数据在多少个 shard 中写入成功了,多少个失败了,如果有副本上传失败,会将失败的副本上报至 Master。

PS:ElasticSearch 的数据副本模型和 kafka 副本很相似,都是采用的是 ISR 机制。即:ES 里面有一个:in-sync copies 概念,主分片会在索引的时候会同步数据至 in-sync copies 里面所有的节点,然后再返回 ACK 给 client。而 in-sync copies 里面的节点是动态变化的,如果出现极端情况,在 in-sync copies 列表中只有主分片一个的话,这里很容易出现 SPOF 问题,这个是在 ElasticSearch 中是如何解决的呢?

就是依靠上面我们分析的 wait_for_active_shards 参数来防止 SPOF,如果配置 index 的 wait_for_active_shards=3 就会提前校验必须要有三个活跃的分片才会进行同步,否则拒绝请求。对于可靠性要求高的索引可以提升这个值。

PS:为什么是先写 lucence 再写入 translog 呢,这是因为写入 lucence 写入时会有数据检查,有可能会写入失败,这个是发生在内存之中的,如果先写入磁盘的 translog 的话,还需要回退日志,比较麻烦

3.2.3 副本分片节点流程

这个过程和主分片节点的流程基本一样,有些校验可能略微不同,最终都会写入 lucence 索引。

四、总结

本文介绍了 ElasticSearch 的写入流程和一些比较详细的机制,最后我们总结下开头我们提出的问题,一个分布式系统需要满足很多特性,大部分特性都能够在 ElasticSearch 中得到满足。

  • 可靠性:lucence 只是个工具,ElasticSearch 中通过自己设计的副本来保证了节点的容错,通过 translog 日志保证宕机后能够恢复。通过这两套机制提供了可靠性保障。

  • 一致性:ElasticSearch 实现的是最终一致性,副本和主分片在同一时刻读取的数据可能不一致。比如副本的 refresh 频率和主分片的频率可能不一样。

  • 高性能:ElasticSearch 通过多种手段来提升性能,具体包括:

  1. lucence 自身独立线程维护各自的 Segment,多线程需要竞争的资源更少,性能更好。 

  2. update 等操作使用 versionMap 缓存,减少 io.

  3. refresh 至操作系统缓存。

  • 原子性、隔离性:使用版本的乐观锁机制保证的。

  • 实时性:ElasticSearch 设计的是近实时的,如果同步进行 refresh、flush 将大幅降低性能,所以是”攒一部分数据“再刷入磁盘,不过实时写入的 tranlog 日志还是可以实时通过 id 查到的。



程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注“IT 巅峰技术” ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例,作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

发布于: 刚刚阅读数: 2
用户头像

一线架构师、二线开发、三线管理 2021.12.07 加入

Redis6.X、ES7.X、Kafka3.X、RocketMQ5.0、Flink1.X、ClickHouse20.X、SpringCloud、Netty5等热门技术分享;架构设计方法论与实践;作者热销新书《RocketMQ技术内幕》;

评论

发布
暂无评论
ElasticSearch写入流程详解_elasticsearch_IT巅峰技术_InfoQ写作平台