写点什么

悲观事务死锁检测

  • 2022 年 7 月 11 日
  • 本文字数:5453 字

    阅读完需:约 18 分钟

作者: jiyf 原文来源:https://tidb.net/blog/9722d003


【是否原创】是


【首发渠道】TiDB 社区

死锁检测 leader

每个 tikv 都会开启死锁检测进程,开启的进程有 leader 和 follow 两种角色可以切换,默认为 follow 角色。


leader 角色:维护锁的 DAG 信息,接受检测请求,计算 DAG 检测是否有锁存在等


follow 角色:通过 GRPC 将检测请求发送给 leader,接受检测结果


死锁检测 的 leader 为:包含 key 为空字符串的 region,也就是集群按 key 排序第一个 region 的 region leader 所在的 store 将成为死锁检测的 leader。


    const LEADER_KEY: &[u8] = b"";    fn is_leader_region(region: &Region) -> bool {        // The key range of a new created region is empty which misleads the leader        // of the deadlock detector stepping down.        //        // If the peers of a region is not empty, the region info is complete.        is_region_initialized(region)            && region.get_start_key() <= LEADER_KEY            && (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())    }
复制代码


通过订阅 region 变化的信息,包括 region 创建、更新、role 角色变更来捕获 leader region 的角色变化,然后同步死锁检测的角色。


    pub(crate) fn register(self, host: &mut CoprocessorHost<RocksEngine>) {        host.registry            .register_role_observer(1, BoxRoleObserver::new(self.clone()));  // role 变化的时候调用        host.registry            .register_region_change_observer(1, BoxRegionChangeObserver::new(self));  // region 变化的时候改变    }
复制代码

检测接口函数

死锁检测接口函数,当自己是 leader 时候,直接调用 local 函数接口,如果是 follower,那么通过 grpc 向 leader 查询。


    /// Handles detect requests of itself.    /// 处理锁 detect.    fn handle_detect(&mut self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {        if self.is_leader() {            self.handle_detect_locally(tp, txn_ts, lock);        } else {            for _ in 0..2 {                ...                if self.send_request_to_leader(tp, txn_ts, lock) {                    return;                }                ...            }            ...        }    }
复制代码


从这里接口对应三种查询类型:


  • DetectType::Detect,也就是死锁检测的接口,当悲观事务遇到一个锁的时候,就会通过这个接口来检测是否产生了死锁

  • DetectType::CleanUpWaitFor,删除事务等待的一个锁,事务对这个锁没有等待了

  • DetectType::CleanUp,删除这个事务所有等待的锁,比如事务回滚了,所以就清除这个事务的锁等待信息

死锁检测算法

当悲观事务过程中,尝试锁定一个 key,发现 key 已经被上锁,这时候会调用死锁检测接口,假设当前事务是 txn_1,持有锁的事务是 txn_lock,检测当事务 txn_1 等待 txn_lock 的锁的情况下,存不存在死锁。


检测算法是构建一个 DAG 有向无环图,如果目前集群存在的锁中存在一条从 txn_lock 到 txn_1 的边,那么就代表死锁将会存在。


/// `Locks` is a set of locks belonging to one transaction.struct Locks {    ts: TimeStamp,    // 事务ts吧。    hashes: Vec<u64>,    last_detect_time: Instant,}
/// Used to detect the deadlock of wait-for-lock in the cluster.pub struct DetectTable { /// Keeps the DAG of wait-for-lock. Every edge from `txn_ts` to `lock_ts` has a survival time -- `ttl`. /// When checking the deadlock, if the ttl has elpased, the corresponding edge will be removed. /// `last_detect_time` is the start time of the edge. `Detect` requests will refresh it. // txn_ts => (lock_ts => Locks) wait_for_map: HashMap<TimeStamp, HashMap<TimeStamp, Locks>>,
/// The ttl of every edge. ttl: Duration,
/// The time of last `active_expire`. last_active_expire: Instant,
now: Instant,}其中 wait_for_map 是个两层的 hashMap,第一层 key 是等待锁的事务 txn_ts,第二层 key 是等待的事务 txn_lock,第二层 value 是事务 txn_ts 等待事务 txn_lock 持有的锁列表。
复制代码


wait_for_map 描述了集群事务锁等待的关系,通过 txn_lock,可以查询出当前事务在等待哪些事务的锁、等待哪些锁。


    /// Returns the key hash which causes deadlock.    /// // 检查是否存在死锁。    pub fn detect(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) -> Option<u64> {        let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();        TASK_COUNTER_METRICS.detect.inc();
self.now = Instant::now_coarse(); self.active_expire(); // 清理过期的。
// If `txn_ts` is waiting for `lock_ts`, it won't cause deadlock. // 已经有 txn_tx 等待 lock_ts,那么就不会存在 lock_ts 等待 txn_ts,也就是不会存在死锁。 if self.register_if_existed(txn_ts, lock_ts, lock_hash) { return None; }
if let Some(deadlock_key_hash) = self.do_detect(txn_ts, lock_ts) { ERROR_COUNTER_METRICS.deadlock.inc(); return Some(deadlock_key_hash); // 存在这个死锁。 } self.register(txn_ts, lock_ts, lock_hash); None }
复制代码


算法流程:


  1. 清理过期锁(一般很少走这里,只有等待的事务数量达到 100000,且距离上次清理达到 1 个小时才会执行)

  2. 检查是否存在 txn_ts 在等待事务 txn_lock 的锁,如果已经存在,那么必然不存在 txn_lock 到 txn_ts 的边,必然不会有死锁,那么加入新的锁,返回

  3. 调用 do_detect 函数,遍历构建所有 DAG 检查是否有存在 txn_lock 到 txn_ts 的边,如果存在那么死锁就存在

  4. 如果没有死锁存在,那么说明 txn_ts 等待 txn_lock 不会产生死锁,把 txn_ts 等待 txn_lock 的锁信息添加进去


do_detect 函数构建 DAG 遍历所有从 wait_for_ts(txn_lock)出发的可能,检查有没有到 txn_ts 的边,如果有,那么返回一个存在的锁的 hash,告诉死锁的存在。

唤醒锁等待

死锁检测中维护了从事务出发可以找到所有等待的锁的信息,当锁被释放、超时、死锁存在情况下,需要唤醒等待锁的事务,这里就需要根据锁 id 找到等待的事务,进行唤醒操作。

锁等待信息

/// If a pessimistic transaction meets a lock, it will wait for the lock/// released in `WaiterManager`.////// `Waiter` contains the context of the pessimistic transaction. Each `Waiter`/// has a timeout. Transaction will be notified when the lock is released/// or the corresponding waiter times out.pub(crate) struct Waiter {    pub(crate) start_ts: TimeStamp,    pub(crate) cb: StorageCallback,    /// The result of `Command::AcquirePessimisticLock`.    ///    /// It contains a `KeyIsLocked` error at the beginning. It will be changed    /// to `WriteConflict` error if the lock is released or `Deadlock` error if    /// it causes deadlock.    pub(crate) pr: ProcessResult,    pub(crate) lock: Lock,    delay: Delay,    _lifetime_timer: HistogramTimer,}
复制代码


  • start_ts: 代表等待锁的事务 ts

  • lock: 代表等待的锁

  • pr: 代表等待的锁的结果,例如锁冲突、死锁等

  • delay: 等待超时时间

  • cb: 回调函数,唤醒函数,把锁等待结果 pr 返回给等待锁的事务的钩子函数


wait_table 维护了等待某个锁的所有事务列表,key 为锁的 hashId,value 是等待这个锁的所有 Waiter。


至此,当某个 key 上的锁被释放时候,根据锁的 hash ID 查找到所有的 Waiter,选择等待时间最早的事务进行直接唤醒。

锁唤醒

当事务提交或者回滚以后,事务持有的锁将会被释放,事务持有的每一个锁,都会对其 Waiter 进行唤醒操作(只唤醒等待锁最久的 Waiter)。


    fn handle_wake_up(&mut self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp) {        ...        for hash in hashes {    // 对于事务的每一个锁都进行唤醒操作            let lock = Lock { ts: lock_ts, hash };            // 找到最老的 waiter 进行唤醒            if let Some((mut oldest, others)) = wait_table.remove_oldest_waiter(lock) {                // Notify the oldest one immediately.                self.detector_scheduler                    .clean_up_wait_for(oldest.start_ts, oldest.lock);                oldest.conflict_with(lock_ts, commit_ts);                oldest.notify();                // Others will be waked up after `wake_up_delay_duration`.                //                // NOTE: Actually these waiters are waiting for an unknown transaction.                // If there is a deadlock between them, it will be detected after timeout.                if others.is_empty() {                    // Remove the empty entry here.                    wait_table.remove(lock);                } else {                    others.iter_mut().for_each(|waiter| {                        waiter.conflict_with(lock_ts, commit_ts);                        waiter.reset_timeout(new_timeout);                    });                }            }        }    }
复制代码


参数 lock_ts 代表是有锁的事务,hashes 代表持有的锁信息。


  1. 找到等待时间最久的 Waiter,从 Waiter 列表中删除

  2. 删除死锁检测维护的 txn_ts 到 txn_lock 的锁等待信息

  3. 构建唤醒的 pr 结果,调用唤醒函数

  4. 对于其他的 waiter(除了等待最久剩余的),构建唤醒的 pr,通过等待超时方式唤醒

锁管理接口

/// `LockManager` has two components working in two threads:///   * One is the `WaiterManager` which manages transactions waiting for locks.///   * The other one is the `Detector` which detects deadlocks between transactions.pub struct LockManager {    waiter_mgr_worker: Option<FutureWorker<waiter_manager::Task>>,    detector_worker: Option<FutureWorker<deadlock::Task>>,
waiter_mgr_scheduler: WaiterMgrScheduler, detector_scheduler: DetectorScheduler,
waiter_count: Arc<AtomicUsize>,
/// Record transactions which have sent requests to detect deadlock. detected: Arc<[CachePadded<Mutex<HashSet<TimeStamp>>>]>,
pipelined: Arc<AtomicBool>,}

impl LockManagerTrait for LockManager { fn wait_for( &self, start_ts: TimeStamp, cb: StorageCallback, pr: ProcessResult, lock: Lock, is_first_lock: bool, timeout: Option<WaitTimeout>, ) { let timeout = match timeout { Some(t) => t, None => { cb.execute(pr); return; } };
// Increase `waiter_count` here to prevent there is an on-the-fly WaitFor msg // but the waiter_mgr haven't processed it, subsequent WakeUp msgs may be lost. self.waiter_count.fetch_add(1, Ordering::SeqCst); self.waiter_mgr_scheduler .wait_for(start_ts, cb, pr, lock, timeout);
// If it is the first lock the transaction tries to lock, it won't cause deadlock. if !is_first_lock { // 不是第一个锁的时候,不检测?问题是不加入这个锁信息。那这个锁等待不会被以后的锁检查 self.add_to_detected(start_ts); self.detector_scheduler.detect(start_ts, lock); // 这里检测一下。 } }
fn wake_up( &self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp, is_pessimistic_txn: bool, ) { // If `hashes` is some, there may be some waiters waiting for these locks. // Try to wake up them. if !hashes.is_empty() && self.has_waiter() { self.waiter_mgr_scheduler .wake_up(lock_ts, hashes, commit_ts); } // If a pessimistic transaction is committed or rolled back and it once sent requests to // detect deadlock, clean up its wait-for entries in the deadlock detector. if is_pessimistic_txn && self.remove_from_detected(lock_ts) { self.detector_scheduler.clean_up(lock_ts); } }
fn has_waiter(&self) -> bool { self.waiter_count.load(Ordering::SeqCst) > 0 }}
复制代码


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
悲观事务死锁检测_TiDB 底层架构_TiDB 社区干货传送门_InfoQ写作社区