写点什么

TIKV 源码学习笔记 --BatchSystem 创建初始化流程

  • 2024-03-15
    北京
  • 本文字数:9155 字

    阅读完需:约 30 分钟

作者: ylldty 原文来源:https://tidb.net/blog/1e876bc8

前言

我们在前面的概述里面,已经描述了 batchSystem 的重要组件:


  • FSM

  • n * normalFsm

  • controlFsm

  • MailBox

  • n * normalMailBox

  • controlMailBox

  • scheduler

  • normalScheduler

  • controlScheduler

  • poller

  • router


本文将会带大家看一下,TIKV 在启动过程中,是如何构建与初始化 BatchSystem 这些组件的。


另外,BatchSystem 还有两个比较重要的异步任务:


一个是 ApplyFsm,其是和 normalFsm 一一对应的,每当 normalFsm 内部的 raft 模块生成了 committedEntries,就需要 ApplyFsm 将其进行 Apply,最后落盘到 RocksDB


一个是 AsyncWriter,专门用于处理 multiRaft 生成的 MsgraftLog,将 Msg 发送给其他 TIKV 实例,将 raftLog 通过 raftLogEngine 持久化到 RocksDB


BatchSystem 的代码有两个难点,一个是代码的结构稍微不太集中,分别分布在:


  • components/batch-system/src/

  • components/raftstore-v2/src/

  • components/raftstore/src/


一个是异步任务池和 channel 的大量使用,让 Msg 的流转很难理解。


本文专门研究 BatchSystem 的初始化和启动流程,重点观察各个组件在初始化过程中各个异步任务池的作用,还有各个 channel 对应的 Msg 流转分发逻辑。希望经过本文的解析,读者可以通过代码更好的理解 BatchSystemmultiRaft 原理、Region 合并与分裂的流程。

BatchSystem 的创建

BatchSystem 创建入口是 components/server/src/server2.rs::run_impl::init_engines, 主要创建 BatchSystem 的框架,特别是创建并且初始化 control 相关组件,例如 controlFsmcontrolMailBoxcontrolScheduler 等等。

简要逻辑

components/server/src/server2.rs::run_impl::init_engines: 创建 node,内部 StoreSystem 实现 multi raft 分布式协议: 创建 raftLogEngine 用于存储和恢复节点 raft log: 创建 tablet_registry 用于存储节点 KV 数据
-- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store : 创建 NodeV2.StoreSystem
---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system : 创建并且初始化 controlFsm
------ components/batch-system/src/batch.rs::create_system : 创建 controlMailBox、controlScheduler、normalScheduler、router : 创建 BatchSystem
复制代码

关键代码路径

init_engines

components/server/src/server2.rs


init_engines 函数主要用于创建 TIKV 各个重要模块的数据结构对象,其中:


  • node 对象代表当前 TIKV 的节点实例,每一个 TIKV 程序仅对应一个 node 对象

  • raft_engine 对象实际上是 RaftLogEngineRaft 接受其他节点或者自己节点产生的任何 raftlog 都需要通过这个对象写入 rocksdb,是分布式系统恢复一致性的重要保证

  • tablet_registry 模块和 rocksdb 强相关, TIKV Raft Apply 后需要持久化的任何 KV 数据都需要通过 tablet_registry 来写入 rocksdb

  • router 对象我们在前一个文章中已经描述过,其内部包含了各个 Raft Region MailBox,专门用于路由 RPC 请求到 raftStore 模块

Node::try_bootstrap_store

src/server/raftkv2/node.rs


这个函数主要功能是生成唯一的 nodeId,然后利用 create_store_batch_system 创建 raftStore 的各个对象。


pub fn try_bootstrap_store(        &mut self,        cfg: &raftstore_v2::Config,        raft_engine: &ER,    ) -> Result<()> {        let store_id = Bootstrap::new(            raft_engine,            self.cluster_id,            &*self.pd_client,            self.logger.clone(),        )        .bootstrap_store()?;        self.store.set_id(store_id);
let (router, system) = raftstore_v2::create_store_batch_system( cfg, store_id, self.logger.clone(), self.resource_ctl.clone(), ); self.system = Some((router, system)); Ok(()) }

复制代码

raftstore_v2::create_store_batch_system

components/raftstore-v2/src/batch/store.rs


  • StoreFsm 实际上就是 controlFsm,不管有多少个 regioncontrolFsm 只有一个,这里提前将 controlFsm 创建出来

  • StoreFsm::new 会返回两个结果,一个是 fsm,一个是 channel 的发送端 tx。channel 的接收端 rxfsm 成员变量,tx 后续会绑定到 controlMailBox 内部

  • create_system 将会继续创建 raftStore 的其他组件对象

batch_system::create_system

components/batch-system/src/batch.rs


  • 上一个步骤中,controlFsm 就是这里的 controller 参数,sendercontrolFsm channel 的发送端。因此 create_system 首先就利用 controlFsmsender 来创建 controlMailBox

  • 接下来,还需要创建 normal_schedulercontrol_schedulerpoller,这三个组件共用一套 channel

  • 最后,把 controlMailBoxnormal_schedulercontrol_scheduler 放入 router 中去

  • 使用 router 构建 BatchSystem

  • pool_state_builder 一般用于动态增加或者减少 poller 数量

进度

目前经过 init_engines 处理后,batchSystm 的当前进度为:


  • FSM

  • n * normalFsm (未创建)

  • controlFsm (已创建、已初始化)

  • MailBox

  • n * normalMailBox (未创建)

  • controlMailBox (已创建、已初始化)

  • scheduler

  • normalScheduler (已创建、已初始化)

  • controlScheduler (已创建、已初始化)

  • poller (未创建、未初始化)

  • router (已创建部分、未初始化)

初始化 BatchSystem

经过 init_engines 处理后,batchSystem 大概框架已经建立完毕,而且 control 组件基本初始化完成。接下来的流程中,batchSystem 将会着重创建并初始化剩余的组件。

简要逻辑

components/server/src/server2.rs::run_impl::init_servers-- src/server/raftkv2/node.rs::NodeV2::start---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start     : 启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入     : 构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm     : 使用 BatchSystem::spawn 方法构建 poller、StorePoller,启动多个异步任务 poller.poll     : 根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去     : 发送控制命令 `StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,对 committed entries 进行 Apply
------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to----------- components/raftstore/src/store/async_io/write.rs::Worker::run--------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db : 创建 async_write 并且开启异步协程循环,通过 async_write channel 的 rx 接收 raft log,异步写入 rocksdb : 向外部返回 async_write channel 的 tx

------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init : 创建各个 region 的 PeerFsm,也就是 normalFsm : `PeerFsm` 内部会继续创建 `Peer`,比较关键的是创建了 `raftnode`,也就是创建了 `raft` 模块

------ components/batch-system/src/batch.rs::BatchSystem::spawn---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build : 构建 poll_ctx,特别地注意 schedulers.write(async_writeChannel的tx) : 构建 Poller、StorePoller---------- components/batch-system/src/batch.rs::Poller::poll : 接收 fsm,然后交给 StorePoller 的 handle_normal / handle_control 对 fsm 进行处理 : 对于 raft 生成的 raftlog/msg,利用 schedulers.write(async_writeChannel的tx) 发送给 async_write 进行处理 : 对于 raft 生成的 committedEntries, 通过 peer 的 apply_scheduler(也就是 ApplyFsm 的 channel tx) 发送给 ApplyFsm 进行处理

--------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_control-------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm : 创建 ApplyFsm,异步运行 handle_all_tasks,对 committedEntries 进行 Apply
复制代码

关键代码路径

init_servers

components/server/src/server2.rs


主要作用调用 NodeV2::start


fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> {        ...        let engines = self.engines.as_mut().unwrap();        ...
self.node .as_mut() .unwrap() .start( engines.raft_engine.clone(), self.tablet_registry.clone().unwrap(), self.router.as_ref().unwrap(), server.transport(), ... ) ...
server_config }

复制代码

NodeV2::start

src/server/raftkv2/node.rs


主要作用调用 StoreSystem::start


pub fn start<T>(        &mut self,        raft_engine: ER,        registry: TabletRegistry<EK>,        router: &RaftRouter<EK, ER>,        ...    ) -> Result<()>    where        T: Transport + 'static,    {        let store_id = self.id();        if let Some(region) = Bootstrap::new(            ...        )        .bootstrap_first_region(&self.store, store_id)?        {            ...            registry.tablet_factory().open_tablet(ctx, &path).unwrap();        }
...
self.start_store( raft_engine, registry, router, ... )?;
Ok(()) }
fn start_store<T>( &mut self, raft_engine: ER, registry: TabletRegistry<EK>, router: &RaftRouter<EK, ER>, ... ) -> Result<()> where T: Transport + 'static, { ...
let system = &mut self.system.as_mut().unwrap().1; system.start( store_id, store_cfg, raft_engine, registry, ... )?; Ok(()) }
复制代码

StoreSystem::start

components/raftstore-v2/src/batch/store.rs


  • 启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入

  • 构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm

  • 使用BatchSystem::spawn 方法构建 pollerStorePoller,启动多个异步任务 poller.poll,等待 normalScheduler/controlScheduler 发来的 fsm,然后交给 StorePollerhandle_normal / handle_controlfsm 进行处理

  • 根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去

  • 发送控制命令 StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,持久化 committed entries

StorePollerBuilder::new

components/raftstore-v2/src/batch/store.rs


  • 构建 apply_pool,为后续 ApplyFsm 的运行启动做准备

  • 返回 StorePollerBuilder

StorePollerBuilder::init

components/raftstore-v2/src/batch/store.rs


  • 创建各个 regionPeerFsm,也就是 normalFsm

  • PeerFsm 内部会继续创建 Peer,比较关键的是创建了 raftnode,也就是创建了 raft 模块

BatchSystem::spawn

components/batch-system/src/batch.rs


  • 构建 poller

  • 构建 StorePoller


  • 启动多个异步任务 poller.poll,等待 normalScheduler/controlScheduler 发来的 fsm,然后交给 StorePollerhandle_normal / handle_controlfsm 进行处理

StorePollerBuilder::build

components/raftstore-v2/src/batch/store.rs


  • 构建 StorePoller,其 handle_normal / handle_control 方法会对 fsm 进行解析,获取其 msg,递交给 raft 模块进行进一步处理

  • 对于 raft 模块生成的 raftlog,会使用 schedulers.write 通过 async_write/raftLogEngine 将日志存储到 rocksdb 进行持久化

  • 对于 raft 模块生成的 apply entries,会通过 ApplyFsm 使用 tablet_registry 存储到 rocksdb

Poller::poll

poll 函数主要是循环等待 channel 的接收端 fsm_receiver 发送 fsm,然后使用 handler,也就是 StorePoller 进行处理。


components/batch-system/src/batch.rs


pub fn poll(&mut self) {        ...        let mut run = true;        while run && self.fetch_fsm(&mut batch) {            ...            if batch.control.is_some() {                let len = self.handler.handle_control(batch.control.as_mut().unwrap());                ...            }
for (i, p) in batch.normals.iter_mut().enumerate() { ... let res = self.handler.handle_normal(p); ... } let mut fsm_cnt = batch.normals.len(); while batch.normals.len() < max_batch_size { if let Ok(fsm) = self.fsm_receiver.try_recv() { run = batch.push(fsm); } if !run || fsm_cnt >= batch.normals.len() { break; } let p = batch.normals[fsm_cnt].as_mut().unwrap(); let res = self.handler.handle_normal(p); } ... } ... }
fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool { if batch.control.is_some() { return true; }
if let Ok(fsm) = self.fsm_receiver.try_recv() { return batch.push(fsm); }
if batch.is_empty() { self.handler.pause(); if let Ok(fsm) = self.fsm_receiver.recv() { return batch.push(fsm); } } !batch.is_empty() }
复制代码

StorePoller::handle_control

前面 StoreSystem::start 发出的 StoreMsg::Start 消息会触发 StorePoller::handle_control,进而触发 ApplyFsm 的构建:


components/raftstore-v2/src/batch/store.rs


fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> {        ...        let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx);        delegate.handle_msgs(&mut self.store_msg_buf);        ...    }
components/raftstore-v2/src/fsm/peer.rs:pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>) where T: Transport, { for msg in store_msg_buf.drain(..) { match msg { StoreMsg::Start => self.on_start(), StoreMsg::Tick(tick) => self.on_tick(tick), ... } } }
fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) { ... if self.fsm.peer.storage().is_initialized() { self.fsm.peer.schedule_apply_fsm(self.store_ctx); } ... }
复制代码

Peer::schedule_apply_fsm

  • 构建 ApplyFsm

  • 启动 handle_all_tasks 异步任务来接受 apply_scheduler 发送过来的 apply entries


components/raftstore-v2/src/operation/command/mod.rs


pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) {                ...        let (apply_scheduler, mut apply_fsm) = ApplyFsm::new(            &store_ctx.cfg,            self.peer().clone(),            region_state,            mailbox,            ...        );
store_ctx .apply_pool .spawn(async move { apply_fsm.handle_all_tasks().await }) .unwrap(); self.set_apply_scheduler(apply_scheduler); }
复制代码

ApplyFsm::handle_all_tasks

Apply 流程异步处理任务,用于处理 raft 生成的 CommittedEntries,对其进行 Apply


components/raftstore-v2/src/fsm/apply.rs


impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {    pub async fn handle_all_tasks(&mut self) {        loop {            ...            let res = futures::select! {                res = self.receiver.next().fuse() => res,                ...            };            ...            loop {                match task {                    // TODO: flush by buffer size.                    ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,                    ...                }
... } } }}
复制代码

StoreWriters::spawn

这个异步任务专门处理 raft 生成的 raft lograft msg


对于 raft log,使用 raft_engine 写入到 rocksdb


对于 raft msg,使用 trans 发送给其他 tikv 实例


components/raftstore/src/store/async_io/write.rs


impl<EK, ER> StoreWriters<EK, ER>where    EK: KvEngine,    ER: RaftEngine,{    pub fn senders(&self) -> WriteSenders<EK, ER> {        WriteSenders::new(self.writers.clone())    }
pub fn spawn<T: Transport + 'static, N: PersistedNotifier>( &mut self, store_id: u64, raft_engine: ER, kv_engine: Option<EK>, notifier: &N, trans: &T, cfg: &Arc<VersionTrack<Config>>, ) -> Result<()> { let pool_size = cfg.value().store_io_pool_size; if pool_size > 0 { self.increase_to( pool_size, StoreWritersContext { store_id, notifier: notifier.clone(), raft_engine, kv_engine, transfer: trans.clone(), cfg: cfg.clone(), }, )?; } Ok(()) }
pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>( &mut self, size: usize, writer_meta: StoreWritersContext<EK, ER, T, N>, ) -> Result<()> { ... self.writers .update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> { let mut cached_senders = writers.get(); for i in current_size..size { let (tx, rx) = bounded( writer_meta.cfg.value().store_io_notify_capacity, ); let mut worker = Worker::new( ... rx, ... ); let t = thread::Builder::new() .name(thd_name!(tag)) .spawn_wrapper(move || { set_io_type(IoType::ForegroundWrite); worker.run(); })?; cached_senders.push(tx); handlers.push(t); } writers.set(cached_senders); Ok(()) })?; Ok(()) }}

impl<EK, ER, N, T> Worker<EK, ER, N, T>where EK: KvEngine, ER: RaftEngine, N: PersistedNotifier, T: Transport,{ fn run(&mut self) { let mut stopped = false; while !stopped { let handle_begin = match self.receiver.recv() { Ok(msg) => { stopped |= self.handle_msg(msg); } ... };
... self.write_to_db(true); } }
pub fn write_to_db(&mut self, notify: bool) { ... let mut write_raft_time = 0f64; if !self.batch.raft_wbs[0].is_empty() { ... for i in 0..self.batch.raft_wbs.len() { self.raft_engine .consume_and_shrink( &mut self.batch.raft_wbs[i], ... ) ... } self.batch.raft_wbs.truncate(1); ... }
for task in &mut self.batch.tasks { for msg in task.messages.drain(..) { ... if let Err(e) = self.trans.send(msg) { ... }... } } ... }}
复制代码

总结

BatchSystem 的整体代码充斥着各种异步任务,还有各种 channel 的发送和接收。如果没有对其初始化比较了解的话,可能很难看得懂 msg 是如何在整个系统中流转的。


经过本文的流程梳理,相信大家已经对整体 BatchSystem 有了比较熟悉的认知,后续研究 Raft Msg 流转,或者 RegionMergeSplit 应该比较有信心了。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TIKV 源码学习笔记--BatchSystem 创建初始化流程_TiDB 底层架构_TiDB 社区干货传送门_InfoQ写作社区