写点什么

TiKV 源码分析之 PointGet

  • 2024-06-13
    广东
  • 本文字数:9484 字

    阅读完需:约 31 分钟

作者:来自 vivo 互联网存储研发团队-Guo Xiang


本文介绍了 TiDB 中最基本的 PointGet 算子在存储层 TiKV 中的执行流程。

一、背景介绍

TiDB 是一款具有 HTAP 能力(同时支持在线事务处理与在线分析处理 )的融合型分布式数据库产品,具备水平扩容或者缩容等重要特性。TiDB 采用多副本+Multi-Raft 算法的方式将数据调度到不同的机器节点上,具备较高的可靠性和容灾能力。TiDB 中的存储层 TiKV 组件,能够独立于 TiDB 作为一款分布式 KV 数据库使用,目前已经捐赠给 CNCF 并于 2020 年正式毕业。目前 vivo 公司内部的磁盘 KV 产品采用了开源的 TiKV 作为存储层实现, 目前已经在公司的不同业务产品中有深度实践。


TiKV 作为一款 KV 数据库产品,同时提供了 RawAPI 和 TxnAPI 两套接口:

  • RawAPI 仅支持最基本的针对单 Key 操作的 Set/Get/Del 及 Scan 语义

  • TxnAPI 提供了基于 ACID 事务标准的接口,支持多 Key 写入的原子性


TxnAPI 采用了分布式事务来保证多 Key 写入的原子性,其适用的业务场景与 RawAPI 相比来说更为广泛。本文后续内容将重点对 PointGet 在 TiKV 侧的执行流程进行分析,其内容涉及到 storage 和 txn 模块。阅读本文后,读者将会深入了解 TiKV 源码中 Get 流程的实现细节,包括如何处理读请求、如何进行数据定位和读取、如何实现事务隔离级别等方面,并且能够更好地理解 TiKV 的内部工作原理和性能优化。

二、PointGet 介绍

2.1 TiDB 视角中的 PointGet

PointGet 顾名思义即"点查", 它是 TiDB 中最为基本的几种算子之一,以下列举了两个常见的 PointGet 算子的使用场景:


  • 根据主键 Id 查询

MySQL [test]> explain select * from user where id = 1024;+-------------+---------+------+-------------------------------+---------------+| id          | estRows | task | access object                 | operator info |+-------------+---------+------+-------------------------------+---------------+| Point_Get_1 | 1.00    | root | table:user, index:PRIMARY(id) |               |+-------------+---------+------+-------------------------------+---------------+
复制代码
  • 根据唯一索引查询

MySQL [test]> explain select * from users where name = "test";+-------------+---------+------+-------------------------------+---------------+| id          | estRows | task | access object                 | operator info |+-------------+---------+------+-------------------------------+---------------+| Point_Get_1 | 1.00    | root | table:users, index:name(name) |               |+-------------+---------+------+-------------------------------+---------------+
复制代码

2.2 纯 KV 用户视角中的 PointGet

部分业务没有完整地使用 TiDB 组件,而是使用官方提供的 client-go/client-rust 直接访问 PD 和 TiKV。

func testGet(k []byte) (error) {    txn, err := client.Begin()    if err != nil {        return err    }    v, err := txn.Get(context.TODO(), k)    if err != nil {        return err    }    fmt.Printf("value of key is: %+v", v)    return nil}
复制代码

三、PointGet 在 TiDB 中的实现

TiDB 层为计算层,其主要职能为 MySQL 协议的实现以及 SQL 优化器和执行器的构建。客户端发起的所有 SQL, 都会经过以下生命周期流程:

  1. Lexer/Parser 解析后得到 AST,并转换为执行计划。

  2. 执行计划经过 RBO/CBO 后得到优化过后的执行计划。

  3. 基于执行计划构建执行器,其本质是不同的算子"套娃",整体构成一个树型结构。


TiDB 的执行器基于"火山模型"构建,不同的操作算子具有不同的 Executor 实现:

type Executor interface {       base() *baseExecutor    Open(context.Context) error    Next(ctx context.Context, req *chunk.Chunk) error    Close() error    Schema() *expression.Schema}
复制代码


Executor 中最为核心的是三个函数分别是 Open/Next/Close,分别对应算子的初始化、迭代以及收尾逻辑。本文涉及的 PointGet 算子由 PointGetExector 实现,其核心的查询逻辑位于 PointGetExector::Next()函数中。由于相关逻辑耦合了悲观事务,以及 tikv/client-go 中部分 Percolator 的实现,且不属于本文重点分析的主要内容,这里不展开描述,感兴趣的读者可以自行阅读。

四、PointGet 在 TiKV 中的实现

4.1 PointGet 接口定义

TiKV 和 TiDB 使用 gRPC 进行通信,其接口契约定义采用了 protobuf,我们可以在 pingcap/kvproto 项目中找到与 PointGet 相关的接口定义 KvGet 如下:

// Key/value store API for TiKV.service Tikv {    // Commands using a transactional interface.    rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {}    // ... other api definations ...}
复制代码


其中入参 GetRequest 定义如下代码片段,我们可以看到,TiKV 的点查接口除了 key 之外,还额外需要一个名为 version 的参数,即当前事务的 start_ts(事务开始时间戳),这个时间戳是由 TiDB 在启动事务时从 Pd 组件申请而来。与很多数据库类似,TiKV 也采用了 MVCC 机制,即同一个 key 在底层的存储中在不同时刻拥有不同的值,因此要想进行查询,除了 key 之外,还需要带上版本。

// A transactional get command. Lookup a value for `key` in the transaction with// starting timestamp = `version`.message GetRequest {    Context context = 1;    bytes key = 2;    uint64 version = 3;}
复制代码

4.2 TiKV 侧调用堆栈

TiKV 作为 gRPC 的 Server 端,提供了 KvGet 接口的实现,相关调用堆栈为:

+TiKV::kv_get (grpc-poll-thread) +future_get  +Storage::get   +Storage::snapshot (readpool-thread)   +SnapshotStore::get     +PointGetterBuilder::build     +PointGetter::get
复制代码


在一次 KvGet 调用中,函数执行流程会在 grpc-poll-thread 和 readpool-thread 中切换,其中前者为 gRPC 的 poll thread,请求在被路由到 Storage 层后,会根据读写属性路由到不同的线程池中,只读语义的 Get/Scan 请求都会被路由到 ReadPool 中执行,这是一个特定用于处理只读请求的线程池。

4.3 Read through locks 介绍

在分析后续逻辑之前,我们需要对 Read through locks 机制先做个简单介绍。TiKV 使用 Percoaltor 模型来实现分布式事务,同时也引入了 MVCC 机制。然而其实现和传统的 MVCC 实现略有差异:TiKV 的读取过程中若遇到其他事务提交时写入的 Lock, 则需要等待或者尝试解锁,这会阻塞读取直到事务状态确定,一定程度上会损失并发性能。


然而在一些场景(如 SecondaryLocks),在 Key 对应的锁仍然存在的情况下,我们已经知道相关事务的最终状态(提交或回滚)。如果我们将这些事务的最终状态与查询请求一起发送给 TiKV, 那么 TiKV 可以根据这些事务状态来确定能否在有 Lock 的情况下安全读取,避免不必要的等待, 即本小节提到的 Read through lock 机制。


Context 是所有的 TiKV 请求都会携带的上下文信息,为了实现 Read through lock, 

https://github.com/pingcap/kvproto/pull/833 这个 PR 在 Context 中添加了如下字段:

message Context {    // Read requests can ignore locks belonging to these transactions because either    // these transactions are rolled back or theirs commit_ts > read request's start_ts.    repeated uint64 resolved_locks = 13;      // Read request should read through locks belonging to these transactions because these    // transactions are committed and theirs commit_ts <= read request's start_ts.    repeated uint64 committed_locks = 22;}
复制代码


其中 resolved_locks 用于记录读取时可以忽略的锁,这些锁对应的事务可能已被回滚,或者已成功提交但 CommitTS 大于当前的读 StartTS,直接忽略这些锁也不影响快照一致性。


其中 committed_locks 则用于记录逻辑上已被正确提交但物理上 Lock 还未被清理的、且 CommitTS 小于当前读取使用的 StartTS 的事务。由于事务本质上已经被提交,因此读取时可以不需要返回等待,只需要通过 Lock 查询 DefaultCF 中的数据即可。


通过 Read through lock 机制,TiKV 可以在一些 Lock 尚未被清理的情况下直接返回正确的结果,避免了客户端层面的 Wait 和 ResolveLock,其具体实现在后续小节会涉及到。

4.4 Storage::get 流程分析

下方代码块是经过精简过后的伪代码,主要标注了 get 流程中一些比较关键的步骤。

pub fn get(&self, mut ctx: Context, key: Key, start_ts: TimeStamp) -> impl Future<Output = ... >> {  self.read_pool.spawn_handle(async move {         // 1. 创建创建快照需要的上下文     let snap_ctx = prepare_snap_ctx(...);       // 2. 申请一个快照     let snapshot = Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?;            // 3. 创建SnapshotStore对象并执行查询     let snap_store = SnapshotStore::new(...);     let result = snap_store.get(key);       // 4. 更新Metrics和Stats统计信息  });}
复制代码

4.4.1 准备快照上下文

prepare_snap_ctx 顾名思义即准备用于创建快照所需要的上下文对象,即 SnapContext 对象,其完整定义如下:

pub struct SnapContext<'a> {    pub pb_ctx: &'a Context,    pub read_id: Option<ThreadReadId>,    // When start_ts is None and `stale_read` is true, it means acquire a snapshot without any    // consistency guarantee.    pub start_ts: Option<TimeStamp>,    // `key_ranges` is used in replica read. It will send to    // the leader via raft "read index" to check memory locks.    pub key_ranges: Vec<KeyRange>,    // Marks that this snapshot request is allowed in the flashback state.    pub allowed_in_flashback: bool,} fn prepare_snap_ctx<'a>(...) -> Result<SnapContext<'a>> {    if !pb_ctx.get_stale_read() {        concurrency_manager.update_max_ts(start_ts);    }    if need_check_locks(isolation_level) {       concurrency_manager.read_key_check(...)    }    let mut snap_ctx = SnapContext {...};    if need_check_locks_in_replica_read(pb_ctx) {       snap_ctx.key_ranges = ...    }}
复制代码


prepare_snap_ctx 只需要创建一个 SnapContext 对象,但目前实现中多出了如下判断或操作,绝大部分都源于 TiKV5.0 中的 AsyncCommit 特性所需。


1.当本次读取非 StaleRead 时,需要将当前读取请求的 start_ts 与 CurrencyManager 中的 max_ts 进行比较,并将二者中的最大值更新为全局 max_ts。这一操作用于保证异步提交事务计算出来的 MinCommitTs 不会破坏快照一致性。


2. 若当前的隔离级别是 SnapshotIsolation 或者 RcCheckTs 时, 则需要额外检查 CurrencyManager 中的内存锁。如果存在锁且当前 start_ts 大于锁中的 MinCommitTs,TiKV 会直接拒绝本次读取请求。其原因在于 AsyncCommit 事务 Prewrite 结束之前需要暂时阻止使用更新的 start_ts 发起的快照读,否则会导致正在异步提交的事务计算出的 MinCommitTS 无法满足快照一致性。

4.4.2 向 Engine 申请 Snapshot

Engine 是 TiKV 中对上层存储组件的一次抽象,所有实现了 Engine Trait 的具体实现都可以作为 TiKV 中的存储层组件。目前 TiKV 中已经实现了 BTreeEngine/MockEngine/RocksEngine/RaftKV 等多个实现。

pub trait Engine: Send + Clone + 'static {   // 获取用于查询的快照   fn async_snapshot(&mut self, ctx: SnapContext<'_>) -> Self::SnapshotRes;        // 提交写入的Mutation   fn async_write(&self,ctx: &Context,batch: WriteData,subscribed: u8, on_applied: Option<OnAppliedCb>) -> Self::WriteRes;    // 其他接口...}
复制代码


Engine 的接口定义中与读写相关的接口分别是 async_snapshot 和 async_write。目前 TiKV 中的默认 Engine 实现为 RaftKV,即一个基于 Raftstore 的实现。在 RaftKV 中,所有的写入都会通过 Raft 状态机进行 propose/commit/apply 流程,用户可以基于订阅机制获得这 3 个事件的通知从而做出不同处理,默认情况下,TiKV 会在一次写入请求被 RaftLeader apply 成功后返回用户。而读取操作则需要遵循先行一致性读取,在早期版本中,一次读取需要通过 Raft 状态机进行一次 ReadIndex 才能进行,在新版中 TiKV 实现了基于租约的 LeaseRead, 简化了读取流程。本次介绍的 PointGet 读取流程中,会涉及到使用 async_snapshot 获取一个 Engine 在当前时刻的快照,并基于快照进行读取。


TiKV 按照 KeyRange 将 Key 拆分为不同的 Region, 每个 Region 都是一个 RaftGroup,且拥有独立的状态机推进运转。因此,RaftKV-Engine 中 async_snapshot 返回的是一个名为 RegionSnapshot 的对象,其定义如下:

pub struct RegionSnapshot<S: Snapshot> {    snap: Arc<S>,    region: Arc<Region>,    apply_index: Arc<AtomicU64>,    pub term: Option<NonZeroU64>,    pub txn_extra_op: TxnExtraOp,    // `None` means the snapshot does not provide peer related transaction extensions.    pub txn_ext: Option<Arc<TxnExt>>,    pub bucket_meta: Option<Arc<BucketMeta>>,}
复制代码


RegionSnapshot 本质是对底层的 KV 引擎 RocksDB 层面的快照的封装,其逻辑视图如下:


4.4.3 MVCC 实现和快照隔离级别实现

前文提到的 Engine::async_snapshot 接口返回的快照本质是 Engine 在当下时刻的快照,并不等于事务层面的 MVCC 快照,因此在具体查询时,需要配合 StartTS 进行使用。TiKV 中封装了一个 SnapshotStore 用于辅助 MVCC 层面的查询。其定义如下:

pub struct SnapshotStore<S: Snapshot> {    snapshot: S,    start_ts: TimeStamp,    isolation_level: IsolationLevel,    fill_cache: bool,    bypass_locks: TsSet,    access_locks: TsSet,    check_has_newer_ts_data: bool,    point_getter_cache: Option<PointGetter<S>>,}
复制代码


SnapshotStore 中集合了从 Engine 获取的快照和客户端请求附带的 StartTS, 因此可以被认为是一个 MVCC 层面的快照。用户对 SnapshotStore 发起的点查会被委托给内部的 PointGetter。

// PointGetter::getpub fn get(&mut self, user_key: &Key) -> Result<Option<Value>> {        fail_point!("point_getter_get");                  // 根据当前请求使用的隔离级别判定是否需要检查锁        if need_check_locks(self.isolation_level) {            // 如果需要检查锁且锁存在,则需要根据判定锁            if let Some(lock) = self.load_and_check_lock(user_key)? {                return self.load_data_from_lock(user_key, lock);            }        }                  // Percoaltor正常读取流程:从WriteCF中找到<=start_ts中最大的commit_ts,并基于其存储的start_ts到DefaultCF中读取                self.load_data(user_key)}
复制代码


在执行查询前,TiKV 需要根据当前请求的隔离级别判定是否需要检查锁。

pub fn need_check_locks(iso_level: IsolationLevel) -> bool {    matches!(iso_level, IsolationLevel::Si | IsolationLevel::RcCheckTs)}
复制代码


TiKV 支持 SnapshotIsolation/ReadCommitted/ReadCommittedCheckTs 三种隔离级别,其中前两种需要检查锁。其原因在于 LockCf 中的锁是由于事务在 2PC 的第一阶段提交阶段写入的,事务的最终状态无法确定,如果不检查锁直接读取,那么可能导致快照读取被破坏。

fn load_and_check_lock(&mut self, user_key: &Key) -> Result<Option<Lock>> {        // 从LockCf查询该Key的锁信息        let lock_value = self.snapshot.get_cf(CF_LOCK, user_key)?;          if let Some(ref lock_value) = lock_value {            let lock = Lock::parse(lock_value)?;            // 如果存在锁则检查锁是否冲突            if let Err(e) = Lock::check_ts_conflict(                Cow::Borrowed(&lock),                user_key,                self.ts,                &self.bypass_locks,                self.isolation_level,            )        // ...}
复制代码


其中 Lock::check_ts_conflict 的实现中会根据当前的事务隔离级别进行判定,不同的隔离级别的判定逻辑略有差异。由于本文篇幅有限,这里只分析我们常用的快照隔离级别的实现。

fn check_ts_conflict_si(lock: Cow<'_, Self>, key: &Key, ts: TimeStamp, bypass_locks: &TsSet ) -> Result<()> {        if lock.ts > ts            || lock.lock_type == LockType::Lock            || lock.lock_type == LockType::Pessimistic        {            return Ok(());        }          if lock.min_commit_ts > ts {            // Ignore lock when min_commit_ts > ts            return Ok(());        }          if bypass_locks.contains(lock.ts) {            return Ok(());        }          let raw_key = key.to_raw()?;          if ts == TimeStamp::max() && raw_key == lock.primary && !lock.use_async_commit {            // When `ts == TimeStamp::max()` (which means to get latest committed version            // for primary key), and current key is the primary key, we ignore            // this lock.            return Ok(());        }          // There is a pending lock. Client should wait or clean it.        Err(Error::from(ErrorInner::KeyIsLocked(            lock.into_owned().into_lock_info(raw_key),        )))}
复制代码


  • 当 lock.ts > ts 时,当前查询请求可以直接忽略这个锁。其原因在于当前的 lock 是由具有更高 start_ts 的事务写入,因此即便这个事务后续被提交,其 commit_ts 一定大于当前的 start_ts,其新写入的数据是不可见的,不会破坏快照一致性。

  • 当 lock_type==Lock 时,也可以直接忽略这个锁突, 其原因在于 LockType::Lock 是由于创建索引产生,它只用于指示被锁定但不会修改数据,因此也可以直接被忽略。

  • 当 lock_type==Pessistics 时,也可以直接忽略这个锁突,LockType::Pessistics 是由于悲观事务执行 DML 时写入,并未进行到事务提交阶段,即使这个事务很快被提交,由于其 commit_ts 也一定大于当前读取的 start_ts, 直接忽略并不会影响快照一致性。

  • 当 lock.min_commit_ts > ts 时,也可以直接忽略这个锁,其原因在于它能保证这个 AsyncCommit 事务的最终计算出的 commit_ts 一定大于 ts,即使这个事务会被提交,也不会破坏快照一致性。

  • 当 bypass_locks 中包含了当前锁的 start_ts 时, 也可以直接忽略这个锁。bypass_locks 即前面 Read through locks 小节中提到了 resloved_locks,这些锁虽然存在,但它们对应事务要么已经被回滚,要么使用了大于当前读取 start_ts 的 commit_ts 进行提交,无论是哪种情况都不会破坏快照一致性。

  • 其他情况则需要返回 KeyIsLocked 错误给客户端,客户端收到这个错误后则会检查这个锁的过期时间,如果锁尚未过期则需要做 wait,否则会尝试进行解锁恢复这个事务的状态。


若 check_ts_conflict_si 返回 KeyIsLocked 或其他错误后,TiKV 会额外检查 access_locks 里是否包含该锁,如果该锁存在,则 KeyIsLocked 错误则会被忽略,同时锁会被直接返回,外层函数可以通过锁找到 start_ts 从而直接读取 DefaultCF 中的数据。这里的 access_locks 即 Read through locks 中的 committed_locks,即已经知晓被提交的且 commit_ts 小于当前快照读 start_ts 的事务,在这种情况下,直接读取 DefaultCF 是一个超前但安全的操作,原因在在于一旦这个 Lock 被 Resolve,用户通过新的 commit_ts 可以定位到同一个 start_ts。

if let Err(e) = Lock::check_ts_conflict(Cow::Borrowed(&lock),user_key,self.ts,&self.bypass_locks,self.isolation_level) {    if self.access_locks.contains(lock.ts) {        return Ok(Some(lock));   }    Err(e.into())}
复制代码


在不存在 Key 被锁定或冲突,且没有使用 Read through locks 读取后,TiKV 则会进行正常的 Percolator 读取流程,即从 WriteCF 中找到<=start_ts 中最大的 commit_ts,并基于其存储的 start_ts 到 DefaultCF 中读取。

4.4.4 RegionSnapshot 的 Get 实现

RegionSnapshot::get 的实现相对比较简单,逻辑如下:

fn get_value_cf_opt(&self, opts: &ReadOptions, cf: &str, key: &[u8]) -> EngineResult<Option<Self::DbVector>> {    // 1. 检查查询的key是否在Region的范围内, 如果不在则直接返回错误。    check_key_in_range(key,self.region.get_id(),self.region.get_start_key(),self.region.get_end_key()).map_err(|e| EngineError::Other(box_err!(e)))?;          // 2. 基于查询的key拼接出raftstore层面的DataKey (raftstore在写入时会给用户key前添加一个前缀'z')。    let data_key = keys::data_key(key);          // 3. 使用内部的RocksSnapshot查询RocksDB获取key对应的值。    self.snap.get_value_cf_opt(opts, cf, &data_key).map_err(|e| self.handle_get_value_error(e, cf, key))}
复制代码

4.4.5 RocksDB/Titan 的 Get 实现

TiKV 使用 rust-rocksdb 库使用 FFI 实现与 RocksDB C-API 的交互,RocksSnapshot::get 会通过 crocksdb_get_pinned_cf 将查询接口委托给底层的 RocksDB。值得注意的是,TiKV 使用的并不是官方的 RocksDB,而是自行维护的一个整合了 Titan 插件的版本。Titan 是一个受 WiscKey 论文启发而创建的项目,其主要目的是将存入 RocksDB 的大 Value 从 LSM-Tree 中分离出来,存储到额外的 Blob 文件中,从而达到减小写放大的目的。


本小节我们着重分析一下 TitanDB 中一次查询的实现过程(做过大量精简):

Status TitanDBImpl::GetImpl(const ReadOptions& options,                            ColumnFamilyHandle* handle, const Slice& key,                            PinnableSlice* value) {      // 先查询RocksDB  s = db_impl_->GetImpl(options, key, gopts);    // 如果Key的Value不存在或者不是BlobIndex, 则直接返回  if (!s.ok() || !is_blob_index) return s;      // Value是BlobIndex,说明这是一个索引,还需要额外查询BlobStorage  BlobIndex index;  s = index.DecodeFrom(value);  assert(s.ok());  if (!s.ok()) return s;    BlobRecord record;  PinnableSlice buffer;    mutex_.Lock();  // 根据索引查询BlobStorage  auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();  mutex_.Unlock();    if (s.ok()) {    value->Reset();    value->PinSelf(record.value);  }  return s;}
复制代码

五、总结

  1. TiKV 对数据存储层的职能进行了非常合理的抽象,通过 Engine/Snapshot/Iterator 等 trait 定义实现了存储层与上层的解耦。

  2. TiKV 在 RocksDB 提供的多列族原子性写入能力之上实现了 Percolator 模型,提供了分布式事务和 MVCC 等能力,并实现了 AsyncCommit 和 1PC 等改善了事务提交延迟。

  3. TiKV 实现了一个基于 RocksDB 的 KV 分离插件 titan, 借鉴了 Wisckey 的思想将大 Value 从 LSM-Tree 中分离,在大 Value 的业务场景下能够通过降低写放大改善性能。

  4. 从 PointGet 的实现我们可以看到在使用了 MVCC 的情况下,查询时遇到前一事务 Prewrite 产生的 Lock 仍然需要等待 Resolve, 因此在 AsyncCommit 开启的前提下,业务开发需要尽量避免设计事务提交后即刻发起查询的场景,此外也要尽量避免由于大事务提交延迟高影响相关的查询。


参考资料:

发布于: 刚刚阅读数: 10
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
TiKV 源码分析之 PointGet_TiKV_vivo互联网技术_InfoQ写作社区