
使用强大的 DBPack 处理分布式事务(PHP 使用教程)

  • 2022 年 7 月 01 日
  2022 年 7 月 01 日

    阅读完需:约 20 分钟




早期的基于 XA 协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。

现在主流的 TCC 事务方案和 SAGA 事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。

新兴的 AT 事务解决方案,例如SeataSeata-golang,通过数据源代理层的资源管理器 RM 记录 SQL 回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如 Seata 只支持 Java 语言类型的微服务,Seata-golang 只支持 Go 语言类型的微服务。

为了突破 AT 事务对业务编程语言的限制,现在业界正在往 DB Mesh 的方向发展,通过将事务中间件部署在 SideCar 的方式,达到任何编程语言都能使用分布式事务中间件的效果。

DBPack是一个处理分布式事务的数据库代理,其能够拦截 MySQL 流量,生成对应的事务回滚镜像,通过与 ETCD 协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿 SQL 操作,支持接入任何编程语言。DBPack 还支持 TCC 事务模式,能够自动补偿 HTTP 请求。目前其 demo 已经有 Java、Go、Python 和 PHP,TCC 的 sample 也已经在路上了,demo 示例可以关注dbpack-samples

最新版 DBPack 不仅支持预处理的 sql 语句,还支持 text 类型的 sql。DBPack 最新版还兼容了 php8 的 pdo_mysql 扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示


07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 header00 // 标识 payload 为 OKPacket00 // affected row00 // last insert id03 00 // 状态标志位00 00 // warning 数量

dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active 异常。最新版本已经修复了这个问题。

下图是具体的 DBPack 事务流程图。


  1. 客户端向聚合层服务的 DBPack 代理发起 HTTP 请求。

  2. DBPack 生成全局唯一的 XID,存储到 ETCD 中。注意请求的地址和端口指向 DBPack,并不直接指向实际 API。

  3. 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过 HTTP header(X-Dbpack-Xid)拿到 XID 了。此时,聚合服务调用服务 1 并传递 XID。

  4. 服务 1 拿到 XID,通过 DBPack 代理,注册分支事务(生成 BranchID 等信息,并存储到 ETCD)。

  5. 服务 1 的分支事务注册成功后,生成本地事务的回滚镜像,随着本地事务一起 commit。

  6. 服务 2 进行与服务 1 相同的步骤 4 和 5。

  7. 聚合层服务根据服务 1 和服务 2 的结果,决议是全局事务提交还是回滚,如果是成功,则返回 HTTP 200 给 DBPack(除 200 以外的状态码都会被 DBPack 认为是失败)。DBPack 更新 ETCD 中的全局事务状态为全局提交中或回滚中。

  8. 服务 1 和服务 2 的 DBPack,通过 ETCD 的 watch 机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。

  9. 所有的分支事务提交或回滚完成后,聚合层服务的 DBPack 的协程会检测到事务已经完成,将从 ETCD 删除 XID 和 BranchID 等事务信息。

本文将以 PHP 语言为例,详细介绍如何使用 PHP 对接 DBPack 完成分布式事务。实际使用其他语言时,对接过程也是类似的。

使用 PHP 对接 DBPack 实现分布式事务


  • 业务数据库为 mysql 数据库

  • 业务数据表为 innodb 类型

  • 业务数据表必须有主键

Step0: 安装 ETCD

# choose either URLGOOGLE_URL=https://storage.googleapis.com/etcdGITHUB_URL=https://github.com/etcd-io/etcd/releases/downloadDOWNLOAD_URL=${GOOGLE_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gzrm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gztar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version/tmp/etcd-download-test/etcdctl version/tmp/etcd-download-test/etcdutl version

Step1: 在业务数据库中创建 undo_log 表

undo_log 表用于存储本地事务的回滚镜像。

-- ------------------------------ Table structure for undo_log-- ----------------------------DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `branch_id` bigint(20) NOT NULL,  `xid` varchar(100) NOT NULL,  `context` varchar(128) NOT NULL,  `rollback_info` longblob NOT NULL,  `log_status` int(11) NOT NULL,  `log_created` datetime NOT NULL,  `log_modified` datetime NOT NULL,  `ext` varchar(100) DEFAULT NULL,  PRIMARY KEY (`id`),  KEY `idx_unionkey` (`xid`,`branch_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step2: 编写配置文件,对接 DBPack

# 更新distributed_transaction.etcd_config.endpoints# 更新listeners配置项,调整为实际聚合层服务的地址和端口# 更新filters配置项,配置聚合层服务的API endpointvim /path/to/your/aggregation-service/config-aggregation.yaml
# 更新distributed_transaction.etcd_config.endpoints# 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口# 更新data_source_cluster.dsnvim /path/to/your/business-service/config-service.yaml

Step3: 运行 DBPack

git clone git@github.com:cectc/dbpack.git
cd dbpack# build on local envmake build-local# build on production envmake build
./dist/dbpack start --config /path/to/your/config-aggregation.yaml
./dist/dbpack start --config /path/to/your/config-service.yaml

Step4: 配置 vhost,监听 php 项目端口

以 Nginx 为例,配置如下

server {listen 3001; # 暴露的服务端口 index index.php index.html;root /var/www/code/; # 业务代码根目录

location / { try_files $uri /index.php?$args; }
location ~ \.php$ { fastcgi_split_path_info ^(.+\.php)(/.+)$; fastcgi_pass order-svc-app:9000; # php-fpm 端口 fastcgi_index index.php; include fastcgi_params; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; fastcgi_param PATH_INFO $fastcgi_path_info; }}

Step5: 编写应用程序

aggregation service example

class AggregationSvc{
public function CreateSo(string $xid, bool $rollback): bool { $createSoSuccess = $this->createSoRequest($xid); if (!$createSoSuccess) { return false; } $allocateInventorySuccess = $this->allocateInventoryRequest($xid); if (!$allocateInventorySuccess) { return false; } if ($rollback) { return false; } return true; }
// private function createSoRequest(string $xid) ... // private function allocateInventoryRequest(string $xid) ...}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');$reaHeaders = getallheaders();
$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';
if (empty($xid)) { die('xid is not provided!');}
$aggregationSvc = new AggregationSvc();
if ($_SERVER['REQUEST_METHOD'] === 'POST') { switch ($reqPath) { case '/v1/order/create': if ($aggregationSvc->CreateOrder($xid, false)) { responseOK(); } else { responseError(); } case '/v1/order/create2': if ($aggregationSvc->CreateSo($xid, true)) { responseOK(); } else { responseError(); } break; default: die('api not found'); }}
function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]);}
function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]);}

order service example

class OrderDB{ private PDO $_connection; private static OrderDB $_instance; private string $_host = 'dbpack-order'; private int $_port = 13308; private string $_username = 'dksl'; private string $_password = '123456'; private string $_database = 'order';
const insertSoMaster = "INSERT /*+ XID('%s') */ INTO order.so_master (sysno, so_id, buyer_user_sysno, seller_company_code, receive_division_sysno, receive_address, receive_zip, receive_contact, receive_contact_phone, stock_sysno, payment_type, so_amt, status, order_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";
const insertSoItem = "INSERT /*+ XID('%s') */ INTO order.so_item(sysno, so_sysno, product_sysno, product_name, cost_price, original_price, deal_price, quantity) VALUES (?,?,?,?,?,?,?,?)";
public static function getInstance(): OrderDB { if (empty(self::$_instance)) { self::$_instance = new self(); } return self::$_instance; }
private function __construct() { try { $this->_connection = new PDO( "mysql:host=$this->_host;port=$this->_port;dbname=$this->_database;charset=utf8", $this->_username, $this->_password, [ PDO::ATTR_PERSISTENT => true, PDO::ATTR_EMULATE_PREPARES => false, // to let DBPack handle prepread sql ] ); } catch (PDOException $e) { die($e->getMessage()); } }
private function __clone() { }
public function getConnection(): PDO { return $this->_connection; }
public function createSo(string $xid, array $soMasters): bool { $this->getConnection()->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); try { $this->getConnection()->beginTransaction(); foreach ($soMasters as $master) { if (!$this->insertSo($xid, $master)) { throw new PDOException("failed to insert soMaster"); } } $this->getConnection()->commit(); } catch (PDOException $e) { $this->getConnection()->rollBack(); return false; } return true; }
private function insertSo(string $xid, array $soMaster): bool { // insert into so_master, so_item ... }}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');$reqHeaders = getallheaders();
$xid = $reqHeaders['Xid'] ?? '';
if (empty($xid)) { die('xid is not provided!');}
if ($_SERVER['REQUEST_METHOD'] === 'POST') { if ($reqPath === '/createSo') { $reqBody = file_get_contents('php://input'); $soMasters = json_decode($reqBody, true);
$orderDB = OrderDB::getInstance(); $result = $orderDB->createSo($xid, $soMasters);
if ($result) { responseOK(); } else { responseError(); } }}
function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]);}
function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]);}

Step6: 访问聚合层业务接口

curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}


  • 无论是使用 mysqli 驱动、pdo_mysql 驱动,还是通过mysql_connect()连接数据库(<=php5.4),在start transaction;开始之后,后续的业务操作必须在同一个数据库连接上进行。

  • DBPack 通过 xid(全局事务唯一 ID)在事务上下文中传播,业务数据库执行的业务 SQL 语句中,需要加入 xid 注释,这样 DBPack 才能根据 xid 处理对应的事务。例如insert /*+ XID('%s') */ into xx ...;



卜贺贺。就职于日本楽天 Rakuten CNTD,任 Application Engineer,熟悉 AT 事务、Seata-golang 和 DBPack。GitHub:https://github.com/bohehe

发布于: 2022 年 07 月 01 日阅读数: 7

