Bookie 存储架构源码剖析|得物技术
一、Pulsar 存储架构简析
Pulsar 作为新一代 MQ 中间件,在底层架构设计上充分贯彻了存算分离的思想,broker 与 Bookeeper 两个组件独立部署,前者负责流量的调度、聚合、计算,后者负责数据的存储,这也契合了云原生下 k8s 大行其道的时代背景。Bookeeper 又名 Bookie ,是一个单独的存储引擎。在组件关系上,broker 深度依赖 Bookie,内部集成了 Bookie 的 client 端,broker 和 Bookie 之间基于 TCP 通信,使用 protobuf。
Pulsar 整体架构
消息流从 client 端发送到 broker,经过 broker 的计算、转化、路由后再次被分发到具体的 Bookie 节点,一条消息被存储几份是可配置的。数据的高可用由 broker 来保障而非 Bookie,Bookie 只是一个简单的单机存储引擎。一般而言数据多副本有两种主要的分发方式:一种是基于主从模式,主节点在收到数据写入后,将数据二次分发到从节点,从节点的数据流源头只有主节点,可以存在多个从节点,这种架构典型实现有 rocketMQ ,MySQL 等;另一种方式是并行多份写入多份相同的数据,在接收到 SDK 侧数据后进行多路分发。两种方式各有优劣,前者实现简单,但是延迟较高,在开启同步复制(异步复制可能丢数据)的情况下延迟为: master 写入延迟+slave 写入延迟;后者实现复杂,需要处理单节点分发失败补偿的问题,但是延迟较低,实际的写入延迟为 Max(shard1 写入延迟,shard2 写入延迟,.....)。Pulsar 的数据分发模式为后者。
Pulsar 数据流架构
一个 topic 在时间序列上被分为多个 Ledger,使用 LedgerId 标识,在一个物理集群中,LedgerId 不会重复,采用全局分配模式,对于单个 topic(分区 topic)而言同一时刻只会有一个 Ledger 在写入,关闭的 Ledger 不可以写入,以 topicA-partition1 的 Ledgers[ledger1, ledger3, ledger7, ...., ledgerN]为例,可写入的 Ledger 只有 N,小于 N 的 Ledger 均不可写入,单个 Ledger 默认可以存储 5W 条消息,当 broker 以 (3,2,2)模式写入数据时,具体架构如下图所示。3,2,2 可以解释为当前 topic 可以写入的节点有 3 个,每次数据写入 2 份,并且收到 2 个数据写入成功的 ACK 后才会返回响应 client 端。
Ledger 分段机制
二、Bookie 的架构设计
对 Pulsar 的架构有了大致的了解后,我们重点剖析下 Bookie 这个核心的存储引擎。消息系统为了追求最大写入吞吐,一般都采用顺序写的方式来压榨磁盘的 IO 性能。Bookie 也是一样,默认情况下 Bookie 的数据会写入 journal 日志文件,这个日志类似于 MySQL 中的 binlog 文件或者 rocketMQ 中的 commitlog 文件,采用乱序追加写的方式,存在多个 topic 的数据写入同一个文件的情况。
为了更好的 IO 隔离,官方建议 journal 单独挂一块盘。为了充分发挥磁盘 IO 性能,journal 目录可以有多个,即同时存在多个并行写入的 journal 日志,每个 journal 日志会绑定一个写入线程,写入请求提交后会被归一化到某个具体线程,实现无锁化,单个消息写入是按照 LedgerId 对目录数量取模,决定当前数据落到哪个 journal 目录。journal 日志落盘策略是可配置的,当配置同步落盘时,数据实时落盘后才会返回写入成功。journal 日志数据写入后会确认返回写入成功,而 entrylog 的数据是否落盘并不影响请求的立即返回。journal 和 entrylog 均可以配置为异步刷盘,这种情况下落盘的时序上并没有先后之分。
Bookie 数据存储架构
Journal 日志的主要作用是保证数据不丢失,同时提供足够快的性能,因此采用了混合落盘的模式。实际业务消费时,针对单个 topic 的数据在时间序列上是顺序消费,如果实际的数据从 journal 文件中读取则会出现大量的随机 IO,性能较差。Bookie 通过将数据进行二次转写的方式实现数据的局部有序从而提升读取性能,默认情况下一份数据在磁盘上会存两份:一份在 journal 日志中,一份在 entry 日志中。entry 日志中的数据具备局部有序的特性,在一批数据刷盘时,会针对这批数据按照 LedgerId,entryId 进行排序后落盘。这样消费侧在消费数据时能够实现一定程度上的顺序 IO,以提升性能。
entryIndex 的作用是保存(LedgerId+entryId)到 offset 的映射关系,这里的 offset 是指 entry data 文件中的 offset。
这样的一组映射关系很容易想到其在内存中的组织形式,一个 map。实际的存储 Pulsar 选择 rocksDB 来存储这样的 KV 关系,但 Bookie 本身也有自己的 KV 存储实现;
通过对 Bookie 架构的上分析,我们发现针对读写场景 Bookie 做了两件事来支撑:
混合 Ledger 顺序写的 journal 日志支撑高吞吐低延迟的写入场景;
局部有序的 entry data 支撑消费场景下的 Ledger 级别的顺序读。
三、Bookie 的数据写入流程
对于 Bookie 的写入流程大致如下图所示。Bookie 收到数据后会同时写入 journal 日志和 memtable,memtable 是一个内存 buffer。memtable 再次分发到 entry logger 以及 entry index,数据在 journal 中 append 完后会立即返回写入成功。entry data 和 entry index 的构建可以理解都是异步操作。
Bookie 数据写入流程
client 端源码分析
Pulsar 中 broker 组件 使用 low level API 与 Bookie 进行通信。下文结合具体代码进行分析。
使用 low level api 时,借助于 LedgerHandle 添加 entry 对象。在 Pulsar 中 entryId 为一个递增的序列,在 broker 中 Bookie 的源码调用顺序如下所示,其中 LedgerHandle,OpAddEntry,LedgerHandle class 对象为 Bookeeper 模块提供。
ManagedLedgerImpl#asyncAddEntry()方法(参数省略,下同)
ManagedLedgerImpl#internalAsyncAddEntry()方法
LedgerHandle#asyncAddEntry()方法
OpAddEntry#initiate()方法
LedgerHandle#doAsyncAddEntry()方法
BookieClient#addEntry()方法
LedgerHandle#doAsyncAddEntry 方法
在 doAsyncAddEntry 中的 729 行,发现 entryId 其实是由 lastAddPushed 递增得到,并且这段代码也被加上了重量级锁。PendingAddOp 对象构建完成后会进入一个 pendingAddOps 队列,该队列与当前 Ledger 绑定。
PendingAddOp#initiate 方法
这里的 PendingAddOp 对象代表着一个写数据的请求,在 initiate 进一步加锁,结合写入节点的数量分别向不同的 Bookie 存储节点发送写请求,sendWriteRequest 方法内容比较简单,直接调用 addEntry 方法即可。
PendingAddOp#sendWriteRequest
BookieClient#addEntry
addEntry 方法的实现依然有很多方法包装的细节,但最终通过网络调用 server 端的相关接口,这里篇幅有限,不过度展开。
server 端源码分析
请求路由组件:BookieRequestProcessor
直接跳转 bookeeper 的 server 端的核心处理方法上,BookieRequestHandler 为 server 端的处理类,其继承了 Netty 的 ChannelInboundHandlerAdapter,是最外层与 netty 组合工作的 handler。
BookieRequestHandler
在 channelRead 方法中触发了 requestProcessor 的处理逻辑,这里的 processor 实际为 BookieRequestProcessor,具体的相关代码在 BookieServer 类的构造函数中,BookieServer 是整个 bookeeper server 端的启动类。
BookieRequestProcessor#processRequest 方法为数据流的核心指令分发器。
BookieRequestProcessor#processRequest
这里围绕 processAddRequestV3 方法展开分析;Bookie 中有个很有意思的设定,将请求处理线程池分为普通线程池和高优线程池;两者执行逻辑相同。在下图的 452 行将写操作请求放入了线程池,需要说明的是这个线程池是经过改良的,多了一个 orderingKey 参数,在内部会将根据该参数进行 hash 运算,映射具体的线程上,其内部由多个单线程的线程池组成。这样做的好处是可以大幅度减少投递任务时的队列头部竞争,相比传统线程池有一定的性能优势。
processAddRequestV3
核心线程池任务:WriteEntryProcessorV3
显然,核心的处理逻辑在 write.run 方法内,继续开扒。run 方法中核心逻辑封装在 getAddResponse()。
WriteEntryProcessorV3#run
getAddResponse 方法内会对当前请求的标记,判断后分别调用 recoveryAddEntry 和 addEntry 这两个方法。前者的使用场景顾名思义是在异常恢复流程中被触发,一般是节点启动,宕机后重启等过程中恢复数据。addEntry 方法位于 Bookie 内,Bookie 是个接口,只有一个实现类 BookieImpl。
WriteEntryProcessorV3#getAddResponse
存储引擎接口抽象:Bookie
继续来看 BookieImpl#addEntry 方法,在 1067 这一行加上了 synchronized 锁,锁的对象为 handle,具体为 LedgerDescriptor 类型,这表示在单个 Ledger 内部的数据在写入时通过加锁的方式实现串行化写入。
1073 行的 addEntryInternal 方法内部是核心的写入逻辑。
BookieImpl#addEntry
Ledger 的管理者:LedgerDescriptor
getLedgerForEntry 方法基于传入的参数 LedgerId 查找到对应的 LedgerDescriptor,该类是一个抽象类,有两个实现类,分别是 LedgerDescriptorImpl 和 LedgerDescriptorReadOnlyImpl,顾名思义,二者分别提供读写功能。
BookieImpl#getLedgerForEntry
LedgerDescriptor 的两个实现类
handles 是 HandleFactory 类型接口,从其定义的接口来看主要作用就是实现 LedgerDescriptor 的读写分离,且只有一个实现 HandleFactoryImpl,在 HandleFactoryImpl 中保存了 2 个 Map 类型的 MAP。分别服务于两个接口的调用,getHandle 方法就是从 map 中获取可以写入的 LedgerDescriptor。
HandleFactory
事实上 LedgerDescriptorReadOnlyImpl 的实现很简单,继承了 LedgerDescriptorImpl 后将该类涉及到写入的方法全部重写为抛出异常!
LedgerDescriptorReadOnlyImpl
获取到对应的 LedgerDescriptor 后,就需要进行写入操作,下面分析 BookieImpl#addEntryInternal 方法。
从逻辑上来讲,entry 先是被写入 Ledger storage(930 行),其次才被写入 journal 日志,同时 journal 日志的写入是可选的,默认情况下开启;journal 关闭后将不存在数据落盘的逻辑,这意味着将无法依靠 journal 日志进行数据恢复。但考虑到消息写入时一般是多份,不考虑写入的多个节点同时宕机的情况,数据某种程度上依然是可靠的。
BookieImpl#addEntryInternal
Ledger 级的接口抽象:LedgerStorage
LedgerDescriptorImpl 中持有一个 ledgerStorage 类型,该组件负责最终的 entry 对象写入,存在多个实现类,分别是:DbLedgerStorage,SingleDirectoryDbLedgerStorage,InterleavedLedgerStorage,SortedLedgerStorage。
LedgerDescriptorImpl
LedgerStorage 实现类
Bookie 默认使用 SortedLedgerStorage,但 Pulsar 中使用 DbLedgerStorage 进行管理。
实际可配置的实现只有三个选项,下面依次对每个实现类进行分析。
ServerConfiguration
1
DbLedgerStorage->SingleDirectoryDbLedgerStorage
writeCache 写入
DbLedgerStorage 主要特点是使用了 rocksDB 保存[ledgerId+entryId --> location]的映射关系;内部又存在了一层套娃。addEntry 方法中先获取到 LedgerId, 再根据 LedgerId 获取 ledgerStorage,也就是说 LedgerId 和实际的 LedgerStorage 存在映射关系;DbLedgerStorage 内部又继续封装了 SingleDirectoryDbLedgerStorage 类来支撑数据写入,具体是一个 ListledgerStrageList;字段。经过 hash 后获得真实的 SingleDirectoryDbLedgerStorage 对象进行实际的 addEntry 操作;下文首先对该实现进行分析。
DbLedgerStorage#addEntry
DbLedgerStorage#getLedgerStorage
DbLedgerStorage 的成员变量
在 SingleDirectoryDbLedgerStorage 的源码中,待写入的 entry 仅仅是被放入 writeCache 中,put 成功后更新 LAC 并通知相关监听者,同时触发写入成功事件,貌似没有任何写盘的操作出现!!!进一步分析 497 行,如果 put 失败会触发 flush 操作并尝试再次 addEntry,这里的 flush 有点眼熟,有必要展开分析一波。
不难发现这里的写入操作和刷盘操作其实是线程隔离的,默认情况下,类比于 RMQ,大部分存储组件的刷盘操作和实际写入动作切分为两个线程在执行,刷盘线程会不断地巡检是否需要刷盘,主要基于当前未刷盘的数据量以及距离上次刷盘的时间间隔,如果开启同步刷盘,一般写入线程会被挂起在 req 请求上,当刷盘进度已经 cover 写入请求的 offset 时,被挂起的请求上的线程会被唤醒继续执行,这是一种非常典型的存储引擎设计模式。这里 writeCache 就是个 buffer,既可以充当写入缓冲也可以充当读取缓冲,在 tail read 场景下会有非常好的性能收益。
SingleDirectoryDbLedgerStorage#addEntry
writeCache 背后的 flush
triggerFlushAndAddEntry 的逻辑并不复杂,在超时时间到来之前会不断的检查当前的刷盘标记位,如果没有正在刷盘以及刷盘逻辑没有被触发,会尝试刷盘,同时尝试继续向 writeCache 中 put 数据,因为刷盘成功后会在 cache 中清理出一部分空间,用于 put 新的的数据,一旦 put 成功立即返回,跟外层的 addEntry 方法类似,只是多了个刷盘逻辑的处理。
SingleDirectoryDbLedgerStorage#triggerFlushAndAddEntry 方法
flush 方法其实是个空壳,核心逻辑在 checkpoint()方法内,该方法的主要逻辑为:
交换 writeCache,避免刷盘过程中数据无法写入,导致写入抖动;
对 writeCache 内的数据进行排序,实现局部有序;
分别调用 entryLog 的 add 方法和 entryIndex 的 add 方法;
调用 entrylog 的 flush 和 entryIndex 的 flush 进行刷盘。
SingleDirectoryDbLedgerStorage#flush
SingleDirectoryDbLedgerStorage#checkpoint
源码中的 writeCacheBeingFlushed 实际上和 writeCache 一体两面,上一次刷盘结束后 writeCacheBeingFlushed 会被 clear,再次刷盘时会交换两者;保证写入的稳定性;如果实际查询数据时要利用这部分 cache,需要查询两次,先查 writeCache 如果不存在 ,再查 writeCacheBeingFlushed。
writeCacheBeingFlushed 的注释
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);方法底层依赖 rocksDB 建立了[ledgerId, entryId]-->location 的映射关系,Batch 在这里代表着 一个 RocksDBBatch,location 可以理解为实际磁盘文件上的 offset。rocksDB 引擎超出本文范畴,这里不做分析。
EntryLogger
entryLogger 代表着存储实际数据的组件抽象,调用 addEntry(ledgerId, entry)方法完成数据写入。进一步对 addEntry 方法展开分析,发现 EntryLogger 是一个接口,有 2 个直接实现类,分别是 DefaultEntryLogger 和 DirectEntryLogger,默认使用 DefaultEntryLogger 参见源码:
DbLedgerStorage#initialize -part1
DbLedgerStorage#initialize -part2
最终的调用来到了 EntryLogManagerBase#addEntry 方法,首先获取到待写入的数据,然后调用 BufferedLogChannel#write 将其写入,可以看到实际的数据长度为:entry.readableBytes() + 4,4 个字节用于记录长度,先写入长度值,再写入 entry 的二进制数据;addEntry 方法返回值为 location,方法的最后一行表明 location 由 2 部分组成,分别是 logId 和 pos,各暂用高位和低位的 4 个字节。很容易想到随着时间的推移 EntryLogger 中的文件不止一个,因此需要一个 logId 来标识不同文件,具体到文件上又需要一个偏移量来定位具体的一条数据,4 个字节的 pos 也表明了单个 entryLog 文件理论大小值不能超过 4 个 G,实际默认值为 1G。
EntryLogManagerBase#addEntry
进一步分析 BufferedLogChannel#write 方法,发现 BufferedLogChannel 继承置之 BufferedChannel ,在 BufferedChannel 中有一个 writeBuffer ,write 方法只是将数据写入到这个 writeBuffer 中,至于是否刷盘则不一定。
只在满足下列两种情况时数据才会刷盘:
当前 writeBuffer 已经写满,writeBuffer 默认值 64KB;
ServerConfiguration 中配置了 FLUSH_ENTRYLOG_INTERVAL_BYTES 参数,且值大于 0,默认值为 0。
BufferedLogChannel#write
flush 方法内容很简单,调用底层的 fileChannel 将 writeBuffer 中的数据写入底层的文件系统,但是 flush 并不保证一定落盘,而是最后一行代码 forceWrite 方法保证。forceWrite 会调用 fileChannel.force(forceMetadata)将数据同步到磁盘上。
BufferedChannel#flush
为了保证数据的务必落盘,在 SingleDirectoryDbLedgerStorage#checkpoint 方法中,addEntry 方法之后,又单独调用了 entryLogger.flush();和 ledgerIndex.flush();对 entryLogger.flush()进一步拆分,发现底层调用了 EntryLogManagerBase#flush 方法,二者两个方法在 base 类中是 abstract 类型,具体实现又落到了 EntryLogManagerForSingleEntryLog 中,最终任务还是落在了 BufferedChannel#flushAndForceWrite 上。
BufferedChannel#flushAndForceWrite
2
SortedLedgerStorage->InterleavedLedgerStorage
在 SortedLedgerStorage 类中,持有了 InterleavedLedgerStorage 类型,大部分的接口实现都委托给了 InterleavedLedgerStorage 的相关方法调用,SortedLedgerStorage 的最大特点是,每次数据写入时都会进行排序,其内部使用了跳表。
EntryMemTable 写入
addEntry 方法的逻辑非常简单,将数据 add 到 memtTable 后,更新下 LAC 即结束;
SortedLedgerStorage#addEntry
继续研究 EntryMemTable 的 addEntry 方法之前先了解下 EntryMemTable 的结构,这个组件是一个纯内存的数据保存结构,kvmap 和 snapshot 负责实际数据保存,二者类型皆是 EntrySkipList ,这个类简单封装了 ConcurrentSkipListMap,实际使用时 KV 值相同,因为需要保证有序,所以重写了排序规则,主要比较 LedgerId 和 entryId。
kvmap 和 snapshot 工作机制是,当写满 kvmap 时,会将数据交换给 snapshot,kvmap 重新构建一个新的指定大小的结构,后台线程负责将 snpshot 中的数据刷盘保存,因此只要后台刷盘的速度不是特别垮,可以提供持续不间断的写入。单个 kvmap 有大小限制,默认 64M 大小,结合前面的 swap 机制,最多可以兜住 128M 的写入缓存。
EntryMemTable
addEntry 写入之前先获取读锁,(没错,写入用的是读锁!!!)然后将数据 put 进入 kvmap 结构中,internalAdd 方法内容很简单,就是一个对 kvmap 的 putIfAbsent 调用,看到这里可以理解为什么用的是读锁了。因为这里 kvmap 的并发安全控制根本不依赖这个读写锁。
EntryMemTable#addEntry
EntryMemTable 刷盘
读写锁的主要作用是,在 swap kvmap 和 snapshot 瞬间加上写锁控制以及读取数据时加上读锁控制。
ReentrantReadWriteLock 使用场景
每次刷盘之前会先创建个一个 snapshot 快照,用以保证此快照之前的数据在此次的刷盘范围内;创建 snapshot 时,会交换 kvmap 与 snapshot 两个字段,因为快照的创建是刷盘行为触发的,而刷盘动作一般都是有个单独的线程在执行,所以这里需要控制并发逻辑,保证在 swap 的瞬间,不能有 addEntry 操作,同样的在刷盘结束后需要清理 snapshot 的数据,也加上了写锁来控制。
EntryMemTable#snapshot
会有一个后台的刷盘线程执行 flush 操作,首先会先将 snapshot 数据 flush,然后尝试创建新的 snapshot,如果创建成功说明,仍然有可刷数据,再次执行 flushSnapshot 的动作。
EntryMemTable#flush
在 flushSnapshot 方法中,会调用 flusher 的 process 方法,这里的 flusher 其实就是 SortedLedgerStorage,在 process 方法内的实际调用了 InterleavedLedgerStorage 的 processEntry 方法,这个方法并不能保证数据一定会落到磁盘文件上,因此 EntryMemTable 所谓的 flush 操作只是将其内存数据刷新到 InterleavedLedgerStorage 组件中。
EntryMemTable#flushSnapshot
SortedLedgerStorage#process
EntryLogger
继续来看 InterleavedLedgerStorage 的处理逻辑,添加 Entry 后将对应的 KV 索引写入 LedgerCache 缓存后返回。查看 InterleavedLedgerStorage 的 entryLogger 字段发现,与上文的 SingleDirectoryDbLedgerStorage 相同,写入 entry 依然用的是 DefaultEntryLogger。
InterleavedLedgerStorage#processEntry
InterleavedLedgerStorage#entryLogger
EntryIndex
上文提到默认情况下 Pulsar 使用 DbLedgerStorage 来存储数据和索引信息,而索引信息默认情况下使用 rocksDB 来存储,rocksDB 作为顶级 KV 引擎其性能和稳定性毋庸置疑。但是在实际的使用过程中,某些时候会选择 LedgerStorage 的另一个实现类:SortedLedgerStorage。SortedLedgerStorage 的主要特点是是在每次写入数据的时候都会进行内部排序,内部维护一个跳表,同时其存储 leggerId+entryId 到 location 的映射关系是使用 Java 的引擎实现。下面对这个 Java 实现的 KV 引擎做详细分析。
ServerConfiguration 关于 ledgerStorageClass 的配置
仍然是先从 entryLog 的写入作为突破口,SortedLedgerStorage 内部套了一个 InterleavedLedgerStorage 对象,前者复用了后者的 addEntry 方法,核心方法在 InterleavedLedgerStorage#processEntry 中。
long pos = entryLogger.addEntry(ledgerId, entry, rollLog);方法添加完 entry 对象后返回对象在文件的 offset,内部的 add 逻辑与上文分析的 SingleDirectoryDbLedgerStorage 一致。
InterleavedLedgerStorage#addEntry
InterleavedLedgerStorage#processEntry
LedgerCache 是一个接口,具体的实现只有一个 LedgerCacheImpl 类,后者内部有两个支撑组件,IndexInMemPageMgr 和 IndexPersistenceMgr,从名称可以看出前者负责数据在内存中的保持,后者负责实际的存储。按照之前的分析的源码,很容易联想到数据大概率先落入 memoryPage 再落盘,pageSize 默认 8K,entriesPerPage 默认为 pageSize/8= 1K。
LedgerCacheImpl
putEntryOffset 方法首先通过 entryId 模以单个 page 页的 entry 数量得到当前 entryId 在具体的 page 页中的偏移量,这里的 page 不是 OS 中的 page 页,是 Bookeeper 单独抽象出来的 page 概念,需要区分开。在 getLedgerEntryPage 方法中,首先会尝试从内存中获取 LedgerEntryPage 对象,如果没有则调用 grabLedgerEntryPage 方法从磁盘上加载,内存中缓存的对象结构为 InMemPageCollection,内部是一个 LRU 缓存。
写入算法分析:
LedgerEntryPage 是对单个页的抽象;
int offsetInPage = entry % entriesPerPage;计算出当前的 entryId 在单个 LedgerEntryPage 的逻辑偏移量;
long pageEntry = entry - offsetInPage; 计算出当前 LedgerEntryPage 中初始 entryId;
基于 LedgerId 和初始 entryId 查找定位到 LedgerEntryPage,如果缓存中不存在,则从文件中加载;
按照 offsetInPage 计算当前的 offset 需要写入的真实位置,这里的 offset 即是 entryLogger 中 entry location 值;
由于写入的数据为 offset 是个 long 类型,需要 8 个二进制为,实际的写入的位点为逻辑上的 offsetInPage*8。
IndexInMemPageMgr#putEntryOffset
上述的算法自然也是可逆的,读取的时候同样基于 LedgerId 和 entryId 定位到具体的 LedgerEntryPage。然后在计算出实际的物理偏移量,在特定位置读取到 location 参数。
顺序写入的 WAL 日志:Journal
分析完 writeCache 的写入及其背后的逻辑,我们继续分析 journal 日志的写入流程;上文提到 journal 为混合写入模式,可能存在多个 LedgerId 的数据混编。在 addEntryInternal 方法的最后一行中通过 LedgerId 获取到真实的 journal,获取的逻辑依然是个 hash 算法,用来保证相同 LedgerId 始终落到一个 journal 上进行处理。
logAddEntry 干了三件事:
entry.retain()调整 entry 的引用计数值;
journalStats 给内部的 queueSize +1;
memoryLimitController 内存使用限速器,如果超限时,当前线程会被置为等待状态;
queue.put(.......), 将待写入的数据放进队列。
结合 logAddEntry 源码发现又是熟悉的味道,写入方法只是将请求放入队列,那么必然存在从队列获取数据并进行刷盘的逻辑。既然有 put 操作,必然有 take 操作,我们发现 takeAll 和 pollAll 方法 ,都位于 journal#run 方法中,run 方法这个名字如此敏感,以至于不跟 Thread 扯上点关系都说不过去。
Journal#logAddEntry
queue 的调用点
查看 journal 的 class 签名 发现其不出所料的实现了 Thread 的 run 方法,journal 既是顺序写入日志逻辑的抽象也是后台的刷盘线程的抽象;run 方法的实现较为复杂,其注释表明这是一个专门负责持久化的线程方法,同时负责 journal 文件的滚动,当 journal 文件被写满时,会使用当前时间戳创建一个新的 journal 文件,老的文件会被定期回收。
在 queue 字段旁边有一个 forceWriteRequests 字段,这个字段在实际的刷盘逻辑中起到了重要作用。
Journal 部分成员变量释义:
maxGroupWaitInNanos 组提交间隔,一般超过这个时间需要刷盘;
flushWhenQueueEmpty 开关表示当 queue 为空时是否刷盘;
bufferedEntriesThreshold 表示暂存在 toFlush 中的对象数量的阈值;
bufferedWritesThreshold 表示待刷盘的字节数阈值;
journalPageCacheFlushIntervalMSec 真实刷盘的时间间隔。
Journal#run 方法成员变量释义:
localQueueEntries 是一个复用的定长数组;
localQueueEntriesIdx 是这个定长数组中当前处理的元素索引编号,从 0 开始;
localQueueEntriesLen 代表每次从 queue 队列中获取的对象数量;
toFlush 队列是个可复用的 ArrayList,可以认为是个对象池;
numEntriesToFlush 是个待刷盘对象数量的计数器,与 toFlush 配合使用;
lastFlushPosition 为上次刷盘位点记录值;
lastFlushTimeMs 为上次刷盘时间点(毫秒单位);
JournalChannel 是单个 journal 文件的抽象,journal 代表单个目录下的多文件抽象;
BufferedChannel 代表一个写入缓冲区,来自于 JournalChannel;
qe 为 QueueEntry 类型的临时变量。
Journal#run 的主要逻辑如下:
启动 forceWriteThread 线程,这是一个真正意义上的刷盘线程;journal 线程只是将 queue 中的 QueueEntry 对象写入相关的 FileChannel 的 buffer 中,并不保证一定落盘;实际的刷盘行为由 forceWriteThread 负责。
不断的从 queue 中获取一组 QueueEntry 对象,并逐一将其写入 BufferedChannel 缓冲区;从 queue 中获取的 QE 对象放入 localQueueEntries 数组中;entry 需要符合一定的条件才会被写入二进制数据流(主要 entryId 和版本的识别);写入调用的是 BufferedChannel#write 方法,只是将数据写入内部的 writeBuffer 中;写入缓冲区后,将 QE 对象添加进入 toFlush 队列,同时调整 numEntriesToFlush(+1);继续处理 localQueueEntries 中的下一个元素。当 localQueueEntriesIdx == localQueueEntriesLen 时,表示 localQueueEntries 元素全部处理完成,此时临时变量 qe(QueueEntry) 置为 null。
在处理 qe 对象的过程中,会综合多方面条件判断是否需要刷盘,使用临时变量 shouldFlush 表示;当 numEntriesToFlush>0 且符合以下条件时会触发“刷盘”逻辑;当临时变量 qe 为空 或者当前的 qe 处理的时间超过 maxGroupWaitInNanos;当临时变量 qe 为空并且开启 flushWhenQueueEmpty 配置时刷盘;当临时变量 qe 不为空,符合下面两个条件时刷盘;且 toFlush 中暂存的对象数量超过 bufferedEntriesThreshold;或距离上次刷盘的位点间隔超过 bufferedWritesThreshold。
如果满足刷盘条件,调用 BufferedChannel#flush 操作;flush 操作会将之前攒批的 writeBuffer 中的数据写入 OS 的文件系统;底层作为 FileChannel#write 方法的入参;将 toFlush 相关索引为置空,同时调整 numEntriesToFlush;触发 entry 写入的相关回调逻辑执行;更新 lastFlushPosition。
flush 操作完成后将进一步判断是否需要向 forceWriteThread 提交真实的刷盘请求;提交时会将 toFlush 列表中全部对象连同其他参数封装成一个请求对象;一旦提交后将更新 lastFlushTimeMs;符合提交条件的情况有:开启 syncData ,journal 级别的开启同步刷盘的开关;当前需要滚动创建新的 journal 文件;距离上次真实刷盘时间超过阈值 journalPageCacheFlushIntervalMSec。
Journal#run 方法
当真实刷盘请求被提交到 forceWriteThread 线程后,有必要进一步分析该线程的执行逻辑,相比之下 ForceWriteThread#run 方法的逻辑简单很多,解包收到的请求,然后调用 syncJournal 进行强制刷盘,同时做一些清理回收的动作,以及最后的一些回调方法的触发和统计操作。这里的 localRequests 也是一个可复用的临时数组。
ForceWriteThread#run
Journal#syncJournal 方法调用了 request 对象的 flushFileToDisk 方法,该方法内部调用了 logFile.forceWrite(false); 。
logFile 就是之前提到的单个 journal 文件的抽象,即 JournalChannel,其内部封装了 BufferedChannel,实际的类型为 DefaultEntryLogger,与 EntryLogger 所使用的底层实现如出一辙。
Journal#syncJournal
JournalChannel 类
再论 Bookie
上文提到 BookieImpl 中的 addEntry 逻辑似乎很简单,数据写入交由 LedgerHander 和 journal 组件,自身则是简单的封装。实则不然,查看 BooikeImpl 的实现,发现其中存在一个 SyncThread 对象,该对象是一个同步线程,其逻辑为转写 journal 日志的数据到 entryLog 和 entryIndex。
BookieImpl
启动 checkpoint 定期检查
doCheckpoint 在底层最终调用了 LedgerStorage#checkpoint 方法,与上文提到的 writeCache 背后的 flush 殊途同归。这里存在另外一个问题:SyncThread 线程是否会与 triggerFlushAndAddEntry 中的 flush 线程并发执行,以及是否存在并发刷盘带来的数据错乱问题。答案是不会,具体来看 checkpoint 方法内部存在一个 flushMutex 锁,同时在进入锁之前,首先会对当前的 checkpoint 做判断,如果传入的 checkpoint 水位线低于当前 SingleDirectoryDbLedgerStorage 对象持有的 lastCheckpoint 水位线,则不执行实际的 checkpoint 动作。
SyncThread#doCheckpoint
server 端分析总结
Bookeeper 的 server 端的架构较为复杂,分为多级写入的架构,收据流向为:
数据首先进入 writeCache,有后台线程定期将 cache 数据同步到 entryLog 和 entryIndex;writeCache 底层采用 swap 机制,保证写入延迟的稳定性。
调用 entryLog 和 entryIndex 分别写入业务数据和索引数据;entryIndex 使用 rocksDB 作为 KV 索引保存 LedgerId+ entryId 到 offset 的映射关系。
SingleDirectoryDbLedgerStorage#flush 操作和 EntryLogger#flush 操作不同;前者只是将数据同步到 entryLog 和 entryIndex 中;后者真实调用底层的文件系统进行刷盘。
journal 日志的写入时可配置的,默认开启,journal 日志同样存在后台的刷盘线程;journal 线程一直重复在干两件事;将 QueueEntry 转化为二进制写入 bufferChannel 的 writebuffer;综合判断各种条件,定期向 forceWriteThread 线程 提交真实的刷盘任务。
四、Bookie 的数据读取流程
server 端源码分析
请求路由
回到 BookieRequestProcessor#processRequest 的源码截图,读取流程围绕 READ_ENTRY 这一 opCode 展开,同样在最新版本的 Bookie 代码中,read processor 升级到了 V3 版本。
BookieRequestProcessor#processRequest
和写入一样,在读取的 processReadRequestV3 方法中,依然有高优先级线程池和普通线程池,不同的是还多了一个长轮询线程池,在投递任务时又出现了熟悉的操作,跟 LedgerId 选择线程池中具体的线程执行操作。
BookieRequestProcessor#processReadRequestV3
直接跳转 ReadEntryProcessorV3#run 方法。发现是个空壳,逻辑封装 executeOp 方法中。
ReadEntryProcessorV3#run
BookieImpl#readEntry
最终的读取逻辑在 BookieImpl#readEntry 中,该方法只是简单的封装,根据 LedgerId 获取到 LedgerDescriptor 后,读取逻辑顺利委托给了 LedgerDescriptor,在 LedgerDescriptor#readEntry 方法内进一步套娃,又将请求转移给了 LedgerStorage#getEntry,前文提到 LedgerStorage 是个接口,真正干活的是 SingleDirectoryDbLedgerStorage 中的 doGetEntry 方法,这个类在写入请求的分析过程中同样出场过。
BookieImpl#readEntry
doGetEntry 方法的逻辑整体较为简单,主要分为以下几步:
如果传入的 entryId 为-1 ,表示读取 LAC ,先从 Ledger 中获取实际的 LAC 的 entryId,在进行读取;
默认先从 writeCache 中读取,如果读取不到则去 writeCacheBeingFlushed 中读取,命中则直接返回;
如果 2 级缓存中均不存在,则去 readCache 中据需读取;
如果 readCache 也不存在,那么就要触发磁盘读,先去 entryLocationIndex 获取 entryLogger 中的物理偏移量;
随后调用 entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); 获取真实数据;
数据获取到后会放入 readCache 中;
在方法结束时,会触发一次预读,读取紧挨着当前 entry 的下一个 entry 并放入 readCache 中。
SingleDirectoryDbLedgerStorage#doGetEntry
DefaultEntryLogger 如何读取 entry
entry 的读取在上图的 640 行,最终调用方法为 DefaultEntryLogger#internalReadEntry 方法。逻辑如下:
将 location 参数转化为 buffer 中的 position 位点;
获取到 FileChannel(856 行);
从 pos-4 位置开始读取 20 个字节并解析,sizeBuf 值为 entry 的整体长度(4+8+8);
然后分配一个 sizeBuf 大小的 buffer 用于装载即将要读取的 entry。
DefaultEntryLogger#internalReadEntry
DefaultEntryLogger#readEntrySize
在存量数据足够的情况下 readFromLogChannel 方法会尽可能将入参中的 buffer 填满,在 BufferedReadChannel 中存在一个 readBuffer,默认大小 512 字节,read 方法仍然有可能命中该缓存。
DefaultEntryLogger#readFromLogChannel
server 端分析总结
数据的查询内容比较简单,从大的架构上来看整个读取过程存在三级缓存,都不命中的话才会读取磁盘。
实际上在上层的 broker 组件里还有一层缓存存在。消息获取流程如下图所示:
五、读写调用链分析
组件模块分析
BufferedChannel
其派生关系如下图所示,还有一个 SlowBufferedChannel 继承 BufferedChannel 类,但是该类为测试使用。BufferedReadChannel 是读场景下的主要支撑类,内部有 512 字节的读缓冲。
EntryLogger
默认使用 DefaultEntryLogger,主要用于存储实际的 entry 对象数据,DefaultEntryLogger 和 DirectEntryLogger 的区别在于一个使用 JDK 的 RandomAccessFile ,另一个直接使用 DIO(单独依赖特定 C 库)。
DefaultEntryLogger
DirectEntryLogger
LedgerStorage
基于 EntryLogger 的上层抽象,主要实现有 InterleavedLedgerStorage 和 SingleDirectoryDbLedgerStorage, 还有一个 SortedLedgerStorage,内部封装了 InterleavedLedgerStorage,复用了大部分的 InterleavedLedgerStorage 的方法。SortedLedgerStorage 每次写入时对内部的数据进行排序,使用自带的 KV 引擎存储 LedgerId+entryId-->location 映射关系。SingleDirectoryDbLedgerStorage 每次刷盘时才会对缓存的数据进行排序,使用 rocksDB 存储 KV 关系。
SingleDirectoryDbLedgerStorage
InterleavedLedgerStorage
LedgerDescriptor
包装类,大部分逻辑委托给 ledgerStorage 实现。内部持有 ledgerId,每个 ledgerId 对应一个 LedgerDescriptor 对象。
Bookie
Bookie 节点级存储抽象,内部封装了多个 journal 抽象组成的 journalList,ledgerStorage,syncThread 线程。
syncThread 线程主要负责将 journal 中的 appendLog 转写为 entryLog 和 enrtyIndex,checkpoint 之前的数据在执行 GC(数据清理工作,非 JVM 中的 GC)时可被回收删除。
ReadEntryProcessorV3,WriteEntryProcessorV3
负责读写指令的路由和转化。
写入流程调用时序
WriteEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedLogChannel
读取流程调用时序
ReadEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedReadChannel
六、架构总结
Bookie 的存储架构主要分为三大块,首先是代表 WAL 日志的 journal 文件写入,以顺序混写的方式提升写入性能,保证低延迟,通常以独立盘隔离挂载,典型的消息场景下 journal 日志写完后即可返回。由于是不同 topic 的混合写入,journal 日志无法很好的支撑单个 topic 的消息的顺序读,回溯等场景,会存在读放大问题。
由此就衍生出了 entryLog 的二次转储,为了尽可能利用顺序读,单个 entryLog 内部的数据在写入时会根据 ledgerId+entryId 排序,这样同一个 ledgerId 的数据会紧密的收敛在局部,能够一定程度上提升读性能;entryLog 写入后会获取到消息实际存储的位点信息 offset,由于该 offset 不可被自定义,很难表述出这条消息在 topic 写入序列上为第几条信息,这一点很重要,因为消费的时候是基于这样的序列来消费的,同时在消费位点管理时也需要这样的信息。
entryId 的作为一个传入参数,其作用恰恰如此,是一个面向用户的更易于管理的唯一 Id。当用户基于 ledgerId+entryId 来查找数据时,显然并不知道这个这条数据实际存储 offset 信息。这就诞生了一个额外的 KV 结构,用来保存 ledgerId+entryId 到 offset 的映射关系。Bookie 内嵌了 rocksDB 的 KV 引擎,同时也自行实现了一套,Pulsar 默认使用 rocksDB 方式保存 KV 关系。
bookie 在整个写入和读取过程中利用了大量的用户态缓存机制,相比于 mmap 的 pageCache 机制更为灵活可控,同时也很大程度上降低了读写的抖动,尤其是在容器环境下不同 POD 互相干扰的情况。
*文/簌语
本文属得物技术原创,更多精彩文章请看:得物技术
未经得物技术许可严禁转载,否则依法追究法律责任!
评论