写点什么

GaussDB 通信运维:详解 stream 连接池设计原理

  • 2024-02-19
    广东
  • 本文字数:5137 字

    阅读完需:约 17 分钟

GaussDB通信运维:详解stream连接池设计原理

本文分享自华为云社区《GaussDB(DWS) 集群通信系列二:stream线程池设计》,作者:半岛里有个小铁盒。

1.前言


适用版本:【8.1.0(及以上)】


GaussDB(DWS)分布式架构的 Stream 算子作为 SQL join 操作时频繁发生的执行算子,共存在三种模式:Gather、Redistribute、Broadcast,分别负责 CN 节点 GATHER 数据,DN 节点 REDISTRIBUTE 和 BROACAST 数据。大集群高并发场景下,Stream 算子过多可能会导致通信的性能瓶颈,引起性能劣化(2000 个 stream 同时启动,进程初始化耗时从 ms 级劣化到 s 级),因此需要尽可能减少 Stream 算子。但是在某些现场环境下,存在数据倾斜、join 查询不包含必要分布键等客观情况,Stream 算子无法有效减少,为多表 join 场景下的查询时延保障带来挑战。因此 GaussDB(DWS)对于线程初始化->线程任务执行->线程退出执行的流程方面做了 stream 线程池优化,减少了线程初始化与线程退出所带来的开销。

2.实现原理


stream 线程是临时线程,随 query 启动和退出,负责 stream 算子的执行,stream 线程初始化和退出都会争抢锁等进程级资源,在 stream 线程个数无法进一步优化的场景下,需要设计有效方案以减少 stream 线程初始化和退出的时间代价,将进程初始化耗时稳定在 ms 级,保障数据库的确定性时延查询。Stream 线程池的核心思想是等 stream 线程执行完计划任务,保留必要且可复用的线程信息,将线程放入线程池中



线程池中的线程执行过程如上图所示,其具体步骤为:


  • 步骤一:线程信息初始化

  • 步骤二:线程待唤醒后轻量级初始化(query 级初始化)

  • 步骤三:线程任务执行

  • 步骤四:线程清理

  • 返回步骤二:继续等待下条 query 执行


在返回步骤二时,当线程等待超时、超出线程池容量(最大 stream 线程个数)、异常时线程已不可用,需要销毁。


其中步骤一中在线程初始化时,需要执行的操作有:线程创建、创建相关内存上下文、信号处理函数注册、内存追踪信息初始化、初始化 GUC 选项等操作;


步骤二中在线程轻量级/查询级初始化时,需要执行的操作有恢复 GUC 参数、初始化 BackendParams、重置 GUC 参数等操作。



stream 线程池为了高效管理线程的出/入池操作,采用无锁队列实现。定义结构体 ThreadSlot 保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的 database oid、线程执行所需的信息 StreamProducer、线程唤醒所需的锁和条件变量。


当线程还未被创建时,初始化一定数量的 ThreadSlot 数量以预留 stream 线程,这些 ThreadSlot 被保存在数组 threadSlots 中。当 stream 线程执行完毕,需要将 stream 线程放置到表征可复用线程的无锁队列,称之为 idleRing;当线程因为超时、异常等原因不再复用,需要退出时,将 stream 线程对应的 ThreadSlot 放置到表征未创建线程的无锁队列,称之为 emptyRing。


idleRing 的作用是为了快速获取并复用线程池中的线程,emptyRing 的作用是快速获取一个未被使用的 ThreadSlot 结构,以创建一个新的 stream 线程。由于 stream 线程的初始化信息和 database 是强相关的,如果不保留 database 相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足 database 信息匹配。对于设计线程池而言,每一个 database 都应该对应一个 idleRing。


综上所述,基于无锁队列的 stream 线程池设计如下所示:



从上图可以看出,一个线程池包含预留 stream 线程结构的 threadSlots、一个表征未创建线程的无锁队列 emptyRing 和表征可复用线程的无锁队列 idleRing,由于每个 database 对应一个 idleRing,因此多个 idleRing 被组织为链表结构。

3.具体实现机制

3.1 数据结构设计


定义结构体 ThreadSlot 保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的 database oid、线程执行所需的信息 StreamProducer,StreamProducer 是父线程向子线程传递的唯一结构、线程唤醒所需的锁和条件变量。


typedef struct{    int status;    uint32 idx;    ThreadId tid;    Oid dbOid;    StreamProducer* streamObj;    pthread_mutex_t m_mutex;    pthread_cond_t m_cond;} ThreadSlot;
复制代码


定义结构体 StreamThreadPool 表征线程池,其中 size 表示线程池中拟预留的 ThreadSlot 个数,ThreadSlot 被保存在 threadSlots 数组中;无锁队列 emptyRing 用来保存未创建线程的 ThreadSlot,对应地,idleRing 用来保存空闲的已创建 stream 线程的 ThreadSlot。结构如下所示:


class StreamThreadPool: public BaseObject{public:    StreamThreadPool();    void Init(int num);                            				     // streamThreadPool init       int Call(StreamProducer* obj);                   			     // 获取idle线程 或 create 新线程    bool Wait();                                                      // idle线程等待唤醒或者超时退出    ThreadSlot* GetLocalSlot();                                       // get streamThreadSlot    void SetLocalSlot(int slotIdx);                                   // set streamThreadSlot    StreamPool* GetLocalPool();                                       // 获取streamDBPool 或 新建一个    ThreadSlot* PopSlot();                                            // 从idleRing/emptyRing获取一slot    void PushToEmpty(ThreadSlot* slot);                               // 将slot直接放入emptyRing    void PushToIdle(StreamPool* pool, ThreadSlot* slot);              // 将slot直接放入idleRing    void LocalPushToIdle();                                           // 根据状态,将slot放入idleRing    void LocalPushToEmpty();                                          // 根据状态,将slot放入emptyRing    int CleanStreamPool(const char *dbName, cleanOption cleanMode);   // 根据db信息清线程    void CleanInAllStreamPool(int desNum);                            // 调整线程池中stream线程个数    int GetStreamNum();                                               // 获取线程池中stream线程个数    bool Release();                                                   // 判断超时线程是否需要清理    bool TimeoutClean();                                              // 清理超时idle线程
private: int size; ThreadSlot* threadSlots; ArrayLockFreeQueue emptyRing; StreamPool* PoolListHead;}
复制代码


定义结构体 StreamPool,由于 stream 线程的初始化信息和 database 是强相关的,如果不保留 database 相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足 database 信息匹配,所以一个 emptyRing 和一个 database 相匹配,保存在链表 PoolListHead 中。


typedef struct StreamPool{    Oid dbOid;    ArrayLockFreeQueue idleRing;    struct StreamPool* next;} StreamPool;
复制代码


综上,我们可以得到各结构间组织的直观图,如下所示:



上图中 threadSlots 可以放在 idleRing(蓝色)、emptyRing(绿色)和运行空间(黄色)中。

3.2 stream 线程状态转移 DFA 设计


每一个记录线程信息的结构 ThreadSlot 中都保存了线程当前的状态 status,记录线程状态的目的是为了保障线程执行过程的有序控制,也可以通过状态的互斥避免 threadSlot 不会被两个线程同时使用。


stream 线程状态转移用确定性有限状态机(DFA,definite automata)表征,共包含 4 个状态:


STREAM_SLOT_EXIT、STREAM_SLOT_IDLE、STREAM_SLOT_HOLD 和 STREAM_SLOT_RUN 状态。其物理含义如下:


  • STREAM_SLOT_EXIT:线程退出状态,表示线程未被创建或线程已退出;

  • STREAM_SLOT_IDLE:线程可复用状态,表示线程在 idleRing 中,可以被复用;

  • STREAM_SLOT_HOLD:线程临时独占状态,表示线程在做进入下一个状态的准备工作;

  • STREAM_SLOT_RUN:线程运行状态,表示线程正在执行任务。


状态间转移条件如下所示,图中粗箭头表示状态机主循环部分:



与状态对应的,是 slot 所处的位置,slot 所处的位置有三处,分别是 idleRing、emptyRing 和运行空间,slot 从无锁队列中拿出,运行时所处的位置,我们称之为运行空间。各状态所处的位置情况如下所示:


  • STREAM_SLOT_EXIT:idleRing(idle 线程超时)、emptyRing(初始化或者 FATAL);

  • STREAM_SLOT_IDLE:idleRing

  • STREAM_SLOT_HOLD:运行空间(从无锁队列中取出)、idleRing(idle 线程超时或中断);

  • STREAM_SLOT_RUN:运行空间。


Slot 的位置变化和状态转移的关系如下,图中粗箭头表示状态机主循环部分:



根据各状态所处的位置情况,从 idleRing 中取出的 slot 可能有三种状态:EXIT、IDLE、HOLD。当取出 IDLE 状态的 slot,说明线程可复用;当取出 EXIT 状态的 slot,说明线程已退出,此时需要将 slot 转存到 emptyRing;当取出 HOLD 状态,说明线程正在被使用,此时需要放回 idleRing。


EmptyRing 中 slot 的状态只能是 EXIT,运行空间中 slot 的状态要么是 HOLD(刚取出还未运行),要么是 RUN(正在运行),不再赘述

3.3 单个 stream 线程执行流程


Stream 线程池中 stream 线程整体执行流程如下图所示:



stream 线程初始化仅初始化一次,执行完 query 之后,便将连接归还到连接池里,循环执行上图中黄色部分的语句,如果有异常则线程退出,连接销毁,slot 归还至 emptyRing;如果正常执行结束,将连接中内容清理,避免下个连接误用,并将 slot 归还至 idleRing 等待下个连接复用。


那么 stream 线程复用时如何保持参数的一致性呢,对应上图中的 set GUC params 阶段。父线程保存自己的 guc_variables 在 syncGucVariables 中,syncGucVariables 是需要传递给 stream 的结构用以保证父子线程 guc 参数的一致。然后父线程在初始化 streamProducer 时将 syncGucVariables 保存在该结构中传递。Stream 线程根据 streamProducer 初始化自己的 syncGucVariables 变量,首先 reset 所有的 guc 变量,然后根据 syncGucVariables 修正自己的 variables。

4.外部接口

4.1 GUC 参数


max_stream_pool:设置 stream 线程池能够容纳 stream 线程的最大个数。该参数 8.1.2 及以上版本支持。默认值为 65535。设置为-1 表示不开启 stream 线程池。该参数支持 reload 更新,更新规则:设置 max_stream_pool 小于当前可用线程个数,支持线程个数实时减少;当设置 max_stream_pool 大于当前 idle 线程个数,将由业务驱动线程个数的增加

4.2 视图


pg_thread_wait_status:展示了集群所有 CN/DN 进程内的所有线程的实时 等待状态,是定位集群通信问题最重要的视图



其中对于 wait_status 列状态说明如下:


  • wait stream task:空闲的 stream 线程;

  • wait node:等待其他 DN 的数据,需要关注对端状态;

  • flush data:发送数据给其他 DN 时因为对端 buffer 满而阻塞;

  • wait cmd:DN 上空闲的 postgres 线程,等待 CN 的下一个 query;

  • none:未定义状态,极有可能是阻塞原因;

  • synchronize quit:同步退出状态,自身任务已完成,在等待同一个 query 的其他线程一起退出;

5.通过表象看 stream 线程池逻辑


【场景一】集群基础行为场景——建立多数据库场景


Create database ***;(建立多库)


分别执行带 stream 算子的查询;


例:create table test_01(c1 int, c2 int)with(orientation=column) distribute by hash(c1);insert into test_01 select generate_series(1,100), generate_series(1,100);analyze test_01;select * from test_01 a, test_01 b, test_01 c, test_01 d, test_01 e, test_01 f where a.c2 =b.c2 and c.c2 = d.c2 and e.c2=f.c2 limit 100;


查询结束,查 pgxc_thread_wait_status 看 DN 节点:预期 stream 线程状态为 wait thread cond。且多 database 之间 stream 线程不复用。


【场景二】集群基础行为场景——建立多用户场景


Create user ***;(建立多用户)


分别执行带 stream 算子的查询;(参考场景一示例)


查询结束,查 pgxc_thread_wait_status 看 DN 节点:预期 stream 线程状态为 wait thread cond。且多 user 之间 stream 线程可以复用。


例:用户一执行完查询,视图中显示共有四个 stream 线程在线程池,用户二执行同样查询返回正确结果,视图中的 stream 线程个数不变,且线程号也是一致的,则说明复用。


【场景三】集群基础行为场景——线程清理场景


调整 guc 参数 max_stream_pool 的值,观测是否生效;预期:当设置 max_stream_pool 小于当前 idle 线程个数,支持线程个数实时减少;当设置 max_stream_pool 大于当前 idle 线程个数,将由业务驱动线程个数的增加,但是不会超过 max_stream_pool。


执行 clean connection(ALL force),查看 stream 线程是否被清理;预期:该 database 的 stream 线程被完全清理。


执行 drop database 命令,查看 stream 线程是否被清理;预期:该 database 的 stream 线程被完全清理。

6.总结


本文详细介绍了 stream 连接池及其原理,让我们更好的理解 GaussDB(DWS)集群通信中数据交互的具体逻辑,对于 GaussDB 通信运维也具备一定的参考意义。


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 6
用户头像

提供全面深入的云计算技术干货 2020-07-14 加入

生于云,长于云,让开发者成为决定性力量

评论

发布
暂无评论
GaussDB通信运维:详解stream连接池设计原理_数据库_华为云开发者联盟_InfoQ写作社区