写点什么

PolarDB-X 源码解读:DDL 的一生(上)

  • 2022 年 5 月 17 日
  • 本文字数:9660 字

    阅读完需:约 32 分钟

概述

一条 SQL 语句进入 PolarDB-X 的 CN 后,将经历协议层、优化器、执行器的完整处理流程。首先经过解析、鉴权、校验,被解析为关系代数树后,在优化器中经历 RBO 和 CBO 生成执行计划,最终在 DN 上执行完成。与 DML 不同的是,逻辑 DDL 语句还涉及对元数据的读写和物理 DDL,直接影响系统状态一致性。

PolarDB-X 的 DDL 实现的关键目标是 DDL 的“online”和“crash safe”,即 DDL 与 DML 的并发和 DDL 本身的原子性和持久性。对于复杂逻辑 DDL(如加减全局二级索引、迁移分区数据),PolarDB-X 沿袭 online schema change 的思路引入了 CN 中的双版本元数据,DDL 仅在排空低版本事务时占用 MDL 锁,大大降低了阻塞业务 SQL 的频率;在执行器模块中,DDL 引擎统一了逻辑 DDL 的定义方法和调度执行过程,开发者只须将物理操作封装为具有时序依赖关系和 crash recover 方法的 Task 传递给 DDL 引擎,后者保证相应的执行和回滚逻辑。

本文主要解读 PolarDB-X DDL 在计算节点(CN)中的实现,为了简便起见,本文中我们将略过 DDL 在协议层、优化器的预处理以及执行器中对 Handler 的分派过程,此部分内容可参考下列源码解读文章:

我们将重点关注 DDL 在执行器中的执行流程,在阅读本文前,可预先阅读与 PolarDB-X online schema change、元数据锁及 DDL 引擎原理相关的文章:

整体流程


  1. 一条逻辑 DDL 语句在解析后进入优化器,仅做简单的类型转化后生成 logicalPlan 及 executionContext,由执行器根据 logicalPlan 的具体类型分派给对应的 DDLHandler。DDLHandler 的公共基类是 LogicalCommonDdlHandler,其公共执行入口见 com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handle

public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext) {        BaseDdlOperation logicalDdlPlan = (BaseDdlOperation) logicalPlan;
initDdlContext(logicalDdlPlan, executionContext);
// Validate the plan first and then return immediately if needed. boolean returnImmediately = validatePlan(logicalDdlPlan, executionContext);
..... setPartitionDbIndexAndPhyTable(logicalDdlPlan); // Build a specific DDL job by subclass that override buildDdlJob DdlJob ddlJob = returnImmediately? new TransientDdlJob(): // @override buildDdlJob(logicalDdlPlan, executionContext);
// Validate the DDL job before request. // @override validateJob(logicalDdlPlan, ddlJob, executionContext);
// Handle the client DDL request on the worker side. handleDdlRequest(ddlJob, executionContext); ..... return buildResultCursor(logicalDdlPlan, executionContext); }
复制代码
  1. 在 handle 方法中,首先从 executionContext 中剥离出 traceId、事务 ID、ddl 类型、全局配置等通用上下文信息,然后通过 validateJob 方法校验 DDL 正确性,buildDdlJob 方法构造 DDL 任务。具体的 DDLHandler 例如 AlterTableHandler 将会重载这两个方法,由开发者根据 DDL 的实际语义定义和校验 DDL job。

  2. 至此,DDL 在 handler 中完成从 logicalPlan 到 DDL job 的转化。com.alibaba.polardbx.executor.handler.ddl.LogicalCommonDdlHandler#handleDdlRequest 方法将 DDL job 对应的 DDL Request 转发给 Leader CN 节点上的 DDL 引擎调度执行。

  3. DDL 引擎作为守护进程仅在 Leader CN 节点运行,轮询 DDLJobQueue,队列非空即触发 DDL Job 的调度执行过程。DDL 引擎中维持着两级队列,即实例级和库级的 DDLJob 队列,从而保证逻辑库间 DDL 并发执行,互不干扰。


值得注意的是,在 DDL Requester 将 DDL Request 推送至 DDL 引擎的过程中,实际上发生了一次 Worker Node 到 Leader Node 的节点间通信,该过程依赖于 `com.alibaba.polardbx.gms.sync.GmsSyncManagerHelper#sync` 方法。发起通信的节点在 sync 中封装 syncAction 作为信息,接收方监听到 sync 发生后根据 syncAction 执行相应的动作。这种节点间通信机制不仅存在于 DDL 引擎加载 DDL job 过程中,也存在于 DDL job 执行时节点间的元数据同步过程中(见后文),这两种情境下封装的 syncAction 均会触发接收方从 MetaDB 中拉取实际的通信内容(DDL job 或者元信息),这种通信机制以数据库服务作为通信中介保证了通信内容传递的高可用和一致性。


DDL job 是 DDL 执行引擎的核心概念,它实际上是 DDL 引擎层面模拟的“DDL 事务”,尝试对元信息修改、节点间通信、物理 DDL、DML 等一系列异质操作组合而成的逻辑 DDL 实现原子性和持久性,从而保证系统状态的稳定性。借鉴于通用的事务实现,undolog、锁、WAL、版本快照等事务要素在 DDL job 和 DDL 引擎中也有着平行的实现。


以 DDL 引擎入口(即前述的 handleDdlRequest 方法)作为 DDL 数据流的分界点,可将 DDL 的生命周期划分为定义和执行两个部分。接下来,我们将以添加全局二级索引为例,说明在定义 DDL Job 时构建 DDL 原子性以及 DDL 与 DML 协同流程的关键逻辑。DDL 在 DDL 引擎中的调度执行及错误处理机制,将在下篇中讲解。

## 本文涉及的SQL语句####
create database db1 mode = "auto";
use db1;
create table t1(x int, y int);
## 本文详解的DDL语句
alter table t1 add global index `g_i_y` (`y`) COVERING (`x`) partition by hash(`y`);
复制代码

元数据管理

如前所述,除去在 DN 上执行的物理 DDL 和 DML 外,DDL Job 中包含的关键操作均是对元数据的修改和同步。

PolarDB-X 中的元数据由 GMS 统一管理,按物理位置可以分为两个部分:

  1. MetaDB 中的持久化元信息。例如描述分区表 t1 的元数据表包括 table_partitions, tables, columns, indexes 等,MetaDB 中元信息的读写接口分布在 polardbx-gms/src/main/java/com/alibaba/polardbx/gms 路径下的辅助类中。

  2. CN 缓存的内存元信息。出于性能考虑,CN 节点同时在内存中缓存有分区表 t1 的元信息,该信息是直接被 CN 上的 DML 事务使用的元信息。com.alibaba.polardbx.executor.gms 中实现了不同种类元信息的缓存管理,其接口定义为 com.alibaba.polardbx.optimizer.config.table.SchemaManager, 包含 init, getTable, reload 等基本方法。

private ResultCursor executeQuery(ByteString sql, ExecutionContext executionContext,                                      AtomicBoolean trxPolicyModified) {
// Get all meta version before optimization final long[] metaVersions = MdlContext.snapshotMetaVersions();
// Planner ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext);
... //for requireMDL transaction if (requireMdl && enableMdl) { if (!isClosed()) { // Acquire meta data lock for each statement modifies table data acquireTransactionalMdl(sql.toString(), plan, executionContext); } ... //update Plan if metaVersion changed, which indicate meta updated } ...
ResultCursor resultCursor = executor.execute(plan, executionContext); ... return resultCursor; }
复制代码

值得注意的是,内存元信息以逻辑表为基本单位构建锁,此锁即为内存 MDL 锁,DML 事务开始时即通过 acquireTransactionalMdl 尝试获取当前最新版本的相关逻辑表的 MDL 锁,执行完成后释放。


DDL Job 执行时,将按一定的时序修改上述两部分元信息,此过程中 DDL 和 DML 之间的同步是实现 DDL 的 online 特性的关键步骤。

DDL Job 定义

添加全局二级索引的 DDL Job Handler 为 AlterTableHandler ,它重载了 LogicalCommonDdlHandler 的 buildDdlJob 方法 com.alibaba.polardbx.executor.handler.ddl.LogicalAlterTableHandler#buildDdlJob ,该方法最终分派到 com.alibaba.polardbx.executor.ddl.job.factory.gsi.CreatePartitionGsiJobFactory 构造全局二级索引对应的 ddl Job。

public class CreatePartitionGsiJobFactory extends CreateGsiJobFactory{    @Override    protected void excludeResources(Set<String> resources) {        super.excludeResources(resources);        //meta data lock in MetaDB        resources.add(concatWithDot(schemaName, primaryTableName)); //db1.t1        resources.add(concatWithDot(schemaName, indexTableName)); //db1.g_i_y        ...    }    
@Override protected ExecutableDdlJob doCreate() { ... if (needOnlineSchemaChange) { bringUpGsi = GsiTaskFactory.addGlobalIndexTasks( schemaName, primaryTableName, indexTableName, stayAtDeleteOnly, stayAtWriteOnly, stayAtBackFill ); } ... List<DdlTask> taskList = new ArrayList<>(); //1. validate taskList.add(validateTask); //2. create gsi table //2.1 insert tablePartition meta for gsi table taskList.add(createTableAddTablesPartitionInfoMetaTask); //2.2 create gsi physical table CreateGsiPhyDdlTask createGsiPhyDdlTask = new CreateGsiPhyDdlTask(schemaName, primaryTableName, indexTableName, physicalPlanData); taskList.add(createGsiPhyDdlTask); //2.3 insert tables meta for gsi table taskList.add(addTablesMetaTask); taskList.add(showTableMetaTask); //3. //3.1 insert indexes meta for primary table taskList.add(addIndexMetaTask); //3.2 gsi status: CREATING -> DELETE_ONLY -> WRITE_ONLY -> WRITE_REORG -> PUBLIC taskList.addAll(bringUpGsi); //last tableSyncTask DdlTask tableSyncTask = new TableSyncTask(schemaName, indexTableName); taskList.add(tableSyncTask);
final ExecutableDdlJob4CreatePartitionGsi result = new ExecutableDdlJob4CreatePartitionGsi(); result.addSequentialTasks(taskList); ....
return result; } ...}
复制代码

excludeResources 方法声明了 ddlJob 对相关对象的持久化元数据锁的占用,本例涉及主表与 GSI 两张表的元数据修改,因而加锁对象包括 db1.t1, db1.g_i_y 。注意,该锁与前述的内存 MDL 锁不同,前者在 DDL 引擎执行 DDL job 的初始阶段持久化到 MetaDB 的 read_write_lock 表中,用于控制 DDL 之间的并发。 

doCreate 方法声明了按时序执行的一系列 Task,其语义依次是:


  • 校验

  • validateTask

: DDL Job 校验,检查 GSI 表名的合法性等。


  • 创建 GSI 表

  • createTableAddTablesPartitionInfoMetaTask

GSI 的分区元信息写入

  • createGsiPhyDdlTask

创建 GSI 对应物理表的物理 DDL

  • addTablesMetaTask

主表的元信息修改

  • showTableMetaTask

主表的元信息在节点之间广播

  • GSI 表的元信息同步

  • addIndexMetaTask

GSI 表的 tableMeta 元信息修改

  • bringUpGsi

添加索引后的 online schema change 过程

  • tableSyncTask

GSI 表的元信息在节点之间广播


在上述的 Task 中,持久化元信息的写入操作均作用于 MetaDB 中相应的元信息表,元信息的广播操作作用于 CN 缓存的元信息。

接下来我们简要介绍其中的三类代表性 Task,其中 CreateTableAddTablesMetaTask 包含的方法列表如下:

public class CreateTableAddTablesMetaTask extends BaseGmsTask {    @Override    public void executeImpl(Connection metaDbConnection, ExecutionContext executionContext) {        PhyInfoSchemaContext phyInfoSchemaContext = TableMetaChanger.buildPhyInfoSchemaContext(schemaName,            logicalTableName, dbIndex, phyTableName, sequenceBean, tablesExtRecord, partitioned, ifNotExists, sqlKind,            executionContext);        FailPoint.injectRandomExceptionFromHint(executionContext);        FailPoint.injectRandomSuspendFromHint(executionContext);        TableMetaChanger.addTableMeta(metaDbConnection, phyInfoSchemaContext);    }
@Override public void rollbackImpl(Connection metaDbConnection, ExecutionContext executionContext) { TableMetaChanger.removeTableMeta(metaDbConnection, schemaName, logicalTableName, false, executionContext); }
@Override protected void onRollbackSuccess(ExecutionContext executionContext) { TableMetaChanger.afterRemovingTableMeta(schemaName, logicalTableName); }}
复制代码
  1. addTableMetaTask:在主表的 tableMeta 中添加 GSI 表相关的元信息。executeImpl 调用 MetaDB 提供的元数据读写接口 com.alibaba.polardbx.executor.ddl.job.meta.TableMetaChanger#addTableMeta 写入 GSI 表的元信息。在 executeImpl 对应的主线逻辑之外,addTableMetaTask 同时提供了 rollbackImpl 方法清空 GSI 表的元信息,这实际上即是该 Task 对应的 undolog,在 DDL job 发生回滚时调用该方法即可恢复原有的 tableMeta 信息。

  2. TableSyncTask:作为前述的节点间通信机制的一个特例,实现 tableMeta 元信息在 CN 节点间的同步,通知集群内所有 CN 节点从 MetaDB 中加载 tableMeta 元信息。

  3. bringUpGsi:是一系列 Task 构成的 Task List,它参照 online schema change 定义了元信息的演化和数据回填流程,其原理见PolarDB-X Online Schema Change,其中也包含 TableSyncTask。我们将在下一节着重介绍此 Task.

除了上述 Task,在逻辑 DDL 中常用的元信息读写、物理 DDL 和 DML Task 均已预先定义,读者可在

polardbx-executor/src/main/java/com/alibaba/polardbx/executor/ddl/job/task/basic
复制代码

目录下找到。


在 doCreate 方法的最后,addSequentialTasks 向构造出的 ddlJob 中批量添加 Task 任务。作为最简单的 Task 组合接口,addSequentialTask 以参数 taskList 中元素的下标顺序作为拓扑序构造 Task 之间的依赖关系。此方法揭示出在 ddlJob 中以 DAG 评接 DDL Task 的组合方式,DDL 引擎中将根据 DAG 中的拓扑序调度 DDL Task 。此外,在声明复杂依赖关系时还可使用 addTask , addTaskRelationShip 等方法单独声明 Task 及其依赖顺序。


DDL 与 DML 的同步

上一节中的 bringUpGsi 是 online-schema-change 流程的一个完整实现,定义于

com.alibaba.polardbx.executor.ddl.job.task.factory.GsiTaskFactory#addGlobalIndexTasks
复制代码

方法中,我们以此为例介绍 DDL 与 DML 同步的几个关键组成部分。

·    public static List<DdlTask> addGlobalIndexTasks(String schemaName,                                                    String primaryTableName,                                                    String indexName,                                                    boolean stayAtDeleteOnly,                                                    boolean stayAtWriteOnly,                                                    boolean stayAtBackFill) {        ....        DdlTask writeOnlyTask = new GsiUpdateIndexStatusTask(            schemaName,            primaryTableName,            indexName,            IndexStatus.DELETE_ONLY,            IndexStatus.WRITE_ONLY        ).onExceptionTryRecoveryThenRollback();        ....        taskList.add(deleteOnlyTask);        taskList.add(new TableSyncTask(schemaName, primaryTableName));        ....        taskList.add(writeOnlyTask);        taskList.add(new TableSyncTask(schemaName, primaryTableName));        ...        taskList.add(new LogicalTableBackFillTask(schemaName, primaryTableName, indexName));        ...        taskList.add(writeReOrgTask);        taskList.add(new TableSyncTask(schemaName, primaryTableName));        taskList.add(publicTask);        taskList.add(new TableSyncTask(schemaName, primaryTableName));        return taskList;    }
复制代码
  1. 元信息的多版本演化过程。索引的元信息经历从 CREATING -> DELETE_ONLY -> WRITE_ONLY -> WRITE_REORG -> PUBLIC 的完整版本演化流程,其状态设计原理见 online schema change 一文,此状态设计过程保证系统中存在双版本元数据时不会出现不一致状态。

  2. DML 在 CN 缓存的对应版本元信息下的重写。在元信息版本演化过程中,进入执行器的 DML 语句将会分派对应的 rewriter 根据索引状态生成实际的 DML,从而满足 write_only, update_only 等元信息状态约束,并在数据回填时实现 DML 双写。

  3. MetaDB 和 CN 缓存元信息之间的同步时序。sync 操作将按一定的时序修改上述两部分元信息,一方面保证 DML 事务期间对应版本元信息的可用性,同时在相应版本 DML 事务结束后抢占 MDL 锁并 invalidate 原版本信息,另一方面通过提前加载新版本元信息,确保同一时刻其他 DML 事务总能获取可用的元信息。

DML 的重写

索引元信息中的状态定义见

com.alibaba.polardbx.gms.metadb.table.IndexStatus
复制代码

. DDL 更新该字段的同时,优化器为修改主表的 DML 语句在 RBO 中分配相应的 gsiWriter, 以简单的 Insert 语句

insert into t1 values(1,2)
复制代码

为例:


  1. Insert 类型的语句在对应的 RBO 阶段 polardbx-optimizer/src/main/java/com/alibaba/polardbx/optimizer/core/planner/rule/OptimizeLogicalInsertRule.java

根据主表元信息中的 gsiMeta 在执行计划中关联到相应 GSI 的 writer.

//OptimizeLogicalInsertRule.javaprivate LogicalInsert handlePushdown(LogicalInsert origin, boolean deterministicPushdown, ExecutionContext ec){      ...//other writers        final List<InsertWriter> gsiInsertWriters = new ArrayList<>();    IntStream.range(0, gsiMetas.size()).forEach(i -> {        final TableMeta gsiMeta = gsiMetas.get(i);        final RelOptTable gsiTable = catalog.getTableForMember(ImmutableList.of(schema, gsiMeta.getTableName()));        final List<Integer> gsiValuePermute = gsiColumnMappings.get(i);        final boolean isGsiBroadcast = TableTopologyUtil.isBroadcast(gsiMeta);        final boolean isGsiSingle = TableTopologyUtil.isSingle(gsiMeta);        //different write stragety for corresponding table type.        gsiInsertWriters.add(WriterFactory                             .createInsertOrReplaceWriter(newInsert, gsiTable, sourceRowType, gsiValuePermute, gsiMeta, gsiKeywords,                                                          null, isReplace, isGsiBroadcast, isGsiSingle, isValueSource, ec));        });   ...}
复制代码


  1. writer 在执行器的 Handler 中 apply 到物理执行计划中。其中 getInput 方法根据 gsiWriter 中包含的 indexStatus 状态信息决定是否生成当前 DML 对 GSI 的物理写操作,因此,最终 DML 仅在索引更新到 WRITE_ONLY 后开启对 GSI 的双写。

//LogicalInsertWriter.java    protected int executeInsert(LogicalInsert logicalInsert, ExecutionContext executionContext,                                HandlerParams handlerParams) {        ...       final List<InsertWriter> gsiWriters = logicalInsert.getGsiInsertWriters();       gsiWriters.stream()                .map(gsiWriter -> gsiWriter.getInput(executionContext))                .filter(w -> !w.isEmpty())                .forEach(w -> {                    writableGsiCount.incrementAndGet();                    allPhyPlan.addAll(w);                });
//IndexStatus.java...public static final EnumSet<IndexStatus> WRITABLE = EnumSet.of(WRITE_ONLY, WRITE_REORG, PUBLIC, DROP_WRITE_ONLY);
public boolean isWritable() { return WRITABLE.contains(this);}...
复制代码


元信息同步时序

在通过 sync 同步 MetaDB 和内存元信息时,将依照以下步骤进行:

  1. loadSchema: 首先预加载 MetaDB 中的新版本元信息到内存中。加载完成后,内存中新增新版本元数据及其 MDL 锁,此后进入 CN 的 DML(node1.thread2) 均会获取新版本元数据的 MDL.

  2. mdl(v0).writeLock: 然后尝试获取旧版本元数据的 MDL 锁(node1.mdl.v0),当获取锁成功时,旧版本事务即被排空。

  3. expireSchemaManager(t1, g_i1, v0):消除旧版本元信息,将新版本元信息标记为旧版本元信息。


上述 sync 调用的响应者是 CN 集群中的所有节点(包括调用者本身),当所有节点均完成元信息版本切换时,sync 调用即告成功,此时序保证了以下几点:


  1. 任一时刻整个集群中至多存在两个版本元信息,并且至少有一个版本的元信息可加 MDL 读锁,因而进入 CN 的 MDL 语句永不会阻塞。

  2. 旧版本元信息在 loadSchema 后即不再关联到 MDL 事务,可保证有限的状态切换时间。

其中 1,2,3 步均实现于

com.alibaba.polardbx.executor.gms.GmsTableMetaManager#tonewversion
复制代码

方法。

   public void tonewversion(String tableName, boolean preemptive, Long initWait, Long interval, TimeUnit timeUnit) {        synchronized (OptimizerContext.getContext(schemaName)) {            GmsTableMetaManager oldSchemaManager =                (GmsTableMetaManager) OptimizerContext.getContext(schemaName).getLatestSchemaManager();            TableMeta currentMeta = oldSchemaManager.getTableWithNull(tableName);
long version = -1;

....//查询当前MetaDB中的元数据版本并将其赋值给vesion
//1. loadSchema SchemaManager newSchemaManager = new GmsTableMetaManager(oldSchemaManager, tableName, rule); newSchemaManager.init();
OptimizerContext.getContext(schemaName).setSchemaManager(newSchemaManager);
//2. mdl(v0).writeLock final MdlContext context; if (preemptive) { context = MdlManager.addContext(schemaName, initWait, interval, timeUnit); } else { context = MdlManager.addContext(schemaName, false); }
MdlTicket ticket = context.acquireLock(new MdlRequest(1L, MdlKey .getTableKeyWithLowerTableName(schemaName, currentMeta.getDigest()), MdlType.MDL_EXCLUSIVE, MdlDuration.MDL_TRANSACTION));
//3. expireSchemaManager(t1, g_i1, v0) oldSchemaManager.expire();

....//失效使用旧版本元信息的PlanCache.
context.releaseLock(1L, ticket); } }
复制代码

通过上述几个要素,DDL Job 在定义时实现了 DDL 与 DML 的同步,保证了 DML 的 online 执行。

总结

本文主要解读了 PolarDB-X 中 CN 端的 DDL Job 定义相关的代码,以添加全局二级索引为例,对 DDL Job 定义和执行的整体流程进行了梳理,并着重阐述了 DDL job 定义中涉及 online 和 crash safe 特性的关键逻辑。对于 DDL 引擎中的 DDL job 执行流程,敬请期待下篇解读。

用户头像

还未添加个人签名 2022.01.10 加入

还未添加个人简介

评论

发布
暂无评论
PolarDB-X 源码解读:DDL的一生(上)_数据库_阿里云数据库开源_InfoQ写作社区