写点什么

percolator 的理解与开源实现分析

作者:
  • 2022 年 9 月 25 日
    北京
  • 本文字数:10468 字

    阅读完需:约 34 分钟


论文

论文地址


https://research.google/pubs/pub36726/

概述

设计 percolator 的目的


为大数据集群进行增量处理更新的系统,主要用于 google 网页搜索索引服务。使用基于 Percolator 的增量处理系统代替原有的批处理索引系统后,Google 在处理同样数据量的文档时,将文档的平均搜索延时降低了 50%。


基于 bigtable 单行事务实现跨行事务


Percolator 在 Bigtable 之上实现的,以 client library 的方式实现。


Percolator 利用 Bigtable 的单行事务能力,依靠 client 的协议和一个全局的授时服务器 TSO 以及两阶段提交协议来实现了跨机器的多行事务。


MVCC 与 snapshot isolation


Percolator 依据全局时间戳和 MVCC 实现 Snapshot Isolation 隔离级别的并发控制协议。


percolator 特点


  • 事务: 跨行、跨表的、基于快照隔离的 ACID 事务

  • 观察者(observers):一种类似触发器的通知机制

设计

隔离等级

通过 MVCC 来实现 SI(Snapshop isolation)隔离等级。


Percolator 使用 Bigtable 的时间戳记维度实现了数据的多版本化。优点如下:


  • 读操作:可以读取任何指定时间戳版本的记录

  • 写操作,能很好的应对写写冲突:若两个事务操作同一记录,只有一个会提交成功


I 存在 write skew(写偏斜)

锁机制

Because it is built as a client library accessing Bigtable, rather than controlling access to storage itself, Percolator faces a different set of challenges implementing distributed transactions than traditional PDBMSs. Other parallel databases integrate locking into the system component that manages access to the disk: since each node already mediates access to data on the disk it can grant locks on requests and deny accesses that violate locking requirements.


Percolator 锁的管理必须满足以下条件:


  • 能应对机器故障:若一个锁在两阶段提交时消失,系统可能将两个有冲突的事务都提交

  • 高吞吐量:上千台机器会同时请求获取锁

  • 低延时


锁服务要实现:


  • 多副本 : survive failure

  • distributed and balanced : handle load

  • 写入持久化存储系统

时间戳

Timestamp Oracle(不是 Oracle 数据库): TSO 通过统一中心授权可以保证按照递增的方式分配逻辑时钟,任何事件申请的时钟都不会重复,能够保证事务版本号的单调递增,确保分布式事务的时序。


所以 TSO 是一个分配严格的单调递增时间戳的服务器。


优化


因为每个事务都需要调用 oracle 两次,所以这个服务必须有很好的可伸缩性。


Oracle 会定期分配一个范围的时间戳,然后将范围中的最大值写入持久化,Oracle 在内存中原子递增来快速分配时间戳,查询时不涉及磁盘 I/O。如果 oracle 重启,将以存储中的最大值作为开始值。worker 会维持一个长连接 RPC 到 oracle,低频率的、批量的获取时间戳。


性能


Oracle 中单台机器每秒向外分配接近两百万的时间戳。


关于批量获取时间戳


批量获取时间戳并不会造成乱序问题,因为就算事务 A 先获取时间戳 T1,事务 B 后获取时间戳 T2,T1<T2,那么分布式系统中,也无法保证事务 A 先执行,事务 B 后执行。


如果事务 B 先执行,那么事务 A 势必能发现写冲突从而 rollback。


单点


为了保证单调递增的特性,所以很多 TSO 的开源实现都存在单点问题。如 tidb 的 TSO。


而且,一般 TSO 也存在跨数据中心高延迟的问题。


其他时序方案


  • Logic Clock: dynamoDB

  • True Time : spanner

  • Hybrid Logic Clock : cockroachDB,没有单点问题,但是为了解决时钟误差而无法避免的时延问题。

数据存储

percolator 定义了 5 个列



Lock


事务的锁,key value 映射


(key,start_ts) ==> (primary_key,lock_type)
复制代码


  • key:数据的 key

  • start_ts:事务开始时间

  • primary:该锁的 primary 的引用。事务从待修改的 keys 中选择一个作为 primary,其余的则作为 secondary,secondary 的 primary_key 指向 primary 的 key,事务的上锁和解锁都由 primary key 决定。


Write


已提交的数据对应的时间戳。key value 映射


(key,commit_ts) ==> (start_ts)
复制代码


  • key:数据的 key

  • commit_ts:事务的提交时间

  • start_ts:事务的开始时间(此数据在 data 中的时间戳版本)


Data


存储数据的列,key value 映射


(key,start_ts) ==> (value)
复制代码


  • key:对应的主键

  • start_ts:事务的开始时间

  • value:除主键外的数据列


Notify


notify 列仅仅是一个 hint 值(可能是个 bool 值),表示是否需要触发通知。


Ack


ack 列是一个简单的时间戳值,表示最近执行通知的观察者的开始时间。

案例

以银行转账为案例


Bob 向 Joe 转账 7 元。


事务开始时间:start timestamp =7 ,提交时间:commit timestamp=8。



  • Bob 有 10 元:查询 column write 获取最新时间戳版本的数据(data@5),然后从 column data 里面获取时间戳为 5 的数据(2)



  • stat timestamp=7 作为当前事务的开始时间戳,将 Bob 选为此事务的 primary key,再写入 column:lock 对 Bob 上锁,同时将 column:data 列更新为 7:$3。



  • start timestamp=7 作为锁定 Joe 账户的时间戳,更新其 column:data 为 $9,其锁是 secondary 指向 primary



  • 当前时间戳 commit timestamp=8 作为事务提交时间戳:删除 primary 所在的 lock,在 write 列中写入 commit_ts:data@7



  • 在所有 secondary 中写入 column:write 且清理 column:lock,事务完成。

流程

伪代码

class Transaction {    struct Write{ Row row; Column: col; string value;};    vector<Write> writes_;    int start_ts_;
Transaction():start_ts_(oracle.GetTimestamp()) {} // 往事务中添加一行数据 void Set(Write w) {writes_.push_back(w);} bool Get(Row row, Column c, string* value) { while(true) { // 开启bigtable事务 bigtable::Txn = bigtable::StartRowTransaction(row); // Check for locks that signal concurrent writes. // 检查[0, start_ts]内是否有锁,有则等待再重试 if (T.Read(row, c+"locks", [0, start_ts_])) { // There is a pending lock; try to clean it and wait BackoffAndMaybeCleanupLock(row, c); continue; } }
// Find the latest write below our start_timestamp. // 读取column:wriet的[0, start_ts]的最新提交记录版本 latest_write = T.Read(row, c+"write", [0, start_ts_]); if(!latest_write.found()) return false; // no data // 根据此时间戳版本去column:data读取数据 int data_ts = latest_write.start_timestamp(); *value = T.Read(row, c+"data", [data_ts, data_ts]); return true; } // prewrite : 尝试上锁+写数据,如果锁冲突则返回失败 bool Prewrite(Write w, Write primary) { Column c = w.col; // bigtable 事务 bigtable::Txn T = bigtable::StartRowTransaction(w.row);
// 检查写写冲突:从column:write获取最新数据,如果commit_ts>=事务开始时间,则冲突 if (T.Read(w.row, c+"write", [start_ts_, max])) return false; // 检查column:lock,如果存在已存在,则锁冲突 if (T.Read(w.row, c+"lock", [0, max])) return false; // 往column:lock写入锁,若为secondary,则指向primary T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col}) // 往column:data写入(key, start_ts = value) T.Write(w.row, c+"data", start_ts_, w.value); // 提交bigtable事务 return T.Commit(); } // 此commit整合 2PC的prewrite和commit, // 对外caller而言不需要关注是2PC提交,直接commit就好 bool Commit() { // ***** Prewrite *****// // 2PC 第一阶段:prewrite // 第一行为primary Write primary = write_[0]; // 其余为secondary vector<Write> secondaries(write_.begin() + 1, write_.end()); if (!Prewrite(primary, primary)) return false; for (Write w : secondaries) if (!Prewrite(w, primary)) return false; // ***** Commit *****// // 2PC第二阶段:commit // 从oracle获取commit timestamp int commit_ts = oracle.GetTimestamp();
// Commit primary first. Write p = primary; // primary : 先开始bigtable的事务 bigtable::Txn T = bigtable::StartRowTransaction(p.row); // primary : 如果primiary的锁不存在,则abort if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_])) return false; // aborted while working // primary : 在column:write 列写入开始时间(Pointer to data written at start_ts_) T.Write(p.row, p.col+"write", commit_ts, start_ts_); // primary : 移除column:lock 列 T.Erase(p.row, p.col+"lock", commit_ts); // primary : 提交bigtable 事务 if(!T.Commit()) return false;
// Second phase: write our write records for secondary cells. // 遍历所有secondary,写入column:write数据同时删除column:lock for (Write w:secondaries) { bigtable::write(w.row, w.col+"write", commit_ts, start_ts_); bigtable::Erase(w.row, w.col+"lock", commit_ts); } return true; }};
复制代码


事务


  • 第一阶段

  • 获取时间戳 T1,

  • 写入 column:data 和锁:时间戳都为 T1

  • 第二阶段

  • 获取时间戳 T2

  • 写入 column:write:key:commit_ts:start_ts (key:T2:T1)

  • 删除锁


读取


  • 获取时间戳 Tx

  • 从 column:write 读取 key[0, Tx]的最大时间戳数据(获取到事务写入的 commit_ts=T2)

  • 从 T2 中提取出 start_ts 为 T1

  • 从 column:data 中读取 key:T1 的数据


所以读取到的数据是 commit 时间戳的数据。

清理锁

若客户端在 Commit 一个事务时,出现了异常,Prepare 时产生的锁会被留下。为避免将新事务挂住,Percolator 必须清理这些锁。


Percolator 用 lazy 方式来处理未处理的锁:当事务在执行时,发现其他事务造成的锁未处理掉,事务将决定其他事务是否失败,以及清理其他事务的那些锁。


当客户端在执行两阶段提交的 commit 阶段 crash 时,事务会留下一个提交点 commit point(至少已经写入一条 write 记录),但可能会留下一些 lock 未被处理掉


  • 如果 priarmy lock 已被 write 所替代:意味着该事务已被提交,事务需要 roll forword,也就是对所有涉及到的、未完成提交的数据,用 write 记录替代标准的锁 standed lock。

  • 如果 primary lock 存在:事务将 roll back(因为总是最先提交 primary,所以 primary 未被提交时,可以安全地执行回滚)


这些都是基于 bigtable 的事务中的。


清理操作在 primary 锁上是同步的,所以清理 alive 客户端持有的锁是安全的;然而回滚会强迫事务取消,这会严重影响性能。所以,一个事务将不会清理一个锁除非它猜测这个锁属于一个僵死的 worker。


Percolator 使用简单的机制来确定另一个事务的活跃度。运行中的 worker 会写一个 token 到 Chubby 锁服务来指示他们属于本系统,token 会被其他 worker 视为一个代表活跃度的信号(退出时 token 会被自动删除)。有些 worker 是活跃的,但不在运行中,为了处理这种情况,我们附加的写入一个 wall time 到锁中;一个锁的 wall time 如果太老,即使 token 有效也会被清理。有些操作运行很长时间才会提交,针对这种情况,在整个提交过程中 worker 会周期的更新 wall time。

通知

用户对感兴趣的列编写观察者 function 注册到 percolator,当列发生改变时,percolator 通知 percolator 的 worker 运行用户 function。


通知与写操作不是原子的


通知类似于数据库中的触发器,然而不同的是,通知在其他事务(worker)中执行,所以写操作与观察者执行不是原子的,且观察者的执行会有时效性问题。


这和传统关系型数据库的 ACID 的 C 有一定的差别,我猜: 所以这里不叫 trigger 而是通知的原因


通知与观察者无限循环


编写观察者时,用户要自己考虑通知与观察者进入无限循环的情况(通知->观察者->通知->观察者.....)。


通知的"丢失"


一个列的多次更改只会触发一次通知,所以通知和操作系统的中断一样会存在“丢失”的问题。


实现通知机制


为了实现通知机制,Percolator 需要高效找到被观察的脏 cell。Percolator 在 Bigtable 维护一个“notify”列(notify 列为一个独立的 Bigtable locality group),表示此 cell 是否为脏。当事务修改被观察的 cell 时,则设置 cell 的 notify。worker 对 notify 列执行一个分布式扫描来找到脏 cell。找到 notify 则触发观察者并且等到观察者事务提交成功后,会删除对应的 notify cell。

tidb 的实现

percolator 定义了 5 个列:data, write, lock, ack, notify。


tidb 定义了其中 3 个:data, write, lock。所以 tidb 没有实现 notify 功能(tidb 不需要增量处理能力)。


tidb 定义了 3 个 rocksdb 的 column family:


  • CF_DEFAULT:对应 percolator 的 data 列

  • CF_LOCK:对应 percolator 的 lock 列

  • CF_WRITE:对应 percolator 的 write 列


CF_DEFAULT


(key, start_ts) ==> value
复制代码


CF_LOCK


key ==> lock_info
复制代码


同一时刻一个 key 最多只有一个锁,所以,tidb 的锁没有 start_ts。


CF_WRITE


(key, commit_ts) ==> write_info
复制代码

tidb

  • 1 client 向 tidb 发起开启事务 begin

  • 2 tidb 向 pd 获取 tso 作为当前事务的 start_ts

  • 3 client 向 tidb 执行以下请求:

  • 读操作,从 tikv 读取版本 start_ts 对应具体数据.

  • 写操作,写入 memory 中。

  • 4 client 向 tidb 发起 commit 提交事务请求

  • 5 tidb 开始两阶段提交。

  • 6 tidb 按照 region 对需要写的数据进行分组。

  • 7 tidb 开始 prewrite 操作:向所有涉及改动的 region 并发执行 prewrite 请求。若其中某个 prewrite 失败,根据错误类型决定处理方式:

  • KeyIsLock:尝试 Resolve Lock 后,若成功,则重试当前 region 的 prewrite[步骤 7]。否则,重新获取 tso 作为 start_ts 启动 2pc 提交(步骤 5)。

  • WriteConfict 有其它事务在写当前 key, abort 事务

  • 其它错误,向 client 返回失败。

  • 8 commit : tidb 向 pd 获取 tso 作为当前事务的 commit_ts。

  • 9 tidb 开始 commit:tidb 向 primary 所在 region 发起 commit。 若 commit primary 失败,则先执行 rollback keys,然后根据错误判断是否重试:

  • LockNotExist abort 事务

  • 其它错误,向 client 返回失败。

  • 10 tidb 向 tikv 异步并发向剩余 region 发起 commit。

  • 11 tidb 向 client 返回事务提交成功信息。


所有涉及重新获取 tso 重启事务的两阶段提交的地方,会先检查当前事务是否可以满足重试条件:只有单条语句组成的事务才可以重新获取 tso 作为 start_ts。

tikv

Prewrite

伪代码


  • -> 代表 rpc 调用, 例如tidb->tikv.Prewrite tidb 调用 tikv 的 Prewrite 接口

  • . 代表进程内调用, 例如memory.Put往内存模型写数据


start_ts = tidb->pd.GetTso() // get start_ts
// tidb调用tikv prewrite接口 tidb->tikv.Prewrite(start_ts, data_list){ // tikv prewrite实现 keyIsLockedArray = [] // prewrite each key with start_ts in memory,中间出现失败,则整个prewrite失败 for key in data_list { // check write conflict: 通过raft获取key的数据 record = raft.Get(WriteColumn, key, start_ts) if record.commit_ts >= start_ts { return error(write conflict, END) } // check lock lock = raft.Get(LockColumn, key) if lock != null && lock.ts != start_ts // lock已存在且为其他tx的锁 { keyIsLockedArray.append(key) continue } // 往内存中的 lock 列写入 lock(start_ts,key) 为当前key加锁, // 若当前key被选为 primary, 则标记为 primary, // 若为secondary,则标明指向primary的信息。 memory.Put(LockColumn, key, start_ts, (primary|secondary), ttl, short_value) memory.Put(DataColumn, key, start_ts, long_value) } if len(keyIsLockedArray) > 0 { return error(keyIsLockedArray) } // 将此事务在内存模型中写入的数据 持久化到raft中 raft.Commit(memory_data) return ok}
复制代码

Commit

伪代码


// tidb调用rikv的Commit接口,进行2PC的Commit阶段tidb->tikv.Commit(keys, start_ts, commit_ts){    for key in keys // do commit    {        lock = raft.Get(LockColumn, key)        // lock存在且匹配,则提交        if lock != null && lock.ts == start_ts         {            memory.Put(WriteColumn, key, commit_ts, start_ts)            memory.Del(LockColumn, key, start_ts)           }       // lock does not exist or tx dismatch       else if lock == null || lock.ts != start_ts        {           record = raft.Get(WriteColumn, key, start_ts, commit_ts)           if record != null && record.write_type == (PUT|DELETE|Lock)           {               continue; // already commited           } else if record == null || record.write_type == RollBack           {               return error(tx conflict, lock not exist)           }       }           }    // commit to raft    raft.Save(memory.data)    return ok}
复制代码

Rollback

当事务在两阶段提交过程中失败时, tidb 会向当前事务涉及到的所有 tikv 发起回滚操作。


伪代码


// tidb调用tikv的Rollback接口tidb->tikv.Rollback(keys){   // Rollback接口实现
// 检查合法性 for key in keys { // 检查当前key的锁 lock=memory.GetLockColumn(start_ts, key) if lock != null and lock.ts = start_ts { // 如果锁还存在且是之前的锁,则删除锁,写入的数据 // 且在WriteColumn写入rollback记录防止后面commit请求的到来 memory.Del(DataColumn, key, start_ts) memory.Put(WriteColumn, key, start_ts, rollback) meomry.Del(LockColumn, key, start_ts) continue } // 检查提交情况 record = raft.Get(WriteColumn, key, start_ts) if record != null { if record.status == (PUT|DELETE) { return error(transaction is already commited.) } else if record.status == RollBack { continue; // already rollbacked } } else { // record is null // 提交纪录不存在,说明当前 key 尚未被 prewrite 过, // 为预防 prewrite 在rollback之后过来(可能网络原因), // 在这里留下 (key,start_ts,rollback)记录 memory.Put(WriteColumn, key, start_ts, rollback) continue } // persist raft->Save(memory.data) } return ok}
复制代码

Resolve Lock

若客户端在 Commit 一个事务时,出现了异常,Prepare 时产生的锁会被留下。为避免将新事务 hang 住,Percolator 必须清理这些锁。


Percolator 用 lazy 方式处理这些锁:当事务 A 在执行时,发现事务 B 造成的锁冲突,事务 A 将决定事务 B 是否失败,以及清理事务 B 的那些锁。


tidb 在执行 prewrite, get 过程中,若遇到锁,在锁超时的情况下,会向 tikv 发起清锁操作。


// tidb调用tikv的ResolveLock接口tidb->tikv.ResolveLock(start_ts, commit_ts){    // 找出所有 lock.ts==start_ts 的锁并执行清锁操作    locks = raft.Scan(LockColumn, lock.ts = start_ts)    for (lock in locks)    {        // commit_ts存在,则说明已提交        if (commit_ts != null)        {            // 对已上锁的key进行提交            memory.Commit(lock.key, commit_ts)        } else {            memory.Rollback(lock.key)        }           }    raft->Save(memory.data)        return ok}
复制代码

Get

// tidb调用tikv的Get接口tidb->tikv.Get(key, start_ts){    // check lock: 如果锁存在且在此之前加的锁,则返回锁冲突    lock = raft->Get(LockColumn, key)    if (lock != null && lock.ts <= start_ts)        return error(isLocked);    version = start_ts - 1;    RAFTGET:    // get writeColumn: 获取小于start_ts的最新提交记录    data = raft->Get(WriteColumn, key, version)    if (data != null)    {        if (data.writeType == PUT)        {            if (data.isShortValue)                return data.shortValue;            // long value            return raft->Get(DataColumn, key, start_ts)        } else if (data.writeType == DELETE)        {            // 没有出现过该值,或该值最近已被删除,返回tidb空            return ok(None);        } else if (data.writeType == "LOCK | ROLLBACK")        {            // version=commit_ts-1, 继续查找下一个最近版本            version=commit_ts-1            goto RAFTGET        }    } else {        return ok(None)    }}
复制代码

GC

TiDB 的事务的实现采用了 MVCC 机制,当新写入的数据覆盖旧的数据时,旧的数据不会被替换掉,而是与新写入的数据同时保留,并以时间戳来区分版本。


GC 的任务便是清理不再需要的旧数据。


一个 TiDB 集群中会有一个 TiDB 实例被选举为 GC leader,GC 的运行由 GC leader 来控制。


GC 会被定期触发。每次 GC 时,首先,TiDB 会计算一个称为 safe point 的时间戳,接下来 TiDB 会在保证 safe point 之后的快照全部拥有正确数据的前提下,删除更早的过期数据。


每一轮 GC 分为以下三个步骤:


  • Resolve Locks:该阶段会对所有 Region 扫描 safe point 之前的锁,并清理这些锁

  • Delete Ranges:该阶段快速地删除由于 DROP TABLE/DROP INDEX 等操作产生的整区间的废弃数据

  • Do GC:该阶段每个 TiKV 节点将会各自扫描该节点上的数据,并对每一个 key 删除其不再需要的旧版本


// tidb 向 tikv 发起 GC操作,要求清理 safe-point 版本之前的所有无意义版本
// tidb调用tikv的GC接口tidb->tikv.Gc(savePoint){ startKey=null; for { // scan (startKey, maxKey] keys = raft.Scan(WriteColumn, startKey, batchSize) if (batch == null) return ok; // 对每个key进行gc操作 for (key in keys) { bool remove_older = false; // 清理write中commit_ts <= startPoint的每个item // item: 一个key的所有write记录 for (item in key) { if (remove_older is true) { memory.Del(WriteColumn, item); memory.Del(DataColumn, item); } // check if is ordest version to save if (writeType == PUT) { // 保留该提交,清理所有该提交之前的数据 memory.Set(memoryOrder, true); } else if (writeType == DELETE) { // 清理所有safe-point 之前的数据 remove_older = true; memory.Del(WriteColumn, item); } else if (writeType == ROLLBACK | LOCK) { // 清理所有小于 safe-point 的 Rollback 和 Lock memory.Del(WriteColumn, item); } } } startKey = keys[keys.length() - 1]; }}
复制代码

优化

Parallel Prewrite

tikv 分批的并发进行 prewrite,不会像 percolator 要先 prewrite primary,再去 prewrite secondary。


如果事务冲突,导致 rollback,在 tikv 的 rollback 实现中,其会留下 rollback 记录,这样就会导致事务的 prewrite 失败,而不会产生副作用。

Short Value

对于 percolator,先读取 column:write 列,提取到 key 的 start_ts,再去 column:data 列读取 key 数据本身。


这样会造成两次读取,tidb 的优化是,如果数据本身很小,那么就直接存储在 colulmn:write 中,只需读取一次即可。

Point Read Without Timestamp

为了减少一次 RPC 调用和减轻 TSO 压力,对于单点读,并不需要获取 timestamp。


因为单点读不存在跨行一致性问题(读取多行数据时,必须是同一个版本的数据),所以直接可以读取最新的数据即可。

Calculated Commit Timestamp

如果不通过 TSO 获取 commit_ts,则会减少一次 RPC 交互从而降低事务的时延。


然而,为了实现 SI 的 RR 特性(repeatable read),所以 commit_ts 需要确保其他事务多次读取的值是一样的。那么 commit_ts 就和其他事务的读取有相关性。


下面公式可以计算出一个 commit_ts


max{start_ts, max_read_ts_of_written_keys} < commit_ts <= now
复制代码


由于不可能记录每个 key 的最大的读取时间,但是可以记录每个 region 的最大读取时间,所以公式转换为:


commit_ts = max{start_ts, region_1_max_read_ts, region_2_max_read_ts, ...} + 1
复制代码


region_x_max_read_ts : 事务涉及到的 key 的 region。

Single Region 1PC

对于事务只涉及到一个 Region,那么其实是没有必要走 2PC 流程的。直接提交事务即可。


发布于: 刚刚阅读数: 4
用户头像

关注

还未添加个人签名 2018.06.12 加入

还未添加个人简介

评论

发布
暂无评论
percolator的理解与开源实现分析_数据库_楚_InfoQ写作社区