openGauss 数据库源码解析系列文章——事务机制源码解析(三)
**三、**锁机制
数据库对公共资源的并发控制是通过锁来实现的,根据锁的用途不同,通常可以分为 3 种:自旋锁(spinlock)、轻量级锁(LWLock,light weight lock)和常规锁(或基于这 3 种锁的进一步封装)。使用锁的一般操作流程可以简述为 3 步:加锁、临界区操作、放锁。在保证正确性的情况下,锁的使用及争抢成为制约性能的重要因素,下面先简单介绍 openGauss 中的 3 种锁,最后再着重介绍 openGauss 基于鲲鹏架构所做的锁相关性能优化。
(一)自旋锁
自旋锁一般是使用 CPU 的原子指令 TAS(test-and-set)实现的。只有 2 种状态:锁定和解锁。自旋锁最多只能被一个进程持有。自旋锁与信号量的区别在于,当进程无法得到资源时,信号量使进程处于睡眠阻塞状态,而自旋锁使进程处于忙等待状态。自旋锁主要用于加锁时间非常短的场合,比如修改标志或者读取标志字段,在几十个指令之内。在编写代码时,自旋锁的加锁和解锁要保证在一个函数内。自旋锁由编码保证不会产生死锁,没有死锁检测,并且没有等待队列。由于自旋锁消耗 CPU,当使用不当长期持有时会触发内核 core dump(核心转储),openGauss 中将许多 32/64/128 位变量的更新改用 CAS 原子操作,避免或减少使用自旋锁。
与自旋锁相关的操作主要有下面几个:
(1) SpinLockInit:自旋锁的初始化。
(2) SpinLockAcquire:自旋锁加锁。
(3) SpinLockRelease:自旋锁释放锁。
(4) SpinLockFree:自旋锁销毁并清理相关资源。
(二)LWLock 轻量级锁
轻量级锁是使用原子操作、等待队列和信号量实现的。存在 2 种类型:共享锁和排他锁。多个进程可以同时获取共享锁,但排他锁只能被一个进程拥有。当进程无法得到资源时,轻量级锁会使进程处于睡眠阻塞状态。轻量级锁主要用于内部临界区操作比较久的场合,加锁和解锁的操作可以跨越函数,但使用完后要立即释放。轻量级锁应由编码保证不会产生死锁。但是由于代码复杂度及各类异常处理,openGauss 提供了 LWLock 的死锁检测机制,避免各类异常场景产生的 LWLock 死锁问题。
与轻量级锁相关的函数有如下几个。
(1) LWLockAssign:申请一个 LWLock。
(2) LWLockAcquire:加锁。
(3 )LWLockConditionalAcquire:条件加锁,如果没有获取锁则返回 false,并不一直等待。
(4) LWLockRelease:释放锁。
(5) LWLockReleaseAll:释放拥有的所有锁。当事务过程中出错了,会将持有的所有 LWLock 全部回滚释放,避免残留阻塞后续操作。
相关结构体代码如下:
#define LW_FLAG_HAS_WAITERS ((uint32)1 << 30)
#define LW_FLAG_RELEASE_OK ((uint32)1 << 29)
#define LW_FLAG_LOCKED ((uint32)1 << 28)
#define LW_VAL_EXCLUSIVE ((uint32)1 << 24)
#define LW_VAL_SHARED 1 /* 用于标记LWLock的state状态,实现锁的获取和释放*/
typedef struct LWLock {
uint16 tranche; /* 轻量级锁的ID标识 */
pg_atomic_uint32 state; /* 锁的状态位 */
dlist_head waiters; /* 等锁线程的链表 */
#ifdef LOCK_DEBUG
pg_atomic_uint32 nwaiters; /* 等锁线程的个数 */
struct PGPROC *owner; /* 最后独占锁的持有者 */
#endif
#ifdef ENABLE_THREAD_CHECK
pg_atomic_uint32 rwlock;
pg_atomic_uint32 listlock;
#endif
} LWLock;
复制代码
(三)常规锁
常规锁是使用哈希表实现的。常规锁支持多种锁模式(lock modes),这些锁模式之间的语义和冲突是通过冲突表来定义的。常规锁主要用于业务访问的数据库对象加锁。常规锁的加锁遵守数据库的两阶段加锁协议,即访问过程中加锁,事务提交时释放锁。
常规锁有等待队列并提供了死锁检测机制,当检测到死锁发生时选择一个事务进行回滚。
openGauss 提供了 8 个锁级别分别用于不同的语句并发:1 级锁一般用于 SELECT 查询操作;3 级锁一般用于基本的 INSERT、UPDATE、DELETE 操作;4 级锁用于 VACUUM、analyze 等操作;8 级锁一般用于各类 DDL 语句,具体宏定义及命名代码如下:
#define AccessShareLock 1 /* SELECT语句 */
#define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE语句 */
#define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE语句 */
#define ShareUpdateExclusiveLock \
4 /* VACUUM (non-FULL),ANALYZE, CREATE INDEX CONCURRENTLY语句 */
#define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY)语句 */
#define ShareRowExclusiveLock \
6 /* 类似于独占模式, 但是允许ROW SHARE模式并发 */
#define ExclusiveLock \
7 /* 阻塞ROW SHARE,如SELECT...FOR UPDATE语句 */
#define AccessExclusiveLock \
8 /* ALTER TABLE, DROP TABLE, VACUUM FULL, LOCK TABLE语句 */
复制代码
这 8 个级别的锁冲突及并发控制如表 1 所示,其中表示两个锁操作可以并发。
表 1 锁冲突及并发控制
加锁对象数据结构,通过对 field1->field5 赋值标识不同的锁对象,使用 locktag_type 标识锁对象类型,如 relation 表级对象、tuple 行级对象、事务对象等,对应的代码如下:
typedef struct LOCKTAG {
uint32 locktag_field1; /* 32比特位*/
uint32 locktag_field2; /* 32比特位*/
uint32 locktag_field3; /* 32比特位*/
uint32 locktag_field4; /* 32比特位*/
uint16 locktag_field5; /* 32比特位*/
uint8 locktag_type; /* 详情见枚举类LockTagType*/
uint8 locktag_lockmethodid; /* 锁方法类型*/
} LOCKTAG;
typedef enum LockTagType {
LOCKTAG_RELATION, /* 表关系*/
/* LOCKTAG_RELATION的ID信息为所属库的OID+表OID;如果库的OID为0表示此表是共享表,其中OID为openGauss内核通用对象标识符 */
LOCKTAG_RELATION_EXTEND, /* 扩展表的优先权*/
/* LOCKTAG_RELATION_EXTEND的ID信息 */
LOCKTAG_PARTITION, /* 分区*/
LOCKTAG_PARTITION_SEQUENCE, /* 分区序列*/
LOCKTAG_PAGE, /* 表中的页*/
/* LOCKTAG_PAGE的ID信息为RELATION信息+BlockNumber(页面号)*/
LOCKTAG_TUPLE, /* 物理元组*/
/* LOCKTAG_TUPLE的ID信息为PAGE信息+OffsetNumber(页面上的偏移量) */
LOCKTAG_TRANSACTION, /* 事务ID (为了等待相应的事务结束) */
/* LOCKTAG_TRANSACTION的ID信息为事务ID号 */
LOCKTAG_VIRTUALTRANSACTION, /* 虚拟事务ID */
/* LOCKTAG_VIRTUALTRANSACTION的ID信息为它的虚拟事务ID号 */
LOCKTAG_OBJECT, /* 非表关系的数据库对象 */
/* LOCKTAG_OBJECT的ID信息为数据OID+类OID+对象OID+子ID */
LOCKTAG_CSTORE_FREESPACE, /* 列存储空闲空间 */
LOCKTAG_USERLOCK, /* 预留给用户锁的锁对象 */
LOCKTAG_ADVISORY, /* 用户顾问锁 */
LOCK_EVENT_NUM
} LockTagType;
复制代码
常规锁 LOCK 结构,tag 是常规锁对象的唯一标识,procLocks 是将该锁所有的持有、等待线程串联起来的结构体指针。对应的代码如下:
typedef struct LOCK {
/* 哈希键 */
LOCKTAG tag; /* 锁对象的唯一标识 */
/* 数据 */
LOCKMASK grantMask; /* 已经获取锁对象的位掩码 */
LOCKMASK waitMask; /* 等待锁对象的位掩码 */
SHM_QUEUE procLocks; /* 与锁关联的PROCLOCK对象链表 */
PROC_QUEUE waitProcs; /* 等待锁的PGPROC对象链表 */
int requested[MAX_LOCKMODES]; /* 请求锁的计数 */
int nRequested; /* requested数组总数 */
int granted[MAX_LOCKMODES]; /* 已获取锁的计数 */
int nGranted; /* granted数组总数 */
} LOCK;
复制代码
PROCLOCK 结构,主要是将同一锁对象等待和持有者的线程信息串联起来的结构体。对应的代码如下:
typedef struct PROCLOCK {
/* 标识 */
PROCLOCKTAG tag; /* proclock对象的唯一标识 */
/* 数据 */
LOCKMASK holdMask; /* 已获取锁类型的位掩码 */
LOCKMASK releaseMask; /* 预释放锁类型的位掩码 */
SHM_QUEUE lockLink; /* 指向锁对象链表的指针 */
SHM_QUEUE procLink; /* 指向PGPROC链表的指针 */
} PROCLOCK;
复制代码
t_thrd.proc 结构体里 waitLock 字段记录了该线程等待的锁,该结构体中 procLocks 字段将所有跟该锁有关的持有者和等着串起来,其队列关系如图 1 所示。
图 1 t_thrd.proc 结构体队列关系图
常规锁的主要函数如下。
(1) LockAcquire:对锁对象加锁。
(2) LockRelease:对锁对象释放锁。
(3 )LockReleaseAll:释放所有锁资源。
(四)死锁检测机制
死锁主要是由于进程 B 要访问进程 A 所在的资源,而进程 A 又由于种种原因不释放掉其锁占用的资源,从而数据库就会一直处于阻塞状态。如图 2 中,T1 使用资源 R1 去请求 R2,而 T2 事务持有 R2 的资源去申请 R1。
图 2 死锁状态
形成死锁的必要条件是:资源的请求与保持。每一个进程都可以在使用一个资源的同时去申请访问另一个资源。打破死锁的常见处理方式:中断其中的一个事务的执行,打破环状的等待。openGauss 提供了 LWLock 的死锁检测及常规锁的死锁检测机制,下面简单介绍一下相关原理及代码。
1. LWLock 死锁检测自愈
openGauss 使用一个独立的监控线程来完成轻量级锁的死锁探测、诊断和解除。工作线程在请求轻量级锁成功之前会写入一个时间戳数值,成功获得锁之后置该时间戳为 0。监测线程可以通过快速对比时间戳数值来发现长时间获得不到锁资源的线程,这一过程是快速轻量的。只有发现长时间的锁等待,死锁检测的诊断才会触发。这么做的目的是防止频繁诊断影响业务正常执行能。一旦确定了死锁环的存在,监控线程首先会将死锁信息记录到日志中去,然后采取恢复措施使得死锁自愈,即选择死锁环中的一个线程报错退出。机制如图 3 所示。
图 3 LWLock 死锁检测自愈
因为检测死锁是否真正发生是一个重 CPU 操作,为了不影响数据库性能和运行稳定性,轻量级死锁检测使用了一种轻量式的探测,用来快速判断是否可能发生了死锁。采用看门狗(watchdog)的方法,利用时间戳来探测。工作线程在锁请求进入时会在全局内存上写入开始等待的时间戳;在锁请求成功后,将该时间戳清 0。对于一个发生死锁的线程,它的锁请求是 wait 状态,时间戳也不会清 0,且与当前运行时间戳数值的差值越来越大。由 GUC 参数 fault_mon_timeout 控制检测间隔时间,默认为 5s。LWLock 死锁检测每隔 fault_mon_timeout 去进行检测,如果当前发现有同样线程、同样锁 id,且时间戳等待时间超过检测间隔时间值,则触发真正死锁检测。时间统计及轻量级检测函数如下。
(1) pgstat_read_light_detect:从统计信息结构体中读取线程及锁 id 相关的时间戳,并记录到指针队列中。
(2) lwm_compare_light_detect:跟几秒检测之前的时间对比,如果找到可能发生死锁的线程及锁 id 则返回 true,否则返回 false。
真正的 LWLock 死锁检测是一个有向无环图的判定过程,它的实现跟常规锁类似,这部分会在下面一小节中详细介绍。死锁检测需要两部分的信息:锁,包括请求和分配的信息;线程,包括等待和持有的信息,这些信息记录到相应的全局变量中,死锁监控线程可以访问并进行判断。相关的函数如下。
(1) lwm_heavy_diagnosis:检测是否有死锁。
(2) lwm_deadlock_report:报告死锁详细信息,方便定位诊断。
(3) lw_deadlock_auto_healing:治愈死锁,选择环中一个线程退出。
用于死锁检测的锁和线程相关数据结构如下。
(1) lock_entry_id 记录线程信息,有 thread_id 及 sessionid 是为了适配线程池框架,可以准确的从统计信息中找到相应的信息。对应的代码如下:
typedef struct {
ThreadId thread_id;
uint64 st_sessionid;
} lock_entry_id;
复制代码
(2) lwm_light_detect 记录可能出现死锁的线程,最后用一个链表的形式将当前所有信息串联起来。对应的代码如下:
typedef struct {
/* 线程ID */
lock_entry_id entry_id;
/* 轻量级锁检测引用计数 */
int lw_count;
} lwm_light_detect;
复制代码
(3) lwm_lwlocks 记录线程相关的锁信息,持有锁数量,以及等锁信息。对应的代码如下:
typedef struct {
lock_entry_id be_tid; /* 线程ID */
int be_idx; /* 后台线程的位置 */
LWLockAddr want_lwlock; /* 预获取锁的信息 */
int lwlocks_num; /* 线程持有的轻量级锁个数 */
lwlock_id_mode* held_lwlocks; /* 线程持有的轻量级锁数组 */
} lwm_lwlocks;
复制代码
2. 常规锁死锁检测
openGauss 在获取锁时如果没有冲突可以直接上锁;如果有冲突则设置一个定时器 timer,并进入等待,过一段时间会被 timer 唤起进行死锁检测。如果在某个锁的等锁队列中,进程 T2 排在进程 T1 后面,且进程 T2 需要获取的锁与 T1 需要获取的锁资源冲突,则 T2 到 T1 会有一条软等待边(soft edge)。如果进程 T2 的加锁请求与 T1 进程所持有的锁冲突,则有一条硬等待边(hard edge)。那么整体思路就是通过递归调用,从当前顶点等锁的顶点出发,沿着等待边向前走,看是否存在环,如果环中有 soft edge,说明环中两个进程都在等锁,重新排序,尝试解决死锁冲突。如果没有 soft edge,那么只能终止当前等锁的事务,解决死锁等待环。如图 5-19 所示,虚线代表 soft edge,实线代表 hard fdge。线程 A 等待线程 B,线程 B 等待线程 C,线程 C 等待线程 A,因为线程 A 等待线程 B 的是 soft edge,进行一次调整成为图 4 右边的等待关系,此时发现线程 A 等待线程 C,线程 C 等待线程 A,没有 soft edge,检测到死锁。
图 4 常规锁死锁检测示意图
主要函数如下。
(1) DeadLockCheck:死锁检测函数。
(2) DeadLockCheckRecurse:如果死锁则返回 true,如果有 soft edge,返回 false 并且尝试解决死锁冲突。
(3) check_stack_depth:openGauss 会检查死锁递归检测堆栈(死锁检测递归栈过长,会引发在死锁检测时,长期持有所有锁的 LWLock 分区,从而阻塞业务)。
(4) CheckDeadLockRunningTooLong:openGauss 会检查死锁检测时间,防止死锁检测时间过长,阻塞后面所有业务。对应的代码如下:
static void CheckDeadLockRunningTooLong(int depth)
{ /* 每4层检测一下 */
if (depth > 0 && ((depth % 4) == 0)) {
TimestampTz now = GetCurrentTimestamp();
long secs = 0;
int usecs = 0;
if (now > t_thrd.storage_cxt.deadlock_checker_start_time) {
TimestampDifference(t_thrd.storage_cxt.deadlock_checker_start_time, now, &secs, &usecs);
if (secs > 600) { /* 如果从死锁检测开始超过十分钟,则报错处理。*/
#ifdef USE_ASSERT_CHECKING
DumpAllLocks();/* 在debug版本时,导出所有的锁信息,便于定位问题。*/
#endif
ereport(defence_errlevel(), (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Deadlock checker runs too long and is greater than 10 minutes.")));
}
}
}
}
复制代码
(5) FindLockCycle:检查是否有死锁环。
(6) FindLockCycleRecurse:死锁检测内部递归调用函数。
相应的数据结构有:
(1) 死锁检测中最核心最关键的有向边数据结构。对应的代码如下:
typedef struct EDGE {
PGPROC *waiter; /* 等待的线程 */
PGPROC *blocker; /* 阻塞的线程 */
int pred; /* 拓扑排序的工作区 */
int link; /* 拓扑排序的工作区 */
} EDGE;
复制代码
(2) 可重排的一个等待队列。对应的代码如下:
typedef struct WAIT_ORDER {
LOCK *lock; /* the lock whose wait queue is described */
PGPROC **procs; /* array of PGPROC *'s in new wait order */
int nProcs;
} WAIT_ORDER;
复制代码
(五)无锁原子操作
openGauss 封装了 32、64、128 的原子操作,主要用于取代自旋锁,实现简单变量的原子更新操作。
(1) gs_atomic_add_32:32 位原子加,并且返回加之后的值。对应的代码如下:
static inline int32 gs_atomic_add_32(volatile int32* ptr, int32 inc)
{
return __sync_fetch_and_add(ptr, inc) + inc;
}
复制代码
(2) gs_atomic_add_64:64 位原子加,并且返回加之后的值。对应的代码如下:
static inline int64 gs_atomic_add_64(int64* ptr, int64 inc)
{
return __sync_fetch_and_add(ptr, inc) + inc;
}
复制代码
(3) gs_compare_and_swap_32:32 位 CAS 操作,如果 dest 在更新前没有被更新,则将 newval 写到 dest 地址。dest 地址的值没有被更新,就返回 true;否则返回 false。对应的代码如下:
static inline bool gs_compare_and_swap_32(int32* dest, int32 oldval, int32 newval)
{
if (oldval == newval)
return true;
volatile bool res = __sync_bool_compare_and_swap(dest, oldval, newval);
return res;
}
复制代码
(4) gs_compare_and_swap_64:64 位 CAS 操作,如果 dest 在更新前没有被更新,则将 newval 写到 dest 地址。dest 地址的值没有被更新,就返回 true;否则返回 false。对应的代码如下:
static inline bool gs_compare_and_swap_64(int64* dest, int64 oldval, int64 newval)
{
if (oldval == newval)
return true;
return __sync_bool_compare_and_swap(dest, oldval, newval);
}
复制代码
(5) arm_compare_and_swap_u128:openGauss 提供跨平台的 128 位 CAS 操作,在 ARM 平台下,使用单独的指令集汇编了 128 位原子操作,用于提升内核测锁并发的性能,详情可以参考下一小结。对应的代码如下:
static inline uint128_u arm_compare_and_swap_u128(volatile uint128_u* ptr, uint128_u oldval, uint128_u newval)
{
#ifdef __ARM_LSE
return __lse_compare_and_swap_u128(ptr, oldval, newval);
#else
return __excl_compare_and_swap_u128(ptr, oldval, newval);
#endif
}
#endif
复制代码
(6) atomic_compare_and_swap_u128:128 位 CAS 操作,如果 dest 地址的值在更新前没有被其他线程更新,则将 newval 写到 dest 地址。dest 地址的值没有被更新,就返回新值;否则返回被别人更新后的值。需要注意必须由上层的调用者保证传入的参数是 128 位对齐的。对应的代码如下:
static inline uint128_u atomic_compare_and_swap_u128(
volatile uint128_u* ptr,
uint128_u oldval = uint128_u{0},
uint128_u newval = uint128_u{0})
{
#ifdef __aarch64__
return arm_compare_and_swap_u128(ptr, oldval, newval);
#else
uint128_u ret;
ret.u128 = __sync_val_compare_and_swap(&ptr->u128, oldval.u128, newval.u128);
return ret;
#endif
}
复制代码
评论