写点什么

使用强大的 DBPack 处理分布式事务(PHP 使用教程)

作者:峨嵋闲散人
  • 2022 年 7 月 01 日
  • 本文字数:5997 字

    阅读完需:约 20 分钟

使用强大的DBPack处理分布式事务(PHP使用教程)

主流的分布式事务的处理方案


近些年,随着微服务的广泛使用,业务对系统的分布式事务处理能力的要求越来越高。

早期的基于 XA 协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。


现在主流的 TCC 事务方案和 SAGA 事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。


新兴的 AT 事务解决方案,例如SeataSeata-golang,通过数据源代理层的资源管理器 RM 记录 SQL 回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如 Seata 只支持 Java 语言类型的微服务,Seata-golang 只支持 Go 语言类型的微服务。


为了突破 AT 事务对业务编程语言的限制,现在业界正在往 DB Mesh 的方向发展,通过将事务中间件部署在 SideCar 的方式,达到任何编程语言都能使用分布式事务中间件的效果。


DBPack是一个处理分布式事务的数据库代理,其能够拦截 MySQL 流量,生成对应的事务回滚镜像,通过与 ETCD 协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿 SQL 操作,支持接入任何编程语言。DBPack 还支持 TCC 事务模式,能够自动补偿 HTTP 请求。目前其 demo 已经有 Java、Go、Python 和 PHP,TCC 的 sample 也已经在路上了,demo 示例可以关注dbpack-samples


最新版 DBPack 不仅支持预处理的 sql 语句,还支持 text 类型的 sql。DBPack 最新版还兼容了 php8 的 pdo_mysql 扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示

这个包的内容为:

07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 header00 // 标识 payload 为 OKPacket00 // affected row00 // last insert id03 00 // 状态标志位00 00 // warning 数量
复制代码

dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active 异常。最新版本已经修复了这个问题。


下图是具体的 DBPack 事务流程图。


其事务流程简要描述如下:

  1. 客户端向聚合层服务的 DBPack 代理发起 HTTP 请求。

  2. DBPack 生成全局唯一的 XID,存储到 ETCD 中。注意请求的地址和端口指向 DBPack,并不直接指向实际 API。

  3. 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过 HTTP header(X-Dbpack-Xid)拿到 XID 了。此时,聚合服务调用服务 1 并传递 XID。

  4. 服务 1 拿到 XID,通过 DBPack 代理,注册分支事务(生成 BranchID 等信息,并存储到 ETCD)。

  5. 服务 1 的分支事务注册成功后,生成本地事务的回滚镜像,随着本地事务一起 commit。

  6. 服务 2 进行与服务 1 相同的步骤 4 和 5。

  7. 聚合层服务根据服务 1 和服务 2 的结果,决议是全局事务提交还是回滚,如果是成功,则返回 HTTP 200 给 DBPack(除 200 以外的状态码都会被 DBPack 认为是失败)。DBPack 更新 ETCD 中的全局事务状态为全局提交中或回滚中。

  8. 服务 1 和服务 2 的 DBPack,通过 ETCD 的 watch 机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。

  9. 所有的分支事务提交或回滚完成后,聚合层服务的 DBPack 的协程会检测到事务已经完成,将从 ETCD 删除 XID 和 BranchID 等事务信息。

本文将以 PHP 语言为例,详细介绍如何使用 PHP 对接 DBPack 完成分布式事务。实际使用其他语言时,对接过程也是类似的。


使用 PHP 对接 DBPack 实现分布式事务

前置条件

  • 业务数据库为 mysql 数据库

  • 业务数据表为 innodb 类型

  • 业务数据表必须有主键

Step0: 安装 ETCD

ETCD_VER=v3.5.3
# choose either URLGOOGLE_URL=https://storage.googleapis.com/etcdGITHUB_URL=https://github.com/etcd-io/etcd/releases/downloadDOWNLOAD_URL=${GOOGLE_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gzrm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gztar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version/tmp/etcd-download-test/etcdctl version/tmp/etcd-download-test/etcdutl version
复制代码

Step1: 在业务数据库中创建 undo_log 表

undo_log 表用于存储本地事务的回滚镜像。

-- ------------------------------ Table structure for undo_log-- ----------------------------DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `branch_id` bigint(20) NOT NULL,  `xid` varchar(100) NOT NULL,  `context` varchar(128) NOT NULL,  `rollback_info` longblob NOT NULL,  `log_status` int(11) NOT NULL,  `log_created` datetime NOT NULL,  `log_modified` datetime NOT NULL,  `ext` varchar(100) DEFAULT NULL,  PRIMARY KEY (`id`),  KEY `idx_unionkey` (`xid`,`branch_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
复制代码


Step2: 编写配置文件,对接 DBPack

# 更新distributed_transaction.etcd_config.endpoints# 更新listeners配置项,调整为实际聚合层服务的地址和端口# 更新filters配置项,配置聚合层服务的API endpointvim /path/to/your/aggregation-service/config-aggregation.yaml
# 更新distributed_transaction.etcd_config.endpoints# 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口# 更新data_source_cluster.dsnvim /path/to/your/business-service/config-service.yaml
复制代码


Step3: 运行 DBPack

git clone git@github.com:cectc/dbpack.git
cd dbpack# build on local envmake build-local# build on production envmake build
./dist/dbpack start --config /path/to/your/config-aggregation.yaml
./dist/dbpack start --config /path/to/your/config-service.yaml
复制代码


Step4: 配置 vhost,监听 php 项目端口

以 Nginx 为例,配置如下


server {listen 3001; # 暴露的服务端口 index index.php index.html;root /var/www/code/; # 业务代码根目录


server {    listen 3001; # 暴露的服务端口    index index.php index.html;    root /var/www/code/; # 业务代码根目录
location / { try_files $uri /index.php?$args; }
location ~ \.php$ { fastcgi_split_path_info ^(.+\.php)(/.+)$; fastcgi_pass order-svc-app:9000; # php-fpm 端口 fastcgi_index index.php; include fastcgi_params; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; fastcgi_param PATH_INFO $fastcgi_path_info; }}
复制代码


Step5: 编写应用程序

aggregation service example

<?php
class AggregationSvc{
public function CreateSo(string $xid, bool $rollback): bool { $createSoSuccess = $this->createSoRequest($xid); if (!$createSoSuccess) { return false; } $allocateInventorySuccess = $this->allocateInventoryRequest($xid); if (!$allocateInventorySuccess) { return false; } if ($rollback) { return false; } return true; }
// private function createSoRequest(string $xid) ... // private function allocateInventoryRequest(string $xid) ...}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');$reaHeaders = getallheaders();
$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';
if (empty($xid)) { die('xid is not provided!');}
$aggregationSvc = new AggregationSvc();
if ($_SERVER['REQUEST_METHOD'] === 'POST') { switch ($reqPath) { case '/v1/order/create': if ($aggregationSvc->CreateOrder($xid, false)) { responseOK(); } else { responseError(); } case '/v1/order/create2': if ($aggregationSvc->CreateSo($xid, true)) { responseOK(); } else { responseError(); } break; default: die('api not found'); }}
function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]);}
function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]);}
复制代码


order service example

<?php
class OrderDB{ private PDO $_connection; private static OrderDB $_instance; private string $_host = 'dbpack-order'; private int $_port = 13308; private string $_username = 'dksl'; private string $_password = '123456'; private string $_database = 'order';
const insertSoMaster = "INSERT /*+ XID('%s') */ INTO order.so_master (sysno, so_id, buyer_user_sysno, seller_company_code, receive_division_sysno, receive_address, receive_zip, receive_contact, receive_contact_phone, stock_sysno, payment_type, so_amt, status, order_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";
const insertSoItem = "INSERT /*+ XID('%s') */ INTO order.so_item(sysno, so_sysno, product_sysno, product_name, cost_price, original_price, deal_price, quantity) VALUES (?,?,?,?,?,?,?,?)";
public static function getInstance(): OrderDB { if (empty(self::$_instance)) { self::$_instance = new self(); } return self::$_instance; }
private function __construct() { try { $this->_connection = new PDO( "mysql:host=$this->_host;port=$this->_port;dbname=$this->_database;charset=utf8", $this->_username, $this->_password, [ PDO::ATTR_PERSISTENT => true, PDO::ATTR_EMULATE_PREPARES => false, // to let DBPack handle prepread sql ] ); } catch (PDOException $e) { die($e->getMessage()); } }
private function __clone() { }
public function getConnection(): PDO { return $this->_connection; }
public function createSo(string $xid, array $soMasters): bool { $this->getConnection()->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); try { $this->getConnection()->beginTransaction(); foreach ($soMasters as $master) { if (!$this->insertSo($xid, $master)) { throw new PDOException("failed to insert soMaster"); } } $this->getConnection()->commit(); } catch (PDOException $e) { $this->getConnection()->rollBack(); return false; } return true; }
private function insertSo(string $xid, array $soMaster): bool { // insert into so_master, so_item ... }}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');$reqHeaders = getallheaders();
$xid = $reqHeaders['Xid'] ?? '';
if (empty($xid)) { die('xid is not provided!');}
if ($_SERVER['REQUEST_METHOD'] === 'POST') { if ($reqPath === '/createSo') { $reqBody = file_get_contents('php://input'); $soMasters = json_decode($reqBody, true);
$orderDB = OrderDB::getInstance(); $result = $orderDB->createSo($xid, $soMasters);
if ($result) { responseOK(); } else { responseError(); } }}
function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]);}
function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]);}
复制代码

Step6: 访问聚合层业务接口

curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}
复制代码

注意的点

  • 无论是使用 mysqli 驱动、pdo_mysql 驱动,还是通过mysql_connect()连接数据库(<=php5.4),在start transaction;开始之后,后续的业务操作必须在同一个数据库连接上进行。

  • DBPack 通过 xid(全局事务唯一 ID)在事务上下文中传播,业务数据库执行的业务 SQL 语句中,需要加入 xid 注释,这样 DBPack 才能根据 xid 处理对应的事务。例如insert /*+ XID('%s') */ into xx ...;

参考链接


作者简介:

卜贺贺。就职于日本楽天 Rakuten CNTD,任 Application Engineer,熟悉 AT 事务、Seata-golang 和 DBPack。GitHub:https://github.com/bohehe

发布于: 2022 年 07 月 01 日阅读数: 7
用户头像

还未添加个人签名 2019.12.17 加入

还未添加个人简介

评论

发布
暂无评论
使用强大的DBPack处理分布式事务(PHP使用教程)_分布式事务_峨嵋闲散人_InfoQ写作社区