写点什么

Greenplum 内核源码分析 - 分布式事务 (三)

  • 2021 年 12 月 31 日
  • 本文字数:3787 字

    阅读完需:约 12 分钟

目录

  • 前言

  • 初始化和 begin 命令

前言

这一篇我们会正式开始介绍 Greenplum 分布式事务的源码。涉及到一些基本的概念,比如 Gang,Slice,读者可以参考相关文档。

作者对 Greenplum 源码的分析会使用Greenplum 5.x 版本,读者可以去 github 上面自行获取。

初始化和 begin 命令

这一篇讲的内容对应 sql 命令就是 begin 关键字。


$ psql -d gpadminpsql (8.3.23)Type "help" for help.
gpadmin=# select version();
version
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ PostgreSQL 8.3.23 (Greenplum Database 5.0.0 build dev) on x86_64-pc-linux-gnu,compiled by GCC gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit compiled onSep 23 2021 20:14:48 (with assert checking)(1 row)
gpadmin=# begin;BEGINgpadmin=# commit;COMMIT
复制代码


上一篇介绍了 PostgreSQL 本地事务的一个简单例子,我们先贴在这里。

     /  StartTransactionCommand;    /       StartTransaction;1) <    ProcessUtility;                 << BEGIN    \       BeginTransactionBlock;     \  CommitTransactionCommand;
复制代码


每一个 SQL 命令包括这里的 Begin,都被 StartTransactionCommand 和 CommitTransactionCommand 包裹起来,里面还有 StartTransaction 和 BeginTransactionBlock 函数调用。在 Greenplum 里面,这几个函数的执行过程中,会被加上其他的和分布式相关的代码。

关于几个参与者之间的通信关系,因为只有 begin 命令,所以也都只用了 libpq 来做通信。如果后面有复杂的 Query,会使用 Greenplum 自研的 Interconnect 机制做数据交互。



结合这个图我们简单描述一遍通信的过程。

首先,用"psql -d gpadmin" 连接 master node,这个命令会先连接 postgres 主进程,然后会 fork 出一个子进程出来,这个也是 PostgreSQL 单机版的常规动作。然后从客户端执行 "begin;" 命令。这个命令是通过 libpq 的协议发送到 master node 的。

case 'Q':			/* simple query */
复制代码

在 master node 上,被新 fork 出来的进程里面,会一直在"ReadCommand" 这个函数这里等待命令。

/* * (3) read a command (loop blocks here) */firstchar = ReadCommand(&input_message);
复制代码


/* ---------------- *              ReadCommand reads a command from either the frontend or *              standard input, places it in inBuf, and returns the *              message type code (first byte of the message). *              EOF is returned if end of file. * ---------------- */static intReadCommand(StringInfo inBuf){        int                     result;
SIMPLE_FAULT_INJECTOR(BeforeReadCommand);
if (whereToSendOutput == DestRemote) result = SocketBackend(inBuf); else result = InteractiveBackend(inBuf); return result;}
复制代码

ReadCommand 是从 SocketBackend 读入数据的。读到以后,发现"firstchar == 'Q'",所以这是一个 libpq 的 simple query,经过 query statement 的 parsing 工作,发现是一个单独的 begin 命令,就在本地执行了 BeginTransactionBlock。

这之后,开始调用 sendDtxExplicitBegin,开始做分布式的工作。首先会到这个函数

dtmPreCommand("sendDtxExplicitBegin", "(none)", NULL,                        /* is two-phase */ true, /* withSnapshot */ true, /* inCursor */ false );
复制代码

这个函数是用来 mark 目前的分布式事务是否要使用两阶段提交协议的,也就是修改 currentGxact->state 的状态。

begin 命令是不需要的,所以简单的返回了。 但是,后面的 commit 是需要的。这个函数在以前的 Greenplum 版本里面以前叫做 setCurrentDtxTwoPhase,后来加了一些其他功能在里面,然后改名了。


接着后面,到了 dispatchDtxCommand 函数,再到 cdbdisp_dispatchCommandInternal,然后又调用了

dtmPreCommand("cdbdisp_dispatchCommandOrSerializedQuerytree", strCommand, NULL, needTwoPhase, withSnapshot, false /* inCursor */);
复制代码


最后到 AllocateWriterGang,这里面检测到 Gang 是没有的,然后开始创建 gang

writerGang = createGang(GANGTYPE_PRIMARY_WRITER, PRIMARY_WRITER_GANG_ID, nsegdb, -1);
复制代码

Gang 是 Greenplum 里面工作在不同 segment 上面,但是为了同一个 Slice 而生成的一组内存资源。看下面这段 code 就能大概了解 Gang 的物理存在形式,这里的 Gang 是 master node 上面的 Gang。

if (writerGang == NULL)        {                int nsegdb = getgpsegmentCount();
insist_log(IsTransactionOrTransactionBlock(), "cannot allocate segworker group outside of transaction");
if (GangContext == NULL) { GangContext = AllocSetContextCreate(TopMemoryContext, "Gang Context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); } Assert(GangContext != NULL); oldContext = MemoryContextSwitchTo(GangContext);
writerGang = createGang(GANGTYPE_PRIMARY_WRITER, PRIMARY_WRITER_GANG_ID, nsegdb, -1); writerGang->allocated = true;
/* * set "whoami" for utility statement. * non-utility statement will overwrite it in function getCdbProcessList. */ for(i = 0; i < writerGang->size; i++) setQEIdentifier(&writerGang->db_descriptors[i], -1, writerGang->perGangContext);
MemoryContextSwitchTo(oldContext); }
复制代码


我们接着从 createGang 开始讲。有个 GUC 叫做 gp_connections_per_thread,这个 GUC 能决定是使用多线程的方式去建立 master 到 segment 的数据库连接,还是用异步的方式来建立连接。default 的数值是 0,如果是 0 就是用异步方式,大概就是用 connect 做连接,然后 poll 做 socket 的异步监控,直到最后把所有连接都建立好,把 fd 存好。如果这个 GUC 的数值不是 0,那么就会用多线程的方式来连接 segment,起多少个线程需要根据 GUC 的值和 segment 的数量来计算。这是同一的目标的两种不同实现,code 也比较清楚,可以自己翻看源码。

因为我们用的是 default 的值,所以用的是异步的方式,通过 createGang_async 最后调用了 PQconnectStartParams,这个函数就相当于 psql 客户端执行(psql -d gpadmin)会去连接每个 segment 数据库的 postgres 进程。

这些进程也会 fork 子进程出来,然后开始准备环境,执行后续 sql 命令。代码到了这里,master 连接 segments 的工作就完成了。


后面的函数就是在发送具体的命令,就是 Begin 命令。cdbdisp_dispatchToGang 发送,因为是异步,所以 cdbdisp_waitDispatchFinish 等待发送完成,然后 cdbdisp_getDispatchResults 等待 segments 回复结果。


cdbdisp_dispatchToGang 发送用到了一个新的消息协议,就是 firstchar = 'M'。回顾前面的协议,我们讲过 Greenplum 在 PostgreSQL 的通信协议基础上增加了两个新的协议。


case 'M':     /* MPP dispatched stmt from QD */case 'T':     /* MPP dispatched dtx protocol command from QD */
复制代码


master node 上面发送了消息以后,就开始等待所有的 segments 的回复。现在我们再转到 segments 上面继续看逻辑顺序。 segment 在启动以后,还是和一般的 PostgreSQL 应用一样,用 ReadCommand 等待输入。

这里等到的是 firstchar = 'M'。针对这个分支,segment 会进入一个函数叫做 setupQEDtxContext,在这个函数里面完成了单机版 begin 命令的所有操作。然后用 ReadyForQuery 函数给 master node 回复 ready for query 信息,这个信息也是 libpq 协议定义过的。


case 'Z':		/* backend is ready for new query */
复制代码


仔细看 code 实现,能看出来发回的是一个 firstchar = 'Z'的消息。

我们再切换回 master node,通过函数 cdbdisp_returnResults 把结果返回,然后清理 Gang 里面的 context,这里因为是用的异步的方式,所以函数是 cdbdisp_destroyDispatcherState,如果是多线程的方式,函数应该是 cdbdisp_destroyDispatchThreads。 接着 master node 用 CommitTransactionCommand 结束这个 begin 命令,然后用和 segment 相同的方式返回 ready for query 信息给 psql 客户端,整个过程结束。


总结,回顾一下这个简单的 begin 命令的执行过程,psql 永远是发起命令的客户端,master node 对于 psql 是 server 端,但是 Greenplum 集群里面,它还扮演了 libpq 客户端的角色去访问每个 segment。如果只传送关键字命令,比如 begin/commit/rollback 之类的,segment 上面的逻辑略微简单,后面会分析比较复杂的 SQL 语句,整个过程会变得更加复杂,但是命令逻辑变化还是不大的。

发布于: 2021 年 12 月 31 日
用户头像

还未添加个人签名 2021.12.30 加入

https://github.com/ginobiliwang

评论

发布
暂无评论
Greenplum内核源码分析-分布式事务(三)