作者: ylldty 原文来源:https://tidb.net/blog/1e876bc8
前言
我们在前面的概述里面,已经描述了 batchSystem 的重要组件:
FSM
n * normalFsm
controlFsm
MailBox
n * normalMailBox
controlMailBox
scheduler
normalScheduler
controlScheduler
poller
router
本文将会带大家看一下,TIKV 在启动过程中,是如何构建与初始化 BatchSystem 这些组件的。
另外,BatchSystem 还有两个比较重要的异步任务:
一个是 ApplyFsm,其是和 normalFsm 一一对应的,每当 normalFsm 内部的 raft 模块生成了 committedEntries,就需要 ApplyFsm 将其进行 Apply,最后落盘到 RocksDB
一个是 AsyncWriter,专门用于处理 multiRaft 生成的 Msg 和 raftLog,将 Msg 发送给其他 TIKV 实例,将 raftLog 通过 raftLogEngine 持久化到 RocksDB
BatchSystem 的代码有两个难点,一个是代码的结构稍微不太集中,分别分布在:
components/batch-system/src/
components/raftstore-v2/src/
components/raftstore/src/
一个是异步任务池和 channel 的大量使用,让 Msg 的流转很难理解。
本文专门研究 BatchSystem 的初始化和启动流程,重点观察各个组件在初始化过程中各个异步任务池的作用,还有各个 channel 对应的 Msg 流转分发逻辑。希望经过本文的解析,读者可以通过代码更好的理解 BatchSystem、multiRaft 原理、Region 合并与分裂的流程。
BatchSystem 的创建
BatchSystem 创建入口是 components/server/src/server2.rs::run_impl::init_engines, 主要创建 BatchSystem 的框架,特别是创建并且初始化 control 相关组件,例如 controlFsm、controlMailBox、controlScheduler 等等。
简要逻辑
components/server/src/server2.rs::run_impl::init_engines: 创建 node,内部 StoreSystem 实现 multi raft 分布式协议: 创建 raftLogEngine 用于存储和恢复节点 raft log: 创建 tablet_registry 用于存储节点 KV 数据
-- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store : 创建 NodeV2.StoreSystem
---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system : 创建并且初始化 controlFsm
------ components/batch-system/src/batch.rs::create_system : 创建 controlMailBox、controlScheduler、normalScheduler、router : 创建 BatchSystem
复制代码
关键代码路径
init_engines
components/server/src/server2.rs
init_engines 函数主要用于创建 TIKV 各个重要模块的数据结构对象,其中:
node 对象代表当前 TIKV 的节点实例,每一个 TIKV 程序仅对应一个 node 对象
raft_engine 对象实际上是 RaftLogEngine,Raft 接受其他节点或者自己节点产生的任何 raftlog 都需要通过这个对象写入 rocksdb,是分布式系统恢复一致性的重要保证
tablet_registry 模块和 rocksdb 强相关, TIKV Raft Apply 后需要持久化的任何 KV 数据都需要通过 tablet_registry 来写入 rocksdb
router 对象我们在前一个文章中已经描述过,其内部包含了各个 Raft Region MailBox,专门用于路由 RPC 请求到 raftStore 模块
Node::try_bootstrap_store
src/server/raftkv2/node.rs
这个函数主要功能是生成唯一的 nodeId,然后利用 create_store_batch_system 创建 raftStore 的各个对象。
pub fn try_bootstrap_store( &mut self, cfg: &raftstore_v2::Config, raft_engine: &ER, ) -> Result<()> { let store_id = Bootstrap::new( raft_engine, self.cluster_id, &*self.pd_client, self.logger.clone(), ) .bootstrap_store()?; self.store.set_id(store_id);
let (router, system) = raftstore_v2::create_store_batch_system( cfg, store_id, self.logger.clone(), self.resource_ctl.clone(), ); self.system = Some((router, system)); Ok(()) }
复制代码
raftstore_v2::create_store_batch_system
components/raftstore-v2/src/batch/store.rs
StoreFsm 实际上就是 controlFsm,不管有多少个 region,controlFsm 只有一个,这里提前将 controlFsm 创建出来
StoreFsm::new 会返回两个结果,一个是 fsm,一个是 channel 的发送端 tx。channel 的接收端 rx 是 fsm 成员变量,tx 后续会绑定到 controlMailBox 内部
create_system 将会继续创建 raftStore 的其他组件对象
batch_system::create_system
components/batch-system/src/batch.rs
上一个步骤中,controlFsm 就是这里的 controller 参数,sender 是 controlFsm channel 的发送端。因此 create_system 首先就利用 controlFsm 和 sender 来创建 controlMailBox
接下来,还需要创建 normal_scheduler 、 control_scheduler 与 poller,这三个组件共用一套 channel
最后,把 controlMailBox、normal_scheduler、control_scheduler 放入 router 中去
使用 router 构建 BatchSystem
pool_state_builder 一般用于动态增加或者减少 poller 数量
进度
目前经过 init_engines 处理后,batchSystm 的当前进度为:
FSM
n * normalFsm (未创建)
controlFsm (已创建、已初始化)
MailBox
n * normalMailBox (未创建)
controlMailBox (已创建、已初始化)
scheduler
normalScheduler (已创建、已初始化)
controlScheduler (已创建、已初始化)
poller (未创建、未初始化)
router (已创建部分、未初始化)
初始化 BatchSystem
经过 init_engines 处理后,batchSystem 大概框架已经建立完毕,而且 control 组件基本初始化完成。接下来的流程中,batchSystem 将会着重创建并初始化剩余的组件。
简要逻辑
components/server/src/server2.rs::run_impl::init_servers-- src/server/raftkv2/node.rs::NodeV2::start---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start : 启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入 : 构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm : 使用 BatchSystem::spawn 方法构建 poller、StorePoller,启动多个异步任务 poller.poll : 根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去 : 发送控制命令 `StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,对 committed entries 进行 Apply
------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to----------- components/raftstore/src/store/async_io/write.rs::Worker::run--------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db : 创建 async_write 并且开启异步协程循环,通过 async_write channel 的 rx 接收 raft log,异步写入 rocksdb : 向外部返回 async_write channel 的 tx
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init : 创建各个 region 的 PeerFsm,也就是 normalFsm : `PeerFsm` 内部会继续创建 `Peer`,比较关键的是创建了 `raftnode`,也就是创建了 `raft` 模块
------ components/batch-system/src/batch.rs::BatchSystem::spawn---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build : 构建 poll_ctx,特别地注意 schedulers.write(async_writeChannel的tx) : 构建 Poller、StorePoller---------- components/batch-system/src/batch.rs::Poller::poll : 接收 fsm,然后交给 StorePoller 的 handle_normal / handle_control 对 fsm 进行处理 : 对于 raft 生成的 raftlog/msg,利用 schedulers.write(async_writeChannel的tx) 发送给 async_write 进行处理 : 对于 raft 生成的 committedEntries, 通过 peer 的 apply_scheduler(也就是 ApplyFsm 的 channel tx) 发送给 ApplyFsm 进行处理
--------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_control-------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm : 创建 ApplyFsm,异步运行 handle_all_tasks,对 committedEntries 进行 Apply
复制代码
关键代码路径
init_servers
components/server/src/server2.rs
主要作用调用 NodeV2::start
fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> { ... let engines = self.engines.as_mut().unwrap(); ...
self.node .as_mut() .unwrap() .start( engines.raft_engine.clone(), self.tablet_registry.clone().unwrap(), self.router.as_ref().unwrap(), server.transport(), ... ) ...
server_config }
复制代码
NodeV2::start
src/server/raftkv2/node.rs
主要作用调用 StoreSystem::start
pub fn start<T>( &mut self, raft_engine: ER, registry: TabletRegistry<EK>, router: &RaftRouter<EK, ER>, ... ) -> Result<()> where T: Transport + 'static, { let store_id = self.id(); if let Some(region) = Bootstrap::new( ... ) .bootstrap_first_region(&self.store, store_id)? { ... registry.tablet_factory().open_tablet(ctx, &path).unwrap(); }
...
self.start_store( raft_engine, registry, router, ... )?;
Ok(()) }
fn start_store<T>( &mut self, raft_engine: ER, registry: TabletRegistry<EK>, router: &RaftRouter<EK, ER>, ... ) -> Result<()> where T: Transport + 'static, { ...
let system = &mut self.system.as_mut().unwrap().1; system.start( store_id, store_cfg, raft_engine, registry, ... )?; Ok(()) }
复制代码
StoreSystem::start
components/raftstore-v2/src/batch/store.rs
启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入
构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm
使用BatchSystem::spawn 方法构建 poller、StorePoller,启动多个异步任务 poller.poll,等待 normalScheduler/controlScheduler 发来的 fsm,然后交给 StorePoller 的 handle_normal / handle_control 对 fsm 进行处理
根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去
发送控制命令 StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,持久化 committed entries
StorePollerBuilder::new
components/raftstore-v2/src/batch/store.rs
StorePollerBuilder::init
components/raftstore-v2/src/batch/store.rs
BatchSystem::spawn
components/batch-system/src/batch.rs
StorePollerBuilder::build
components/raftstore-v2/src/batch/store.rs
构建 StorePoller,其 handle_normal / handle_control 方法会对 fsm 进行解析,获取其 msg,递交给 raft 模块进行进一步处理
对于 raft 模块生成的 raftlog,会使用 schedulers.write 通过 async_write/raftLogEngine 将日志存储到 rocksdb 进行持久化
对于 raft 模块生成的 apply entries,会通过 ApplyFsm 使用 tablet_registry 存储到 rocksdb
Poller::poll
poll 函数主要是循环等待 channel 的接收端 fsm_receiver 发送 fsm,然后使用 handler,也就是 StorePoller 进行处理。
components/batch-system/src/batch.rs
pub fn poll(&mut self) { ... let mut run = true; while run && self.fetch_fsm(&mut batch) { ... if batch.control.is_some() { let len = self.handler.handle_control(batch.control.as_mut().unwrap()); ... }
for (i, p) in batch.normals.iter_mut().enumerate() { ... let res = self.handler.handle_normal(p); ... } let mut fsm_cnt = batch.normals.len(); while batch.normals.len() < max_batch_size { if let Ok(fsm) = self.fsm_receiver.try_recv() { run = batch.push(fsm); } if !run || fsm_cnt >= batch.normals.len() { break; } let p = batch.normals[fsm_cnt].as_mut().unwrap(); let res = self.handler.handle_normal(p); } ... } ... }
fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool { if batch.control.is_some() { return true; }
if let Ok(fsm) = self.fsm_receiver.try_recv() { return batch.push(fsm); }
if batch.is_empty() { self.handler.pause(); if let Ok(fsm) = self.fsm_receiver.recv() { return batch.push(fsm); } } !batch.is_empty() }
复制代码
StorePoller::handle_control
前面 StoreSystem::start 发出的 StoreMsg::Start 消息会触发 StorePoller::handle_control,进而触发 ApplyFsm 的构建:
components/raftstore-v2/src/batch/store.rs
fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> { ... let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx); delegate.handle_msgs(&mut self.store_msg_buf); ... }
components/raftstore-v2/src/fsm/peer.rs:pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>) where T: Transport, { for msg in store_msg_buf.drain(..) { match msg { StoreMsg::Start => self.on_start(), StoreMsg::Tick(tick) => self.on_tick(tick), ... } } }
fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) { ... if self.fsm.peer.storage().is_initialized() { self.fsm.peer.schedule_apply_fsm(self.store_ctx); } ... }
复制代码
Peer::schedule_apply_fsm
components/raftstore-v2/src/operation/command/mod.rs
pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) { ... let (apply_scheduler, mut apply_fsm) = ApplyFsm::new( &store_ctx.cfg, self.peer().clone(), region_state, mailbox, ... );
store_ctx .apply_pool .spawn(async move { apply_fsm.handle_all_tasks().await }) .unwrap(); self.set_apply_scheduler(apply_scheduler); }
复制代码
ApplyFsm::handle_all_tasks
Apply 流程异步处理任务,用于处理 raft 生成的 CommittedEntries,对其进行 Apply
components/raftstore-v2/src/fsm/apply.rs
impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> { pub async fn handle_all_tasks(&mut self) { loop { ... let res = futures::select! { res = self.receiver.next().fuse() => res, ... }; ... loop { match task { // TODO: flush by buffer size. ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await, ... }
... } } }}
复制代码
StoreWriters::spawn
这个异步任务专门处理 raft 生成的 raft log 与 raft msg
对于 raft log,使用 raft_engine 写入到 rocksdb
对于 raft msg,使用 trans 发送给其他 tikv 实例
components/raftstore/src/store/async_io/write.rs
impl<EK, ER> StoreWriters<EK, ER>where EK: KvEngine, ER: RaftEngine,{ pub fn senders(&self) -> WriteSenders<EK, ER> { WriteSenders::new(self.writers.clone()) }
pub fn spawn<T: Transport + 'static, N: PersistedNotifier>( &mut self, store_id: u64, raft_engine: ER, kv_engine: Option<EK>, notifier: &N, trans: &T, cfg: &Arc<VersionTrack<Config>>, ) -> Result<()> { let pool_size = cfg.value().store_io_pool_size; if pool_size > 0 { self.increase_to( pool_size, StoreWritersContext { store_id, notifier: notifier.clone(), raft_engine, kv_engine, transfer: trans.clone(), cfg: cfg.clone(), }, )?; } Ok(()) }
pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>( &mut self, size: usize, writer_meta: StoreWritersContext<EK, ER, T, N>, ) -> Result<()> { ... self.writers .update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> { let mut cached_senders = writers.get(); for i in current_size..size { let (tx, rx) = bounded( writer_meta.cfg.value().store_io_notify_capacity, ); let mut worker = Worker::new( ... rx, ... ); let t = thread::Builder::new() .name(thd_name!(tag)) .spawn_wrapper(move || { set_io_type(IoType::ForegroundWrite); worker.run(); })?; cached_senders.push(tx); handlers.push(t); } writers.set(cached_senders); Ok(()) })?; Ok(()) }}
impl<EK, ER, N, T> Worker<EK, ER, N, T>where EK: KvEngine, ER: RaftEngine, N: PersistedNotifier, T: Transport,{ fn run(&mut self) { let mut stopped = false; while !stopped { let handle_begin = match self.receiver.recv() { Ok(msg) => { stopped |= self.handle_msg(msg); } ... };
... self.write_to_db(true); } }
pub fn write_to_db(&mut self, notify: bool) { ... let mut write_raft_time = 0f64; if !self.batch.raft_wbs[0].is_empty() { ... for i in 0..self.batch.raft_wbs.len() { self.raft_engine .consume_and_shrink( &mut self.batch.raft_wbs[i], ... ) ... } self.batch.raft_wbs.truncate(1); ... }
for task in &mut self.batch.tasks { for msg in task.messages.drain(..) { ... if let Err(e) = self.trans.send(msg) { ... }... } } ... }}
复制代码
总结
BatchSystem 的整体代码充斥着各种异步任务,还有各种 channel 的发送和接收。如果没有对其初始化比较了解的话,可能很难看得懂 msg 是如何在整个系统中流转的。
经过本文的流程梳理,相信大家已经对整体 BatchSystem 有了比较熟悉的认知,后续研究 Raft Msg 流转,或者 Region 的 Merge 与 Split 应该比较有信心了。
评论