写点什么

Greenplum 内核源码分析 - 分布式事务 (四)

  • 2022 年 1 月 02 日
  • 本文字数:4556 字

    阅读完需:约 15 分钟

目录

  • 前言

  • insert 命令


前言

上一篇文章我们介绍了 begin 命令和初始化工作。begin 命令的主要工作是初始化,创建 Gang,把 master node 上面的状态机走一走。然后 begin 命令也会发送到 segment 上面,segment 上面的实例也开始初始化,准备接收新的命令。


后面文中的术语:

QD---> master node

QE--->segment node

QE 被分为 QE writer gang 和 QE reader gang 两种,这是在创建 Gang 的时候就确定了的。writer gang 和相关的 reader gang 属于一个 segMates 组,一个组里面有一个 writer 和 一个或多个 reader。自然,writer gang 负责的工作最全面,QE 上面数据库事务的功能基本都是 writer gang 完成的。


本篇文章我们介绍 insert 命令,也就是在 begin 以后,客户端执行"insert into t2 values (1,11); "。

请大家注意,这个时候还没有开始做两阶段提交。只是 master node 发信息,更新状态,segment node 接收信息,更新状态。

对应的 PostgreSQL 的函数逻辑再贴在这里


    /   StartTransactionCommand;3) /    ProcessQuery;                   << INSERT ...   \    CommitTransactionCommand;    \       CommandCounterIncrement;
复制代码


作者对 Greenplum 源码的分析会使用Greenplum 5.x 版本,读者可以去 github 上面自行获取。

insert 命令



结合这个图,我们简单描述一下执行的过程。

在 begin 以后,客户端执行"insert into t2 values (1,10);" psql 给 QD 发送信息,ReadCommand 返回 firstchar 为'Q'的 query。 QD 的 PortalRun 进入 ExecutorStart 函数,dtmPreCommand("ExecutorStart")没有向 QE writer 发送信息,只是在 QD 上修改了状态机的状态。


QD 在 ExecutorStart 函数里面调用 CdbDispatchPlan 把执行计划发送到各个 QE writer 里面,这些 QE writer 都是在 begin 命令的时候启动的,启动以后都在监听来自 QD 上的 libpq 客户端发出来的信息。 QD 发送完执行计划以后,就进入 ExecutorEnd 函数,等待 QE writer 发送结果回来。具体的等待函数是 CdbCheckDispatchResult。


在 QE writer 里,ReadCommand 得到 firstchar 为'M'的信息,按照这个信息,开始进行 ExecutorRun 函数进行操作。这个函数会调用 ExecutePlan 函数,这个就是开始执行执行计划的具体地方。


在 ExecInsert 被执行后,有两个分支,一个是写数据本身,函数是

heap_insert-->RelationPutHeapTuple-->PageAddItem;另外一个分支是 XLogInsert,也就是在写 WAL log。这两个函数被调用完后,就会按照正常的流程返回 ready for query message 给 QD,所有的 QE writer 都会做这样的操作,QD 也会收到所有的 QE writer 返回的 ready for query message。

当 QD 返回 ready for query message 给 psql 客户端以后,整个过程就结束了。


QD 上面的函数栈信息

Breakpoint 1, cdbdisp_dispatchX (pQueryParms=0x16e11e0, cancelOnError=1 '\001', sliceTbl=0x16dccb8, ds=0x16dcba8) at cdbdisp_query.c:11751175		SliceVec *sliceVector = NULL;(gdb) bt#0  cdbdisp_dispatchX (pQueryParms=0x16e11e0, cancelOnError=1 '\001', sliceTbl=0x16dccb8, ds=0x16dcba8) at cdbdisp_query.c:1175#1  0x0000000000ad344a in CdbDispatchPlan (queryDesc=0x16dc518, planRequiresTxn=1 '\001', cancelOnError=1 '\001', ds=0x16dcba8)    at cdbdisp_query.c:287#2  0x00000000006f1170 in ExecutorStart (queryDesc=0x16dc518, eflags=0) at execMain.c:686#3  0x00000000008f0d5f in ProcessQuery (portal=0x15782a8, stmt=0x16c7e50, params=0x0, dest=0x16c7a98, completionTag=0x7ffc82337180 "")    at pquery.c:291#4  0x00000000008f2ba9 in PortalRunMulti (portal=0x15782a8, isTopLevel=1 '\001', dest=0x16c7a98, altdest=0x16c7a98,    completionTag=0x7ffc82337180 "") at pquery.c:1467#5  0x00000000008f20dc in PortalRun (portal=0x15782a8, count=9223372036854775807, isTopLevel=1 '\001', dest=0x16c7a98, altdest=0x16c7a98,    completionTag=0x7ffc82337180 "") at pquery.c:1029#6  0x00000000008ea459 in exec_simple_query (query_string=0x160fc18 "insert into t2 values (10,11);", seqServerHost=0x0, seqServerPort=-1)    at postgres.c:1776#7  0x00000000008eef4f in PostgresMain (argc=1, argv=0x1572f40, dbname=0x1572d78 "postgres", username=0x1572d38 "gpadmin") at postgres.c:4975#8  0x00000000008822e5 in BackendRun (port=0x15834a0) at postmaster.c:6732#9  0x0000000000881971 in BackendStartup (port=0x15834a0) at postmaster.c:6406#10 0x000000000087a7e1 in ServerLoop () at postmaster.c:2444#11 0x00000000008790ea in PostmasterMain (argc=15, argv=0x154a0a0) at postmaster.c:1528#12 0x0000000000791ba9 in main (argc=15, argv=0x154a0a0) at main.c:206
复制代码


QE 上面的两个函数栈

(gdb) cContinuing.
Breakpoint 2, XLogInsert (rmid=10 '\n', info=0 '\000', rdata=0x7ffc3a2ac3b0) at xlog.c:778778 return XLogInsert_Internal(rmid, info, rdata, GetCurrentTransactionIdIfAny());(gdb) bt#0 XLogInsert (rmid=10 '\n', info=0 '\000', rdata=0x7ffc3a2ac3b0) at xlog.c:778#1 0x00000000004c2bcb in heap_insert (relation=0x7f815d3a9788, tup=0x2ce4f18, cid=1, use_wal=1 '\001', use_fsm=1 '\001', xid=1257) at heapam.c:2432#2 0x00000000006f5f68 in ExecInsert (slot=0x2ce4448, dest=0x2bf16d0, estate=0x2ce36f8, planGen=PLANGEN_PLANNER, isUpdate=0 '\000') at execMain.c:3372#3 0x00000000006f56e7 in ExecutePlan (estate=0x2ce36f8, planstate=0x2ce3dc0, operation=CMD_INSERT, numberTuples=0, direction=ForwardScanDirection, dest=0x2bf16d0) at execMain.c:3092#4 0x00000000006f17d1 in ExecutorRun (queryDesc=0x2bf5b88, direction=ForwardScanDirection, count=0) at execMain.c:912#5 0x00000000008f0d75 in ProcessQuery (portal=0x2bf2a38, stmt=0x2be2cf0, params=0x0, dest=0x2bf16d0, completionTag=0x7ffc3a2acb00 "") at pquery.c:296#6 0x00000000008f2ba9 in PortalRunMulti (portal=0x2bf2a38, isTopLevel=1 '\001', dest=0x2bf16d0, altdest=0x2bf16d0, completionTag=0x7ffc3a2acb00 "") at pquery.c:1467#7 0x00000000008f20dc in PortalRun (portal=0x2bf2a38, count=9223372036854775807, isTopLevel=1 '\001', dest=0x2bf16d0, altdest=0x2bf16d0, completionTag=0x7ffc3a2acb00 "") at pquery.c:1029#8 0x00000000008e9729 in exec_mpp_query (query_string=0x2be0f36 "insert into t2 values (10,11);", serializedQuerytree=0x0, serializedQuerytreelen=0, serializedPlantree=0x2be0f55 "\205\001", serializedPlantreelen=162, serializedParams=0x0, serializedParamslen=0, serializedQueryDispatchDesc=0x2be0ff7 "s", serializedQueryDispatchDesclen=72, seqServerHost=0x2be103f "127.0.0.1", seqServerPort=17722, localSlice=0) at postgres.c:1349#9 0x00000000008ef6da in PostgresMain (argc=1, argv=0x2be8248, dbname=0x2be81a8 "postgres", username=0x2be8168 "gpadmin") at postgres.c:5159#10 0x00000000008822e5 in BackendRun (port=0x2bf78f0) at postmaster.c:6732#11 0x0000000000881971 in BackendStartup (port=0x2bf78f0) at postmaster.c:6406#12 0x000000000087a7e1 in ServerLoop () at postmaster.c:2444#13 0x00000000008790ea in PostmasterMain (argc=12, argv=0x2bbe5c0) at postmaster.c:1528#14 0x0000000000791ba9 in main (argc=12, argv=0x2bbe5c0) at main.c:206
复制代码


(gdb) b CommandCounterIncrementBreakpoint 1 at 0x4fc262: file xact.c, line 894.(gdb) cContinuing.
Breakpoint 1, CommandCounterIncrement () at xact.c:894894 if (currentCommandIdUsed)(gdb) bt#0 CommandCounterIncrement () at xact.c:894#1 0x00000000004ffd9f in CommitTransactionCommand () at xact.c:3611#2 0x00000000008ecd5c in finish_xact_command () at postgres.c:3228#3 0x00000000008ea4a7 in exec_simple_query (query_string=0x160fc18 "insert into t2 values (10,11);", seqServerHost=0x0, seqServerPort=-1) at postgres.c:1807#4 0x00000000008eef4f in PostgresMain (argc=1, argv=0x1572f40, dbname=0x1572d78 "postgres", username=0x1572d38 "gpadmin") at postgres.c:4975#5 0x00000000008822e5 in BackendRun (port=0x15834a0) at postmaster.c:6732#6 0x0000000000881971 in BackendStartup (port=0x15834a0) at postmaster.c:6406#7 0x000000000087a7e1 in ServerLoop () at postmaster.c:2444#8 0x00000000008790ea in PostmasterMain (argc=15, argv=0x154a0a0) at postmaster.c:1528#9 0x0000000000791ba9 in main (argc=15, argv=0x154a0a0) at main.c:206(gdb)
复制代码


结合上面几个函数栈信息,我们把 PostgreSQL 源代码上面的经典例子和解释再次贴出来,然后和 Greenplum 的执行流程做对比。


For example, consider the following sequence of user commands:
1) BEGIN2) SELECT * FROM foo3) INSERT INTO foo VALUES (...)4) COMMIT
In the main processing loop, this results in the following function callsequence:
/ StartTransactionCommand; / StartTransaction;1) < ProcessUtility; << BEGIN \ BeginTransactionBlock; \ CommitTransactionCommand;
/ StartTransactionCommand;2) / ProcessQuery; << SELECT ... \ CommitTransactionCommand; \ CommandCounterIncrement;
/ StartTransactionCommand;3) / ProcessQuery; << INSERT ... \ CommitTransactionCommand; \ CommandCounterIncrement;
/ StartTransactionCommand; / ProcessUtility; << COMMIT4) < EndTransactionBlock; \ CommitTransactionCommand; \ CommitTransaction;
复制代码


我们发现,QD 上面做了所有的步骤和 PostgreSQL 的事务过程是一致的。有三个函数 ExecutorStart,ExecutorRun 和 ExecutorEnd。这三个函数,不管是在 QD 上还是在 QE 上,都会执行一遍。读者可以自行加断点进行调试和理解。


总结,本篇文章讲述了 begin 命令以后,insert 命令在 master node 和 segment node 上面的执行过程。

在下一篇文章里面,我们会介绍 Greenplum 里面的状态机。状态机对于分布式事务非常重要,如果我们在分布式事务或者两阶段提交的过程中遇到异常,那么就要根据状态机的状态进行纠错,不同的状态下的纠错路径是不同的。

发布于: 2 小时前
用户头像

还未添加个人签名 2021.12.30 加入

https://github.com/ginobiliwang

评论

发布
暂无评论
Greenplum 内核源码分析 - 分布式事务 (四)