写点什么

TiKV 源码阅读三部曲(二)读流程

  • 2022-11-11
    北京
  • 本文字数:8643 字

    阅读完需:约 28 分钟

原文来源:https://tidb.net/blog/938aaf7b

作者简介:

谭新宇,清华大学软件学院研三在读,Apache IoTDB committer,Talent Plan Community mentor。


TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。


作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。


笔者将结合 TiKV 源码解析系列文章,基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:



希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。本文为第二篇博客,将主要介绍 TiKV 中一条读请求的全链路流程。

读流程

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析 介绍了 TiKV 3.x 版本的 ReadIndex/LeaseRead 实现方案。


本小节将在 TiKV 6.1 版本的源码基础上,以一条读请求为例,介绍当前版本读请求的全链路执行流程。


前文已经提到,可以从 kvproto 对应的 service TiKV 中了解当前 TiKV 支持的 RPC 接口。


经过简单整理,常用的读接口如下:


// Key/value store API for TiKV.service Tikv { 
rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {} rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse) {} rpc KvBatchGet(kvrpcpb.BatchGetRequest) returns (kvrpcpb.BatchGetResponse) {}
rpc RawGet(kvrpcpb.RawGetRequest) returns (kvrpcpb.RawGetResponse) {} rpc RawBatchGet(kvrpcpb.RawBatchGetRequest) returns (kvrpcpb.RawBatchGetResponse) {} rpc RawScan(kvrpcpb.RawScanRequest) returns (kvrpcpb.RawScanResponse) {} rpc RawBatchScan(kvrpcpb.RawBatchScanRequest) returns (kvrpcpb.RawBatchScanResponse) {}
...}
复制代码


以下将以最常用的 KvGet 接口为例介绍读流程,其他的读接口所经过的模块大致相似,之后也可以用断点调试的方案去自行阅读。

KVService

在 KVService 中, handle_request 宏将业务逻辑封装到了 future_get 函数中。在 future_get 函数中,主要使用了storage.get(req.take_context()Key::from_raw(req.get_key()), req.get_version().into())函数将请求路由到 Storage 模块去执行。


为了可观测性,当前 TiKV 在读写关键路径上加了很多全局和 request 级别的 metric,这一定程度上影响了刚开始阅读代码的体验。其实刚开始熟悉代码时只需要关注核心逻辑即可,metric 相关的代码可以先不用细究。


impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv    for Service<T, E, L, F>{    handle_request!(kv_get, future_get, GetRequest, GetResponse, has_time_detail);}
fn future_get<E: Engine, L: LockManager, F: KvFormat>( storage: &Storage<E, L, F>, mut req: GetRequest,) -> impl Future<Output = ServerResult<GetResponse>> {
...
let v = storage.get( req.take_context(), Key::from_raw(req.get_key()), req.get_version().into(), );
async move { let v = v.await; ... Ok(resp) }}
复制代码

Storage

在 Storage 模块的 get 函数中,所有的 task 都会被 spawn 到 readPool 中执行,具体执行的任务主要包含以下两个工作:


  • 使用Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?获取 snapshot

  • 使用snap_store.get(&key, &mut statistics)基于获取到的 snapshot 获取符合对应事务语义的数据


第二个工作比较简单,本小节不再赘述,以下主要介绍第一个工作的具体代码流程。


/// Get value of the given key from a snapshot.////// Only writes that are committed before `start_ts` are visible.pub fn get(    &self,    mut ctx: Context,    key: Key,    start_ts: TimeStamp,) -> impl Future<Output = Result<(Option<Value>, KvGetStatistics)>> {
...
let res = self.read_pool.spawn_handle( async move {
...
let snap_ctx = prepare_snap_ctx( &ctx, iter::once(&key), start_ts, &bypass_locks, &concurrency_manager, CMD, )?; let snapshot = Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?;
{ let begin_instant = Instant::now(); let stage_snap_recv_ts = begin_instant; let buckets = snapshot.ext().get_buckets(); let mut statistics = Statistics::default(); let result = Self::with_perf_context(CMD, || { let _guard = sample.observe_cpu(); let snap_store = SnapshotStore::new( snapshot, start_ts, ctx.get_isolation_level(), !ctx.get_not_fill_cache(), bypass_locks, access_locks, false, ); snap_store .get(&key, &mut statistics) // map storage::txn::Error -> storage::Error .map_err(Error::from) .map(|r| { KV_COMMAND_KEYREAD_HISTOGRAM_STATIC.get(CMD).observe(1_f64); r }) }); ... Ok(( result?, KvGetStatistics { stats: statistics, latency_stats, }, )) } } .in_resource_metering_tag(resource_tag), priority, thread_rng().next_u64(), ); async move { res.map_err(|_| Error::from(ErrorInner::SchedTooBusy)) .await? }}
复制代码


对于Self::snapshot(engine, snap_ctx)函数,其会经由storage::snapshot -> kv::snapshot -> raftkv::async_snapshot -> raftkv::exec_snapshot的调用链来到ServerRaftStoreRouter::read函数中。


/// Get a snapshot of `engine`.fn snapshot(    engine: &E,    ctx: SnapContext<'_>,) -> impl std::future::Future<Output = Result<E::Snap>> {    kv::snapshot(engine, ctx)        .map_err(txn::Error::from)        .map_err(Error::from)}
/// Get a snapshot of `engine`.pub fn snapshot<E: Engine>( engine: &E, ctx: SnapContext<'_>,) -> impl std::future::Future<Output = Result<E::Snap>> { let begin = Instant::now(); let (callback, future) = tikv_util::future::paired_must_called_future_callback(drop_snapshot_callback::<E>); let val = engine.async_snapshot(ctx, callback); // make engine not cross yield point async move { val?; // propagate error let result = future .map_err(|cancel| Error::from(ErrorInner::Other(box_err!(cancel)))) .await?; with_tls_tracker(|tracker| { tracker.metrics.get_snapshot_nanos += begin.elapsed().as_nanos() as u64; }); fail_point!("after-snapshot"); result }}
fn async_snapshot(&self, mut ctx: SnapContext<'_>, cb: Callback<Self::Snap>) -> kv::Result<()> { ...
self.exec_snapshot( ctx, req, Box::new(move |res| match res { ... }), ) .map_err(|e| { let status_kind = get_status_kind_from_error(&e); ASYNC_REQUESTS_COUNTER_VEC.snapshot.get(status_kind).inc(); e.into() })}
fn exec_snapshot( &self, ctx: SnapContext<'_>, req: Request, cb: Callback<CmdRes<E::Snapshot>>,) -> Result<()> {
... let mut cmd = RaftCmdRequest::default(); cmd.set_header(header); cmd.set_requests(vec![req].into()); self.router .read( ctx.read_id, cmd, StoreCallback::read(Box::new(move |resp| { cb(on_read_result(resp).map_err(Error::into)); })), ) .map_err(From::from)}
impl<EK: KvEngine, ER: RaftEngine> LocalReadRouter<EK> for ServerRaftStoreRouter<EK, ER> { fn read( &self, read_id: Option<ThreadReadId>, req: RaftCmdRequest, cb: Callback<EK::Snapshot>, ) -> RaftStoreResult<()> { let mut local_reader = self.local_reader.borrow_mut(); local_reader.read(read_id, req, cb); Ok(()) }}

复制代码


在 ServerRaftStoreRouter::read函数中,其会调用local_readerread函数,并进而路由到LocalReader::propose_raft_command函数。在该函数中,会使用LocalReader::pre_propose_raft_command 函数来判断是否能够 ReadLocal,如果可以则直接获取本地引擎的 snapshot 并执行 callback 返回即可,否则便调用redirect函数连带 callback 路由到 RaftBatchSystem 的对应 normal 状态机中去执行 ReadIndex 读,之后本线程不再处理该任务。


#[inline]pub fn read(    &mut self,    read_id: Option<ThreadReadId>,    req: RaftCmdRequest,    cb: Callback<E::Snapshot>,) {    self.propose_raft_command(read_id, req, cb);    maybe_tls_local_read_metrics_flush();}
pub fn propose_raft_command( &mut self, mut read_id: Option<ThreadReadId>, req: RaftCmdRequest, cb: Callback<E::Snapshot>,) { match self.pre_propose_raft_command(&req) { Ok(Some((mut delegate, policy))) => { let delegate_ext: LocalReadContext<'_, E>; let mut response = match policy { // Leader can read local if and only if it is in lease. RequestPolicy::ReadLocal => { ...
let region = Arc::clone(&delegate.region); let response = delegate.execute(&req, &region, None, read_id, Some(delegate_ext)); // Try renew lease in advance delegate.maybe_renew_lease_advance(&self.router, snapshot_ts); response } // Replica can serve stale read if and only if its `safe_ts` >= `read_ts` RequestPolicy::StaleRead => { ...
let region = Arc::clone(&delegate.region); // Getting the snapshot let response = delegate.execute(&req, &region, None, read_id, Some(delegate_ext));
... } _ => unreachable!(), }; ... cb.invoke_read(response); } // Forward to raftstore. Ok(None) => self.redirect(RaftCommand::new(req, cb)), Err(e) => { let mut response = cmd_resp::new_error(e); if let Some(delegate) = self.delegates.get(&req.get_header().get_region_id()) { cmd_resp::bind_term(&mut response, delegate.term); } cb.invoke_read(ReadResponse { response, snapshot: None, txn_extra_op: TxnExtraOp::Noop, }); } }}
复制代码


需要注意的是,在此处能否 ReadLocal 的判断是可以并行的,也就是乐观情况下并行的读请求可以并行获取底层引擎的 snapshot,不需要经过 RaftBatchSystem 。


那么到底什么时候可以直接读取 snapshot 而不需要经过 RaftStore 走一轮 ReadIndex 来处理呢?原理就是 Lease 机制,可以先简单阅读一下 TiKV Lease Read 的功能介绍


接着让我们回到LocalReader::pre_propose_raft_command函数,其会进行一系列的检查(此处已略去),如果皆通过则会进一步调用inspector.inspect(req)函数,在其内部,其会进行一系列的判断并返回是否可以 ReadLocal。


  • req.get_header().get_read_quorum():如果该请求明确要求需要用 read index 方式处理,所以返回 ReadIndex。

  • self.has_applied_to_current_term():如果该 leader 尚未 apply 到它自己的 term,则使用 ReadIndex 处理,这是 Raft 有关线性一致性读的一个 corner case。

  • self.inspect_lease():如果该 leader 的 lease 已经过期或者不确定,说明可能出现了一些问题,比如网络不稳定,心跳没成功等,此时使用 ReadIndex 处理,否则便可以使用 ReadLocal 处理。


乐观情况下的 ReadLocal 流程我们已经了解,接下来让我们看看 ReadIndex 在 RaftStore 中的执行路径。

RaftStore

前文已经介绍过 RaftBatchSystem 的大体框架,我们已知会有多个 PollHandler 线程调用 poll 函数进入长期循环来事件驱动并动态均衡地管理所有 normal 状态机。


当 ReadIndex 请求被路由到 RaftBatchSystem 中的对应 normal 状态机后,某个 PollHandler 会在接下来的一次 loop 中处理该状态机的消息。


直接定位到RaftPollerhandle_normal函数。可以看到,其会首先尝试获取messages_per_tick次路由到该状态机的消息,接着调用PeerFsmDelegate::handle_msgs函数进行处理,


这里只列出了我们需要关注的几种消息类型:


  • RaftMessage: 其他 Peer 发送过来 Raft 消息,包括心跳、日志、投票消息等。

  • RaftCommand: 上层提出的 proposal,其中包含了需要通过 Raft 同步的操作,以及操作成功之后需要调用的 callback 函数。ReadIndex 请求便是一种特殊的 proposal。

  • ApplyRes: ApplyFsm 在将日志应用到状态机之后发送给 PeerFsm 的消息,用于在进行操作之后更新某些内存状态。


对于 ReadIndex 请求,其会进入PeerMsg::RaftCommand(cmd)分支,进而以PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal的调用链走到store::propose函数中,在该函数中,会再进行一次self.inspect(),如果此时 Leader 的 lease 已经稳定,则会调用read_local函数直接获取引擎的 snapshot 并执行 callback 返回,否则调用read_index函数执行 ReadIndex 流程。


在 read_index 函数中,ReadIndex 请求连带 callback 会被构建成一个 ReadIndexRequest 被 push 到 pending_reads 即一个 ReadIndexQueue 中,之后当前线程即可结束本轮流程,之后的事件会进而触发该 ReadIndexRequest 的执行。


pub fn propose<T: Transport>(    &mut self,    ctx: &mut PollContext<EK, ER, T>,    mut cb: Callback<EK::Snapshot>,    req: RaftCmdRequest,    mut err_resp: RaftCmdResponse,    mut disk_full_opt: DiskFullOpt,) -> bool {
...
let policy = self.inspect(&req); let res = match policy { Ok(RequestPolicy::ReadLocal) | Ok(RequestPolicy::StaleRead) => { self.read_local(ctx, req, cb); return false; } Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb), Ok(RequestPolicy::ProposeTransferLeader) => { return self.propose_transfer_leader(ctx, req, cb); } Ok(RequestPolicy::ProposeNormal) => { // For admin cmds, only region split/merge comes here. if req.has_admin_request() { disk_full_opt = DiskFullOpt::AllowedOnAlmostFull; } self.check_normal_proposal_with_disk_full_opt(ctx, disk_full_opt) .and_then(|_| self.propose_normal(ctx, req)) } Ok(RequestPolicy::ProposeConfChange) => self.propose_conf_change(ctx, &req), Err(e) => Err(e), }; fail_point!("after_propose");
...}
fn read_index<T: Transport>( &mut self, poll_ctx: &mut PollContext<EK, ER, T>, mut req: RaftCmdRequest, mut err_resp: RaftCmdResponse, cb: Callback<EK::Snapshot>,) -> bool {
...
let mut read = ReadIndexRequest::with_command(id, req, cb, now); read.addition_request = request.map(Box::new); self.push_pending_read(read, self.is_leader()); self.should_wake_up = true;
...
true}
复制代码


那么什么条件满足后该 ReadIndexRequest 会被 pop 出队列并执行呢?


前面已经提到 ApplyBatchSystem 在应用一批日志之后首先会调用对应的 callback 尽快回复客户端,之后会发送一条 ApplyRes 的消息到 RaftBatchSystem,该消息和以上的 ReadIndex 请求一样被 PollHandler 在一次 loop 中被处理,并最终进入 PeerFsmDelegate::handle_msgs函数的PeerMsg::ApplyRes { res }分支,接着其会调用PeerFsmDelegate::on_apply_res函数并进入store::peer::post_apply函数,在该函数中,ApplyRes 中携带的信息会被用来更新一些内存状态例如raft_groupcmd_epoch_checker,当然,这些信息也会通过store::peer::post_pending_read_index_on_replicaself.pending_reads.pop_front()来释放某些满足条件的 ReadIndexRequest,对于每个 ReadIndexRequest ,此时可以通过store::peer::response_read 函数来获取底层引擎的 Snapshot 并执行 callback 返回。


fn on_apply_res(&mut self, res: ApplyTaskRes<EK::Snapshot>) {    fail_point!("on_apply_res", |_| {});    match res {        ApplyTaskRes::Apply(mut res) => {                        ...
self.fsm.has_ready |= self.fsm.peer.post_apply( self.ctx, res.apply_state, res.applied_term, &res.metrics, ); ... } ApplyTaskRes::Destroy { region_id, peer_id, merge_from_snapshot, } => { ... } }}
pub fn post_apply<T>( &mut self, ctx: &mut PollContext<EK, ER, T>, apply_state: RaftApplyState, applied_term: u64, apply_metrics: &ApplyMetrics,) -> bool { let mut has_ready = false;
if self.is_handling_snapshot() { panic!("{} should not applying snapshot.", self.tag); }
let applied_index = apply_state.get_applied_index(); self.raft_group.advance_apply_to(applied_index);
self.cmd_epoch_checker.advance_apply( applied_index, self.term(), self.raft_group.store().region(), );
...
if !self.is_leader() { self.post_pending_read_index_on_replica(ctx) } else if self.ready_to_handle_read() { while let Some(mut read) = self.pending_reads.pop_front() { self.response_read(&mut read, ctx, false); } } self.pending_reads.gc();
...
has_ready}
复制代码


综上,ReadIndexRequest 入队和出队的时机已经被介绍,那么 ReadIndex 的整体流程也基本介绍完整了。


通过本小节,希望您能够了解 KVGet 读请求的完整流程,并进而具备分析其他读请求全链路的能力。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TiKV 源码阅读三部曲(二)读流程_TiDB 社区干货传送门_InfoQ写作社区