
Greenplum Kernel Source Code Analysis - Distributed Transactions (Part 5)

  • January 5, 2022

Contents

  • Preface

  • PostgreSQL State Machine

  • Greenplum Two-Phase Commit State Machine


Preface

In the previous articles we covered the BEGIN command with its initialization work, and the INSERT operation.

The main job of the BEGIN command is initialization: it creates the gangs and drives the state machine on the master node through its initial states.

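The BEGIN command is also dispatched to the segments, where each instance initializes itself and gets ready to receive new commands. The INSERT operation dispatches the INSERT command to the QE writer, which executes it. This article does not cover the final COMMIT yet; it first introduces the two-phase commit state machine. The state machine is critical for distributed transactions: if an exception occurs during a distributed transaction or during two-phase commit, error recovery is driven by the current state, and each state takes a different recovery path.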


Terminology used in the rest of this article:

QD (Query Dispatcher) ---> master node

QE (Query Executor) ---> segment node


The analysis in this series is based on Greenplum 5.x; readers can get the source code from GitHub.


PostgreSQL State Machine

PostgreSQL's transaction handling has two layers: the upper-level transaction block and the low-level transaction.

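The low-level transaction executes each command and is responsible for acquiring and releasing resources and locks, handling signals, logging, and related work.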

Its state machine is the following enum, which is fairly simple:

/*
 *      transaction states - transaction state from server perspective
 */
typedef enum TransState
{
    TRANS_DEFAULT,              /* idle */
    TRANS_START,                /* transaction starting */
    TRANS_INPROGRESS,           /* inside a valid transaction */
    TRANS_COMMIT,               /* commit in progress */
    TRANS_ABORT,                /* abort in progress */
    TRANS_PREPARE               /* prepare in progress */
} TransState;

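There are six main related functions:

StartTransaction: called from StartTransactionCommand (for example when BEGIN arrives outside a transaction block); afterwards the transaction block state is TBLOCK_STARTED

CommitTransaction: called from CommitTransactionCommand (for example after COMMIT) to commit the transaction

PrepareTransaction: performs the PREPARE step of two-phase commit

FinishPreparedTransaction: performs the COMMIT PREPARED step of two-phase commit (it also handles ROLLBACK PREPARED)

AbortTransaction and CleanupTransaction: release resources and restore the default state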
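To make the low-level state machine concrete, here is a minimal C sketch of how five of the functions above move TransState. It assumes a single simplified transaction with no real resources; SketchTxn and the sketch_* functions are hypothetical stand-ins rather than PostgreSQL's code, which also manages resources, writes WAL, and checks errors at each step. FinishPreparedTransaction is left out because it runs inside an ordinary transaction instead of changing that transaction's TransState.

```c
#include <assert.h>

/* Same enumerators as PostgreSQL's TransState shown above. */
typedef enum TransState
{
    TRANS_DEFAULT,
    TRANS_START,
    TRANS_INPROGRESS,
    TRANS_COMMIT,
    TRANS_ABORT,
    TRANS_PREPARE
} TransState;

/* Hypothetical, heavily simplified stand-in for TransactionStateData. */
typedef struct SketchTxn
{
    TransState state;
} SketchTxn;

/* Like StartTransaction: DEFAULT -> START (setup) -> INPROGRESS. */
static void sketch_start(SketchTxn *s)
{
    assert(s->state == TRANS_DEFAULT);
    s->state = TRANS_START;
    /* ... resource setup would happen here (omitted) ... */
    s->state = TRANS_INPROGRESS;
}

/* Like CommitTransaction: INPROGRESS -> COMMIT (commit work) -> DEFAULT. */
static void sketch_commit(SketchTxn *s)
{
    assert(s->state == TRANS_INPROGRESS);
    s->state = TRANS_COMMIT;
    /* ... commit record, lock release (omitted) ... */
    s->state = TRANS_DEFAULT;
}

/* Like PrepareTransaction: INPROGRESS -> PREPARE -> DEFAULT; the prepared
 * transaction then lives on independently of this backend. */
static void sketch_prepare(SketchTxn *s)
{
    assert(s->state == TRANS_INPROGRESS);
    s->state = TRANS_PREPARE;
    s->state = TRANS_DEFAULT;
}

/* Like AbortTransaction: releases resources but does not reset the state. */
static void sketch_abort(SketchTxn *s)
{
    s->state = TRANS_ABORT;
}

/* Like CleanupTransaction: ABORT -> DEFAULT. */
static void sketch_cleanup(SketchTxn *s)
{
    assert(s->state == TRANS_ABORT);
    s->state = TRANS_DEFAULT;
}
```

For example, start followed by commit returns the state to TRANS_DEFAULT, while an aborted transaction stays in TRANS_ABORT until cleanup runs; splitting abort and cleanup is what lets a failed transaction block wait for the user's ROLLBACK.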


The upper-level transaction block uses the following states:

/*
 *      transaction block states - transaction state of client queries
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 */
typedef enum TBlockState
{
    /* not-in-transaction-block states */
    TBLOCK_DEFAULT,             /* idle */
    TBLOCK_STARTED,             /* running single-query transaction */

    /* transaction block states */
    TBLOCK_BEGIN,               /* starting transaction block */
    TBLOCK_INPROGRESS,          /* live transaction */
    TBLOCK_END,                 /* COMMIT received */
    TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK */
    TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received */
    TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received */
    TBLOCK_PREPARE,             /* live xact, PREPARE received */

    /* subtransaction states */
    TBLOCK_SUBBEGIN,            /* starting a subtransaction */
    TBLOCK_SUBINPROGRESS,       /* live subtransaction */
    TBLOCK_SUBEND,              /* RELEASE received */
    TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK */
    TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received */
    TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received */
    TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received */
    TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received */
} TBlockState;


Below is a commonly seen transaction block state transition diagram, taken from material on the internet.



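StartTransactionCommand: called before each statement in a transaction block is executed

CommitTransactionCommand: called after each statement in a transaction block finishes

AbortCurrentTransaction: called when a statement in a transaction block fails

BeginTransactionBlock: called on BEGIN; the state becomes TBLOCK_BEGIN

EndTransactionBlock: called on COMMIT; the transaction may commit successfully or may be rolled back

UserAbortTransactionBlock: called on ROLLBACK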
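The following sketch models how the top-level functions listed above cooperate over a few statement sequences. It is a simplification: subtransaction states are omitted, every sketch_* name and SketchStmt are hypothetical, and comments mark where PostgreSQL would call the low-level functions instead of actually calling them.

```c
#include <assert.h>
#include <stdbool.h>

/* Top-level TBlockState values from the enum above; subtransaction states omitted. */
typedef enum TBlockState
{
    TBLOCK_DEFAULT, TBLOCK_STARTED, TBLOCK_BEGIN, TBLOCK_INPROGRESS, TBLOCK_END,
    TBLOCK_ABORT, TBLOCK_ABORT_END, TBLOCK_ABORT_PENDING, TBLOCK_PREPARE
} TBlockState;

/* Hypothetical statement kinds used to drive the sketch. */
typedef enum { STMT_BEGIN, STMT_OTHER, STMT_COMMIT, STMT_ROLLBACK, STMT_FAILS } SketchStmt;

static TBlockState sketch_block_state = TBLOCK_DEFAULT;

/* Like StartTransactionCommand: only opens a transaction when idle. */
static void sketch_start_command(void)
{
    if (sketch_block_state == TBLOCK_DEFAULT)
        sketch_block_state = TBLOCK_STARTED;      /* StartTransaction() runs here */
}

/* Like CommitTransactionCommand: decides what the finished statement means. */
static void sketch_commit_command(void)
{
    switch (sketch_block_state)
    {
        case TBLOCK_BEGIN:                        /* BEGIN done: block is now live */
            sketch_block_state = TBLOCK_INPROGRESS;
            break;
        case TBLOCK_STARTED:                      /* single statement: CommitTransaction() */
        case TBLOCK_END:                          /* after COMMIT: CommitTransaction() */
        case TBLOCK_PREPARE:                      /* PrepareTransaction() */
        case TBLOCK_ABORT_END:                    /* CleanupTransaction() */
        case TBLOCK_ABORT_PENDING:                /* AbortTransaction() + CleanupTransaction() */
            sketch_block_state = TBLOCK_DEFAULT;
            break;
        default:                                  /* INPROGRESS and ABORT stay put */
            break;
    }
}

/* Like BeginTransactionBlock (BEGIN). */
static void sketch_begin_block(void)
{
    if (sketch_block_state == TBLOCK_STARTED)
        sketch_block_state = TBLOCK_BEGIN;
}

/* Like EndTransactionBlock (COMMIT); returns false when COMMIT turns into a rollback. */
static bool sketch_end_block(void)
{
    if (sketch_block_state == TBLOCK_INPROGRESS) { sketch_block_state = TBLOCK_END; return true; }
    if (sketch_block_state == TBLOCK_ABORT) { sketch_block_state = TBLOCK_ABORT_END; return false; }
    return true;                                  /* e.g. COMMIT outside a block: PostgreSQL only warns */
}

/* Like UserAbortTransactionBlock (ROLLBACK). Outside a block the sketch simply
 * lets the single-statement transaction end; PostgreSQL warns first. */
static void sketch_user_abort_block(void)
{
    if (sketch_block_state == TBLOCK_INPROGRESS)
        sketch_block_state = TBLOCK_ABORT_PENDING;
    else if (sketch_block_state == TBLOCK_ABORT)
        sketch_block_state = TBLOCK_ABORT_END;
}

/* Like AbortCurrentTransaction (a statement raised an error). */
static void sketch_abort_current(void)
{
    if (sketch_block_state == TBLOCK_INPROGRESS)
        sketch_block_state = TBLOCK_ABORT;        /* keep the block, wait for ROLLBACK */
    else if (sketch_block_state != TBLOCK_ABORT)
        sketch_block_state = TBLOCK_DEFAULT;      /* abort and clean up at once */
}

/* Runs one statement through the start / commit-or-abort cycle. */
static void sketch_run_statement(SketchStmt stmt)
{
    sketch_start_command();
    switch (stmt)
    {
        case STMT_BEGIN:    sketch_begin_block(); break;
        case STMT_COMMIT:   (void) sketch_end_block(); break;
        case STMT_ROLLBACK: sketch_user_abort_block(); break;
        case STMT_OTHER:    break;
        case STMT_FAILS:    sketch_abort_current(); return;  /* no commit step after an error */
    }
    sketch_commit_command();
}
```

Running BEGIN, INSERT, COMMIT walks DEFAULT, STARTED, BEGIN, INPROGRESS, END and back to DEFAULT; running BEGIN, a failing statement, then ROLLBACK parks the block in TBLOCK_ABORT until ROLLBACK moves it through TBLOCK_ABORT_END to TBLOCK_DEFAULT.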


Greenplum State Machine

Let us start with an important data structure:

typedef struct TMGXACT
{
    /*
     * These fields will be recorded in the log.  They are the same
     * as those in the TMGXACT_LOG struct.  We will be copying the
     * fields individually, so they don't have to match the same order,
     * but it's a good idea.
     */
    char                        gid[TMGIDSIZE];
    DistributedTransactionId    gxid;

    /*
     * Memory only fields.
     */
    DtxState                    state;

    int                         sessionId;

    bool                        explicitBeginRemembered;

    /*
     * This is similar to xmin of PROC, stores lowest dxid on first snapshot
     * by process with this as currentGXact.
     */
    DistributedTransactionId    xminDistributedSnapshot;

    bool                        badPrepareGangs;

    int                         debugIndex;

    bool                        directTransaction;
    uint16                      directTransactionContentId;
} TMGXACT;

This is one of the most important data structures in Greenplum, and the field relevant to this article, DtxState state;, is stored in it.


The DtxState type is defined as follows:


/**
 * DTX states, used to track the state of the distributed transaction
 *   from the QD's point of view.
 */
typedef enum
{
    /**
     * Uninitialized transaction
     */
    DTX_STATE_NONE = 0,

    /**
     * The distributed transaction is active but distributed coordination
     * is not required (because it is auto-commit on the QEs).
     */
    DTX_STATE_ACTIVE_NOT_DISTRIBUTED,

    /**
     * The distributed transaction is active and requires distributed coordination
     * (because it is explicit or an implicit writer transaction)
     */
    DTX_STATE_ACTIVE_DISTRIBUTED,

    /**
     * For two-phase commit, the first phase is about to run
     */
    DTX_STATE_PREPARING,

    /**
     * For two-phase commit, the first phase has completed
     */
    DTX_STATE_PREPARED,
    DTX_STATE_INSERTING_COMMITTED,
    DTX_STATE_INSERTED_COMMITTED,
    DTX_STATE_FORCED_COMMITTED,
    DTX_STATE_NOTIFYING_COMMIT_PREPARED,
    DTX_STATE_INSERTING_FORGET_COMMITTED,
    DTX_STATE_INSERTED_FORGET_COMMITTED,

    /**
     * Transaction rollback has been requested and QD is notifying all QD processes.
     *
     * _NO_PREPARED means that no QEs have started work on the first phase of two-phase commit.
     */
    DTX_STATE_NOTIFYING_ABORT_NO_PREPARED,

    /**
     * Transaction rollback has been requested and QD is notifying all QD processes.
     *
     * _SOME_PREPARED means that at least one QE has done the first phase of two-phase commit.
     */
    DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED,

    /**
     * Transaction rollback has been requested and QD is notifying all QD processes.
     *
     * _PREPARED means that the QE processes have done the first phase of two-phase commit.
     */
    DTX_STATE_NOTIFYING_ABORT_PREPARED,
    DTX_STATE_RETRY_COMMIT_PREPARED,
    DTX_STATE_RETRY_ABORT_PREPARED,
    DTX_STATE_CRASH_COMMITTED
} DtxState;


Next is the state transition diagram of two-phase commit in Greenplum.


Every state in the code above appears in the diagram except DTX_STATE_CRASH_COMMITTED, which occurs during redo while the database is recovering.

Let us briefly walk through the state transitions. They mirror the two-phase commit flow itself, so they are worth reading slowly.


When CreateDtx runs, it calls initGxact, which sets the QD's state to DTX_STATE_NONE; the state is then set to DTX_STATE_ACTIVE_NOT_DISTRIBUTED.

dtmPreCommand then does some preparatory work ahead of the prepare step, and the state is set to DTX_STATE_ACTIVE_DISTRIBUTED.
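The initial QD transitions can be sketched as follows, assuming a trimmed-down gxact that keeps only gxid and state. SketchGxact and the sketch_* functions are hypothetical and only mirror the state assignments described above; the real CreateDtx, initGxact and dtmPreCommand do considerably more work. The needs_two_phase flag is an assumption based on the DTX_STATE_ACTIVE_DISTRIBUTED comment (explicit or implicit writer transaction).

```c
#include <assert.h>

/* First three DtxState values, in the same order as Greenplum's enum. */
typedef enum
{
    DTX_STATE_NONE = 0,
    DTX_STATE_ACTIVE_NOT_DISTRIBUTED,
    DTX_STATE_ACTIVE_DISTRIBUTED
} SketchDtxState;

/* Hypothetical, trimmed-down TMGXACT with only the fields this sketch needs. */
typedef struct
{
    unsigned int   gxid;
    SketchDtxState state;
} SketchGxact;

/* Mirrors the described effect of initGxact: start from DTX_STATE_NONE. */
static void sketch_init_gxact(SketchGxact *gx)
{
    gx->gxid = 0;
    gx->state = DTX_STATE_NONE;
}

/* Mirrors the described effect of CreateDtx. */
static void sketch_create_dtx(SketchGxact *gx, unsigned int gxid)
{
    sketch_init_gxact(gx);
    gx->gxid = gxid;
    gx->state = DTX_STATE_ACTIVE_NOT_DISTRIBUTED;
}

/* Mirrors the described effect of dtmPreCommand: promote to the distributed
 * state only when two-phase coordination is needed. */
static void sketch_pre_command(SketchGxact *gx, int needs_two_phase)
{
    assert(gx->state != DTX_STATE_NONE);
    if (needs_two_phase && gx->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED)
        gx->state = DTX_STATE_ACTIVE_DISTRIBUTED;
}
```

A read-only, auto-commit statement would leave the gxact in DTX_STATE_ACTIVE_NOT_DISTRIBUTED, which is why that state exists at all: no second phase is needed when there is nothing to coordinate.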


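The prepare step now begins. If an error occurs at this point, control enters rollbackDtxTransaction and the state is set to DTX_STATE_NOTIFYING_ABORT_NO_PREPARED.

In this state the QD also sends DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED to the QEs, and each QE that receives it aborts its transaction.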


Following the main path, still inside doPrepareTransaction, the state becomes DTX_STATE_PREPARING and then DTX_STATE_PREPARED. Once this state is reached, the first phase (prepare) is complete.


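If an exception occurs during this phase, it leads to one of two other states: DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED or DTX_STATE_RETRY_ABORT_PREPARED.

The latter is most likely caused by a failure in the gang communication between the QD and the QEs; the former covers the other cases, for example when some QEs prepared successfully and others failed. The corresponding message, such as DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED, is also sent to the QEs.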
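The branching around the first phase can be summarized as a small decision function. This is a sketch under two assumptions: each QE's reply to PREPARE is reduced to a hypothetical three-valued QePrepareReply, and any gang communication error is treated as the cause of DTX_STATE_RETRY_ABORT_PREPARED, following the explanation above. The enumerator values are local to the sketch and do not match Greenplum's.

```c
#include <assert.h>
#include <stddef.h>

/* Subset of DtxState; numeric values do not match Greenplum. */
typedef enum
{
    DTX_STATE_PREPARING,
    DTX_STATE_PREPARED,
    DTX_STATE_NOTIFYING_ABORT_NO_PREPARED,
    DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED,
    DTX_STATE_RETRY_ABORT_PREPARED
} SketchDtxState;

/* Hypothetical per-QE reply to the PREPARE command. */
typedef enum { QE_PREPARE_OK, QE_PREPARE_FAILED, QE_GANG_ERROR } QePrepareReply;

/* Returns the QD state after the first phase, given whether an error hit
 * before PREPARE was dispatched and the reply from each QE. */
static SketchDtxState
sketch_prepare_phase(int error_before_prepare,
                     const QePrepareReply *replies, size_t nqe)
{
    assert(replies != NULL || nqe == 0);

    if (error_before_prepare)
        return DTX_STATE_NOTIFYING_ABORT_NO_PREPARED;   /* nothing prepared yet */

    /* From here on the QD is in DTX_STATE_PREPARING. */
    int any_failed = 0;
    for (size_t i = 0; i < nqe; i++)
    {
        if (replies[i] == QE_GANG_ERROR)
            return DTX_STATE_RETRY_ABORT_PREPARED;      /* QD-QE gang communication broke */
        if (replies[i] == QE_PREPARE_FAILED)
            any_failed = 1;
    }
    return any_failed ? DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED
                      : DTX_STATE_PREPARED;
}
```

The key point the sketch captures is that the abort state records how far the first phase got, because that determines which abort command the QEs must receive.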


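On the QE side, when a command such as DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED arrives, the QE handles it according to its own current state. So each QD state is reflected on the QE side by a corresponding branch.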


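Back to the main path: after DTX_STATE_PREPARED, the QD starts writing its Xlog record. If an exception occurs before the Xlog is written, the state becomes DTX_STATE_NOTIFYING_ABORT_PREPARED. In this state the QD sends an abort message to all QEs, aborting the transactions they had already prepared.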
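The correspondence between QD states and the commands the QEs receive can be written as a lookup. Only the NO_PREPARED and SOME_PREPARED pairs are stated explicitly in the text; the remaining pairs are assumptions based on the matching case names in the QE's performDtxProtocolCommand. Both enums are trimmed local copies whose values do not match Greenplum's.

```c
#include <assert.h>

/* Subset of DtxState (local values). */
typedef enum
{
    DTX_STATE_NOTIFYING_ABORT_NO_PREPARED,
    DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED,
    DTX_STATE_NOTIFYING_ABORT_PREPARED,
    DTX_STATE_NOTIFYING_COMMIT_PREPARED,
    DTX_STATE_RETRY_COMMIT_PREPARED,
    DTX_STATE_RETRY_ABORT_PREPARED
} SketchDtxState;

/* Subset of the DTX protocol commands, named after the QE switch cases. */
typedef enum
{
    DTX_PROTOCOL_COMMAND_NONE,
    DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED,
    DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED,
    DTX_PROTOCOL_COMMAND_ABORT_PREPARED,
    DTX_PROTOCOL_COMMAND_COMMIT_PREPARED,
    DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED,
    DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED
} SketchDtxCommand;

/* Command the QD sends to the QEs while in a given notifying/retry state. */
static SketchDtxCommand sketch_command_for_state(SketchDtxState s)
{
    switch (s)
    {
        case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED:
            return DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED;     /* stated in the text */
        case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED:
            return DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED;   /* stated in the text */
        case DTX_STATE_NOTIFYING_ABORT_PREPARED:
            return DTX_PROTOCOL_COMMAND_ABORT_PREPARED;        /* assumed */
        case DTX_STATE_NOTIFYING_COMMIT_PREPARED:
            return DTX_PROTOCOL_COMMAND_COMMIT_PREPARED;       /* assumed */
        case DTX_STATE_RETRY_COMMIT_PREPARED:
            return DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED; /* assumed */
        case DTX_STATE_RETRY_ABORT_PREPARED:
            return DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED;  /* assumed */
    }
    return DTX_PROTOCOL_COMMAND_NONE;
}
```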


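After that comes the second phase. The QD calls doNotifyingCommitPrepared to send commit prepared to all QEs; if that fails, it enters the retry state DTX_STATE_RETRY_COMMIT_PREPARED. In this state it calls doDtxPhase2Retry, which keeps retrying, up to the number of times set by a GUC, until every QE has committed its prepared transaction. If it still fails after that, the QD simply PANICs!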
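The retry-then-PANIC behaviour can be sketched as a bounded loop. The retry limit stands in for the GUC mentioned above, whose name the text does not give. The notify callback, the SketchFlakyQes test double, and returning PHASE2_PANIC instead of calling elog(PANIC) are hypothetical choices that let the sketch run on its own. The text also does not say whether doDtxPhase2Retry counts the first attempt as a retry, so the counting here is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback: sends commit prepared to the QEs that still need it
 * and returns true once every QE has acknowledged. */
typedef bool (*SketchNotifyFn)(void *ctx);

typedef enum { PHASE2_DONE, PHASE2_PANIC } SketchPhase2Result;

/* retry_count plays the role of the GUC-controlled retry limit. */
static SketchPhase2Result
sketch_commit_prepared_with_retry(SketchNotifyFn notify, void *ctx, int retry_count)
{
    assert(notify != NULL && retry_count >= 0);

    if (notify(ctx))                      /* like doNotifyingCommitPrepared */
        return PHASE2_DONE;

    /* Now in DTX_STATE_RETRY_COMMIT_PREPARED, like doDtxPhase2Retry. */
    for (int i = 0; i < retry_count; i++)
        if (notify(ctx))
            return PHASE2_DONE;

    /* The real QD would elog(PANIC, ...) here; the sketch reports it instead. */
    return PHASE2_PANIC;
}

/* Hypothetical test double: fails `failures_left` times, then succeeds. */
typedef struct { int failures_left; int calls; } SketchFlakyQes;

static bool sketch_flaky_notify(void *ctx)
{
    SketchFlakyQes *q = ctx;
    q->calls++;
    if (q->failures_left > 0)
    {
        q->failures_left--;
        return false;
    }
    return true;
}
```

With a limit of 2, QEs that fail twice and then succeed finish after three calls, while QEs that keep failing exhaust the retries and hit the PANIC path.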


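Consider why it PANICs: some QEs may have committed while others failed. Two-phase commit cannot resolve this situation. By now the commit decision has already been written to the QD's Xlog and the QEs that committed cannot be rolled back, so the only option is to keep retrying.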


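(An aside: the root cause of the PANIC is the two-phase commit algorithm itself. Other distributed databases that use two-phase commit, such as Google Spanner, face the same problem; interested readers can consult the Spanner paper. Engineers usually optimize two-phase commit for their specific scenarios.)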


Back on the main path: if doNotifyingCommitPrepared succeeds, the QD moves to its final state, writes its local Xlog record in doInsertForgetCommitted, and finishes.
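Putting the main path together, the successful QD sequence can be written as a next-state function. The order follows the DtxState enum and the walk-through above. DTX_STATE_FORCED_COMMITTED and the abort and retry states are left out because they are not on the success path described here, and returning to DTX_STATE_NONE at the end is an assumption about the gxact being reset once the transaction is done.

```c
#include <assert.h>

/* Commit-path subset of DtxState (FORCED_COMMITTED, abort and retry states omitted). */
typedef enum
{
    DTX_STATE_NONE = 0,
    DTX_STATE_ACTIVE_NOT_DISTRIBUTED,
    DTX_STATE_ACTIVE_DISTRIBUTED,
    DTX_STATE_PREPARING,
    DTX_STATE_PREPARED,
    DTX_STATE_INSERTING_COMMITTED,
    DTX_STATE_INSERTED_COMMITTED,
    DTX_STATE_NOTIFYING_COMMIT_PREPARED,
    DTX_STATE_INSERTING_FORGET_COMMITTED,
    DTX_STATE_INSERTED_FORGET_COMMITTED
} SketchDtxState;

/* Next QD state when the current step succeeds. */
static SketchDtxState sketch_next_on_success(SketchDtxState s)
{
    switch (s)
    {
        case DTX_STATE_NONE:                       return DTX_STATE_ACTIVE_NOT_DISTRIBUTED;    /* CreateDtx */
        case DTX_STATE_ACTIVE_NOT_DISTRIBUTED:     return DTX_STATE_ACTIVE_DISTRIBUTED;        /* dtmPreCommand */
        case DTX_STATE_ACTIVE_DISTRIBUTED:         return DTX_STATE_PREPARING;                 /* doPrepareTransaction */
        case DTX_STATE_PREPARING:                  return DTX_STATE_PREPARED;                  /* phase 1 done */
        case DTX_STATE_PREPARED:                   return DTX_STATE_INSERTING_COMMITTED;       /* write commit Xlog */
        case DTX_STATE_INSERTING_COMMITTED:        return DTX_STATE_INSERTED_COMMITTED;
        case DTX_STATE_INSERTED_COMMITTED:         return DTX_STATE_NOTIFYING_COMMIT_PREPARED; /* doNotifyingCommitPrepared */
        case DTX_STATE_NOTIFYING_COMMIT_PREPARED:  return DTX_STATE_INSERTING_FORGET_COMMITTED; /* doInsertForgetCommitted */
        case DTX_STATE_INSERTING_FORGET_COMMITTED: return DTX_STATE_INSERTED_FORGET_COMMITTED;
        case DTX_STATE_INSERTED_FORGET_COMMITTED:  return DTX_STATE_NONE;                      /* assumed reset */
    }
    return DTX_STATE_NONE;
}
```

Note that the commit record is written (INSERTED_COMMITTED) before the QEs are told to commit; that ordering is exactly why a failure in the second phase can only be retried, not rolled back.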

That completes the walk-through of the two-phase commit state transitions.


Everything above describes state changes on the QD. The QE only reacts to messages from the QD, so its state changes are simpler.

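Below is performDtxProtocolCommand, the function through which the QE handles these messages. Unimportant code and subtransaction-related code have been removed to highlight the main logic.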


/**
 * On the QE, handle a DtxProtocolCommand
 */
void
performDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
                          int flags __attribute__((unused)),
                          const char *loggingStr __attribute__((unused)),
                          const char *gid,
                          DistributedTransactionId gxid __attribute__((unused)),
                          DtxContextInfo *contextInfo)
{
    switch (dtxProtocolCommand)
    {
        case DTX_PROTOCOL_COMMAND_STAY_AT_OR_BECOME_IMPLIED_WRITER:
            switch (DistributedTransactionContext)
            {
                case DTX_CONTEXT_LOCAL_ONLY:
                    /** convert to implicit_writer! */
                    setupQEDtxContext(contextInfo);
                    StartTransactionCommand();
                    break;
                case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
                    /** already the state we like */
                    break;
                default:
                    if (isQEContext() || isQDContext())
                    {
                        elog(FATAL, "Unexpected segment distributed transaction context: '%s'",
                             DtxContextToString(DistributedTransactionContext));
                    }
                    else
                    {
                        elog(PANIC, "Unexpected segment distributed transaction context value: %d",
                             (int) DistributedTransactionContext);
                    }
                    break;
            }
            break;

        case DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED:
            AbortOutOfAnyTransaction();
            break;

        case DTX_PROTOCOL_COMMAND_PREPARE:
            /*
             * The QD has directed us to read-only commit or prepare an implicit or explicit
             * distributed transaction.
             */
            switch (DistributedTransactionContext)
            {
                case DTX_CONTEXT_LOCAL_ONLY:
                    /*
                     * Spontaneously aborted while we were back at the QD?
                     */
                    elog(ERROR, "Distributed transaction %s not found", gid);
                    break;

                case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
                case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
                    performDtxProtocolPrepare(gid);
                    break;

                case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
                case DTX_CONTEXT_QD_RETRY_PHASE_2:
                case DTX_CONTEXT_QE_PREPARED:
                case DTX_CONTEXT_QE_FINISH_PREPARED:
                case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
                case DTX_CONTEXT_QE_READER:
                    elog(FATAL, "Unexpected segment distribute transaction context: '%s'",
                         DtxContextToString(DistributedTransactionContext));
                    break;

                default:
                    elog(PANIC, "Unexpected segment distribute transaction context value: %d",
                         (int) DistributedTransactionContext);
                    break;
            }
            break;

        case DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED:
            switch (DistributedTransactionContext)
            {
                case DTX_CONTEXT_LOCAL_ONLY:
                    /*
                     * Spontaneously aborted while we were back at the QD?
                     */
                    elog(ERROR, "Distributed transaction %s not found", gid);
                    break;

                case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
                case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
                    AbortOutOfAnyTransaction();
                    break;

                case DTX_CONTEXT_QE_PREPARED:
                    setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
                    performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true);
                    break;

                case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
                case DTX_CONTEXT_QD_RETRY_PHASE_2:
                case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON:
                case DTX_CONTEXT_QE_READER:
                    elog(PANIC, "Unexpected segment distribute transaction context: '%s'",
                         DtxContextToString(DistributedTransactionContext));
                    break;

                default:
                    elog(PANIC, "Unexpected segment distribute transaction context value: %d",
                         (int) DistributedTransactionContext);
                    break;
            }
            break;

        case DTX_PROTOCOL_COMMAND_COMMIT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_QE_PREPARED);
            setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
            performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ true);
            break;

        case DTX_PROTOCOL_COMMAND_ABORT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_QE_PREPARED);
            setDistributedTransactionContext(DTX_CONTEXT_QE_FINISH_PREPARED);
            performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true);
            break;

        case DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
            performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false);
            break;

        case DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
            performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false);
            break;

        case DTX_PROTOCOL_COMMAND_RECOVERY_COMMIT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
            performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false);
            break;

        case DTX_PROTOCOL_COMMAND_RECOVERY_ABORT_PREPARED:
            requireDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);
            performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false);
            break;

        default:
            elog(ERROR, "Unrecognized dtx protocol command: %d",
                 (int) dtxProtocolCommand);
            break;
    }
}
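To summarize the QE side, here is a sketch that reduces performDtxProtocolCommand to an outcome code for a handful of commands and contexts. The outcome enum, the trimmed context and command enums, and QE_CONTEXT_MISMATCH are hypothetical; in particular, the excerpt does not show how requireDistributedTransactionContext reports a mismatch, so that outcome is only a placeholder. The real function performs the actions instead of returning a code.

```c
#include <assert.h>

/* Subset of the QE distributed transaction contexts (local values). */
typedef enum
{
    DTX_CONTEXT_LOCAL_ONLY,
    DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER,
    DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER,
    DTX_CONTEXT_QE_PREPARED,
    DTX_CONTEXT_QE_FINISH_PREPARED,
    DTX_CONTEXT_QE_READER
} SketchDtxContext;

/* Subset of the protocol commands handled above. */
typedef enum
{
    DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED,
    DTX_PROTOCOL_COMMAND_PREPARE,
    DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED,
    DTX_PROTOCOL_COMMAND_COMMIT_PREPARED,
    DTX_PROTOCOL_COMMAND_ABORT_PREPARED
} SketchDtxCommand;

/* Hypothetical outcome codes standing in for the actions taken. */
typedef enum
{
    QE_ABORT_LOCAL,        /* AbortOutOfAnyTransaction() */
    QE_DO_PREPARE,         /* performDtxProtocolPrepare(gid) */
    QE_FINISH_COMMIT,      /* context -> FINISH_PREPARED, commit prepared */
    QE_FINISH_ABORT,       /* context -> FINISH_PREPARED, abort prepared */
    QE_ERROR,              /* elog(ERROR): "Distributed transaction %s not found" */
    QE_FATAL_OR_PANIC,     /* unexpected context */
    QE_CONTEXT_MISMATCH    /* requireDistributedTransactionContext failed */
} SketchQeAction;

static SketchQeAction sketch_qe_dispatch(SketchDtxCommand cmd, SketchDtxContext *ctx)
{
    int writer = (*ctx == DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ||
                  *ctx == DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER);

    switch (cmd)
    {
        case DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED:
            return QE_ABORT_LOCAL;

        case DTX_PROTOCOL_COMMAND_PREPARE:
        case DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED:
            if (*ctx == DTX_CONTEXT_LOCAL_ONLY)
                return QE_ERROR;              /* aborted while we were back at the QD? */
            if (writer)
                return cmd == DTX_PROTOCOL_COMMAND_PREPARE ? QE_DO_PREPARE : QE_ABORT_LOCAL;
            if (cmd == DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED &&
                *ctx == DTX_CONTEXT_QE_PREPARED)
            {
                *ctx = DTX_CONTEXT_QE_FINISH_PREPARED;
                return QE_FINISH_ABORT;
            }
            return QE_FATAL_OR_PANIC;

        case DTX_PROTOCOL_COMMAND_COMMIT_PREPARED:
        case DTX_PROTOCOL_COMMAND_ABORT_PREPARED:
            if (*ctx != DTX_CONTEXT_QE_PREPARED)
                return QE_CONTEXT_MISMATCH;
            *ctx = DTX_CONTEXT_QE_FINISH_PREPARED;
            return cmd == DTX_PROTOCOL_COMMAND_COMMIT_PREPARED ? QE_FINISH_COMMIT
                                                               : QE_FINISH_ABORT;
    }
    return QE_ERROR;                          /* "Unrecognized dtx protocol command" */
}
```

The pattern to notice is that only a QE in DTX_CONTEXT_QE_PREPARED can finish a prepared transaction through the normal commands, and it moves to DTX_CONTEXT_QE_FINISH_PREPARED before doing so.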


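To sum up, this article walked through the state transitions of the two-phase commit state machine in Greenplum's distributed transactions. The state machine is central to distributed transactions, and readers are encouraged to step through it in a debugger to get a feel for it. The next article will continue with the two-phase commit logic at the code level.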


Related reading:

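Greenplum Kernel Source Code Analysis - Distributed Transactions (1): Consistency Algorithms, Steal/Force and the WAL Protocol

Greenplum Kernel Source Code Analysis - Distributed Transactions (2): PostgreSQL and Greenplum Communication Protocols, and an Introduction to PostgreSQL Transaction Processing

Greenplum Kernel Source Code Analysis - Distributed Transactions (3): Initialization and the BEGIN Command

Greenplum Kernel Source Code Analysis - Distributed Transactions (4): The INSERT Command

Greenplum Kernel Source Code Analysis - Distributed Transactions (5): The State Machine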



https://github.com/ginobiliwang
