作者: ylldty 原文来源:https://tidb.net/blog/1e876bc8
前言
我们在前面的概述里面,已经描述了 batchSystem
的重要组件:
FSM
n * normalFsm
controlFsm
MailBox
n * normalMailBox
controlMailBox
scheduler
normalScheduler
controlScheduler
poller
router
本文将会带大家看一下,TIKV
在启动过程中,是如何构建与初始化 BatchSystem
这些组件的。
另外,BatchSystem
还有两个比较重要的异步任务:
一个是 ApplyFsm
,其是和 normalFsm
一一对应的,每当 normalFsm
内部的 raft
模块生成了 committedEntries
,就需要 ApplyFsm
将其进行 Apply
,最后落盘到 RocksDB
一个是 AsyncWriter
,专门用于处理 multiRaft
生成的 Msg
和 raftLog
,将 Msg
发送给其他 TIKV
实例,将 raftLog
通过 raftLogEngine
持久化到 RocksDB
BatchSystem
的代码有两个难点,一个是代码的结构稍微不太集中,分别分布在:
components/batch-system/src/
components/raftstore-v2/src/
components/raftstore/src/
一个是异步任务池和 channel
的大量使用,让 Msg
的流转很难理解。
本文专门研究 BatchSystem
的初始化和启动流程,重点观察各个组件在初始化过程中各个异步任务池的作用,还有各个 channel
对应的 Msg
流转分发逻辑。希望经过本文的解析,读者可以通过代码更好的理解 BatchSystem
、multiRaft
原理、Region
合并与分裂的流程。
BatchSystem 的创建
BatchSystem
创建入口是 components/server/src/server2.rs::run_impl::init_engines
, 主要创建 BatchSystem
的框架,特别是创建并且初始化 control
相关组件,例如 controlFsm
、controlMailBox
、controlScheduler
等等。
简要逻辑
components/server/src/server2.rs::run_impl::init_engines
: 创建 node,内部 StoreSystem 实现 multi raft 分布式协议
: 创建 raftLogEngine 用于存储和恢复节点 raft log
: 创建 tablet_registry 用于存储节点 KV 数据
-- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store
: 创建 NodeV2.StoreSystem
---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system
: 创建并且初始化 controlFsm
------ components/batch-system/src/batch.rs::create_system
: 创建 controlMailBox、controlScheduler、normalScheduler、router
: 创建 BatchSystem
复制代码
关键代码路径
init_engines
components/server/src/server2.rs
init_engines
函数主要用于创建 TIKV
各个重要模块的数据结构对象,其中:
node
对象代表当前 TIKV
的节点实例,每一个 TIKV
程序仅对应一个 node
对象
raft_engine
对象实际上是 RaftLogEngine
,Raft
接受其他节点或者自己节点产生的任何 raftlog
都需要通过这个对象写入 rocksdb
,是分布式系统恢复一致性的重要保证
tablet_registry
模块和 rocksdb
强相关, TIKV
Raft
Apply
后需要持久化的任何 KV
数据都需要通过 tablet_registry
来写入 rocksdb
router
对象我们在前一个文章中已经描述过,其内部包含了各个 Raft Region
MailBox
,专门用于路由 RPC
请求到 raftStore
模块
Node::try_bootstrap_store
src/server/raftkv2/node.rs
这个函数主要功能是生成唯一的 nodeId
,然后利用 create_store_batch_system
创建 raftStore
的各个对象。
pub fn try_bootstrap_store(
&mut self,
cfg: &raftstore_v2::Config,
raft_engine: &ER,
) -> Result<()> {
let store_id = Bootstrap::new(
raft_engine,
self.cluster_id,
&*self.pd_client,
self.logger.clone(),
)
.bootstrap_store()?;
self.store.set_id(store_id);
let (router, system) = raftstore_v2::create_store_batch_system(
cfg,
store_id,
self.logger.clone(),
self.resource_ctl.clone(),
);
self.system = Some((router, system));
Ok(())
}
复制代码
raftstore_v2::create_store_batch_system
components/raftstore-v2/src/batch/store.rs
StoreFsm
实际上就是 controlFsm
,不管有多少个 region
,controlFsm
只有一个,这里提前将 controlFsm
创建出来
StoreFsm::new
会返回两个结果,一个是 fsm
,一个是 channel
的发送端 tx
。channel 的接收端 rx
是 fsm
成员变量,tx
后续会绑定到 controlMailBox
内部
create_system
将会继续创建 raftStore
的其他组件对象
batch_system::create_system
components/batch-system/src/batch.rs
上一个步骤中,controlFsm
就是这里的 controller
参数,sender
是 controlFsm
channel
的发送端。因此 create_system
首先就利用 controlFsm
和 sender
来创建 controlMailBox
接下来,还需要创建 normal_scheduler
、 control_scheduler
与 poller
,这三个组件共用一套 channel
最后,把 controlMailBox
、normal_scheduler
、control_scheduler
放入 router
中去
使用 router
构建 BatchSystem
pool_state_builder
一般用于动态增加或者减少 poller
数量
进度
目前经过 init_engines
处理后,batchSystm
的当前进度为:
FSM
n * normalFsm
(未创建)
controlFsm
(已创建、已初始化)
MailBox
n * normalMailBox
(未创建)
controlMailBox
(已创建、已初始化)
scheduler
normalScheduler
(已创建、已初始化)
controlScheduler
(已创建、已初始化)
poller
(未创建、未初始化)
router
(已创建部分、未初始化)
初始化 BatchSystem
经过 init_engines
处理后,batchSystem
大概框架已经建立完毕,而且 control
组件基本初始化完成。接下来的流程中,batchSystem
将会着重创建并初始化剩余的组件。
简要逻辑
components/server/src/server2.rs::run_impl::init_servers
-- src/server/raftkv2/node.rs::NodeV2::start
---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start
: 启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入
: 构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm
: 使用 BatchSystem::spawn 方法构建 poller、StorePoller,启动多个异步任务 poller.poll
: 根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去
: 发送控制命令 `StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,对 committed entries 进行 Apply
------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to
----------- components/raftstore/src/store/async_io/write.rs::Worker::run
--------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db
: 创建 async_write 并且开启异步协程循环,通过 async_write channel 的 rx 接收 raft log,异步写入 rocksdb
: 向外部返回 async_write channel 的 tx
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new
------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init
: 创建各个 region 的 PeerFsm,也就是 normalFsm
: `PeerFsm` 内部会继续创建 `Peer`,比较关键的是创建了 `raftnode`,也就是创建了 `raft` 模块
------ components/batch-system/src/batch.rs::BatchSystem::spawn
---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build
: 构建 poll_ctx,特别地注意 schedulers.write(async_writeChannel的tx)
: 构建 Poller、StorePoller
---------- components/batch-system/src/batch.rs::Poller::poll
: 接收 fsm,然后交给 StorePoller 的 handle_normal / handle_control 对 fsm 进行处理
: 对于 raft 生成的 raftlog/msg,利用 schedulers.write(async_writeChannel的tx) 发送给 async_write 进行处理
: 对于 raft 生成的 committedEntries, 通过 peer 的 apply_scheduler(也就是 ApplyFsm 的 channel tx) 发送给 ApplyFsm 进行处理
--------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_control
-------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start
------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm
: 创建 ApplyFsm,异步运行 handle_all_tasks,对 committedEntries 进行 Apply
复制代码
关键代码路径
init_servers
components/server/src/server2.rs
主要作用调用 NodeV2::start
fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> {
...
let engines = self.engines.as_mut().unwrap();
...
self.node
.as_mut()
.unwrap()
.start(
engines.raft_engine.clone(),
self.tablet_registry.clone().unwrap(),
self.router.as_ref().unwrap(),
server.transport(),
...
)
...
server_config
}
复制代码
NodeV2::start
src/server/raftkv2/node.rs
主要作用调用 StoreSystem::start
pub fn start<T>(
&mut self,
raft_engine: ER,
registry: TabletRegistry<EK>,
router: &RaftRouter<EK, ER>,
...
) -> Result<()>
where
T: Transport + 'static,
{
let store_id = self.id();
if let Some(region) = Bootstrap::new(
...
)
.bootstrap_first_region(&self.store, store_id)?
{
...
registry.tablet_factory().open_tablet(ctx, &path).unwrap();
}
...
self.start_store(
raft_engine,
registry,
router,
...
)?;
Ok(())
}
fn start_store<T>(
&mut self,
raft_engine: ER,
registry: TabletRegistry<EK>,
router: &RaftRouter<EK, ER>,
...
) -> Result<()>
where
T: Transport + 'static,
{
...
let system = &mut self.system.as_mut().unwrap().1;
system.start(
store_id,
store_cfg,
raft_engine,
registry,
...
)?;
Ok(())
}
复制代码
StoreSystem::start
components/raftstore-v2/src/batch/store.rs
启动 async_write
,循环等待 multiRaft
生产的 raftlog
,然后通过 raftLogEngine
写入
构建 StorePollerBuilder
,并且通过初始化 StorePollerBuilder
构建 PeerFsm
使用BatchSystem::spawn
方法构建 poller
、StorePoller
,启动多个异步任务 poller.poll
,等待 normalScheduler
/controlScheduler
发来的 fsm
,然后交给 StorePoller
的 handle_normal
/ handle_control
对 fsm
进行处理
根据 StorePollerBuilder
构建 PeerFsm
来构建 mailboxes
,并将它们注册到 router
成员变量里面去
发送控制命令 StoreMsg::Start
,创建 ApplyFsm
,启动 Apply
的异步任务,持久化 committed entries
StorePollerBuilder::new
components/raftstore-v2/src/batch/store.rs
StorePollerBuilder::init
components/raftstore-v2/src/batch/store.rs
BatchSystem::spawn
components/batch-system/src/batch.rs
StorePollerBuilder::build
components/raftstore-v2/src/batch/store.rs
构建 StorePoller
,其 handle_normal
/ handle_control
方法会对 fsm
进行解析,获取其 msg
,递交给 raft
模块进行进一步处理
对于 raft
模块生成的 raftlog
,会使用 schedulers.write
通过 async_write/raftLogEngine
将日志存储到 rocksdb
进行持久化
对于 raft
模块生成的 apply entries
,会通过 ApplyFsm
使用 tablet_registry
存储到 rocksdb
Poller::poll
poll
函数主要是循环等待 channel
的接收端 fsm_receiver
发送 fsm
,然后使用 handler
,也就是 StorePoller
进行处理。
components/batch-system/src/batch.rs
pub fn poll(&mut self) {
...
let mut run = true;
while run && self.fetch_fsm(&mut batch) {
...
if batch.control.is_some() {
let len = self.handler.handle_control(batch.control.as_mut().unwrap());
...
}
for (i, p) in batch.normals.iter_mut().enumerate() {
...
let res = self.handler.handle_normal(p);
...
}
let mut fsm_cnt = batch.normals.len();
while batch.normals.len() < max_batch_size {
if let Ok(fsm) = self.fsm_receiver.try_recv() {
run = batch.push(fsm);
}
if !run || fsm_cnt >= batch.normals.len() {
break;
}
let p = batch.normals[fsm_cnt].as_mut().unwrap();
let res = self.handler.handle_normal(p);
}
...
}
...
}
fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool {
if batch.control.is_some() {
return true;
}
if let Ok(fsm) = self.fsm_receiver.try_recv() {
return batch.push(fsm);
}
if batch.is_empty() {
self.handler.pause();
if let Ok(fsm) = self.fsm_receiver.recv() {
return batch.push(fsm);
}
}
!batch.is_empty()
}
复制代码
StorePoller::handle_control
前面 StoreSystem::start
发出的 StoreMsg::Start
消息会触发 StorePoller::handle_control
,进而触发 ApplyFsm
的构建:
components/raftstore-v2/src/batch/store.rs
fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> {
...
let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx);
delegate.handle_msgs(&mut self.store_msg_buf);
...
}
components/raftstore-v2/src/fsm/peer.rs:
pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>)
where
T: Transport,
{
for msg in store_msg_buf.drain(..) {
match msg {
StoreMsg::Start => self.on_start(),
StoreMsg::Tick(tick) => self.on_tick(tick),
...
}
}
}
fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) {
...
if self.fsm.peer.storage().is_initialized() {
self.fsm.peer.schedule_apply_fsm(self.store_ctx);
}
...
}
复制代码
Peer::schedule_apply_fsm
components/raftstore-v2/src/operation/command/mod.rs
pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) {
...
let (apply_scheduler, mut apply_fsm) = ApplyFsm::new(
&store_ctx.cfg,
self.peer().clone(),
region_state,
mailbox,
...
);
store_ctx
.apply_pool
.spawn(async move { apply_fsm.handle_all_tasks().await })
.unwrap();
self.set_apply_scheduler(apply_scheduler);
}
复制代码
ApplyFsm::handle_all_tasks
Apply
流程异步处理任务,用于处理 raft
生成的 CommittedEntries
,对其进行 Apply
components/raftstore-v2/src/fsm/apply.rs
impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {
pub async fn handle_all_tasks(&mut self) {
loop {
...
let res = futures::select! {
res = self.receiver.next().fuse() => res,
...
};
...
loop {
match task {
// TODO: flush by buffer size.
ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,
...
}
...
}
}
}
}
复制代码
StoreWriters::spawn
这个异步任务专门处理 raft
生成的 raft log
与 raft msg
对于 raft log
,使用 raft_engine
写入到 rocksdb
对于 raft msg
,使用 trans
发送给其他 tikv
实例
components/raftstore/src/store/async_io/write.rs
impl<EK, ER> StoreWriters<EK, ER>
where
EK: KvEngine,
ER: RaftEngine,
{
pub fn senders(&self) -> WriteSenders<EK, ER> {
WriteSenders::new(self.writers.clone())
}
pub fn spawn<T: Transport + 'static, N: PersistedNotifier>(
&mut self,
store_id: u64,
raft_engine: ER,
kv_engine: Option<EK>,
notifier: &N,
trans: &T,
cfg: &Arc<VersionTrack<Config>>,
) -> Result<()> {
let pool_size = cfg.value().store_io_pool_size;
if pool_size > 0 {
self.increase_to(
pool_size,
StoreWritersContext {
store_id,
notifier: notifier.clone(),
raft_engine,
kv_engine,
transfer: trans.clone(),
cfg: cfg.clone(),
},
)?;
}
Ok(())
}
pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>(
&mut self,
size: usize,
writer_meta: StoreWritersContext<EK, ER, T, N>,
) -> Result<()> {
...
self.writers
.update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> {
let mut cached_senders = writers.get();
for i in current_size..size {
let (tx, rx) = bounded(
writer_meta.cfg.value().store_io_notify_capacity,
);
let mut worker = Worker::new(
...
rx,
...
);
let t =
thread::Builder::new()
.name(thd_name!(tag))
.spawn_wrapper(move || {
set_io_type(IoType::ForegroundWrite);
worker.run();
})?;
cached_senders.push(tx);
handlers.push(t);
}
writers.set(cached_senders);
Ok(())
})?;
Ok(())
}
}
impl<EK, ER, N, T> Worker<EK, ER, N, T>
where
EK: KvEngine,
ER: RaftEngine,
N: PersistedNotifier,
T: Transport,
{
fn run(&mut self) {
let mut stopped = false;
while !stopped {
let handle_begin = match self.receiver.recv() {
Ok(msg) => {
stopped |= self.handle_msg(msg);
}
...
};
...
self.write_to_db(true);
}
}
pub fn write_to_db(&mut self, notify: bool) {
...
let mut write_raft_time = 0f64;
if !self.batch.raft_wbs[0].is_empty() {
...
for i in 0..self.batch.raft_wbs.len() {
self.raft_engine
.consume_and_shrink(
&mut self.batch.raft_wbs[i],
...
)
...
}
self.batch.raft_wbs.truncate(1);
...
}
for task in &mut self.batch.tasks {
for msg in task.messages.drain(..) {
...
if let Err(e) = self.trans.send(msg) {
...
}...
}
}
...
}
}
复制代码
总结
BatchSystem
的整体代码充斥着各种异步任务,还有各种 channel
的发送和接收。如果没有对其初始化比较了解的话,可能很难看得懂 msg
是如何在整个系统中流转的。
经过本文的流程梳理,相信大家已经对整体 BatchSystem
有了比较熟悉的认知,后续研究 Raft Msg
流转,或者 Region
的 Merge
与 Split
应该比较有信心了。
评论