什么是分布式事务
说到事务,相信大家都接触过 MySQL 的事务,但是 MySQL 的事务并不能解决分布式场景下的事务问题。如下图。这时候,我们就需要用到分布式事务来解决分布式场景下的事务问题了。
通过上图,如果是让我们自己去实现一个分布式事务,要如何实现?
通过补偿的方式来进行分布式事务
通过全局的事务来控制
基于消息队列做可靠事件
...
实现的方法特别多,但是具体如何实现,还是需要根据公司的业务场景来决定。下面来介绍两种强一致性解决方案
强一致性解决方案
XA 分布式事务
最早的分布式事务模型是 X/Open 国际联盟提出的 X/Open Distributed Transaction Processing(DTP)模型,也就是大家常说的 X/Open XA 协议,简称 XA 协议。最典型的 XA 实现就是两阶段提交协议(2PC)。
二阶段提交协议
顾名思义,说明一整套流程分为两个阶段。
2PC 执行流程
阶段一:
协调者询问各个参与者是否可以正常执行事务操作,并开始等待各个参与者响应
各个参与者执行本地事务(写本地 undo/redo 日志),但是不进行提交事务。
各参与者向协调者反馈事务询问响应
阶段二:
所有参与者反馈 Yes(提交事务)
协调者像所有参与者发出 commit 请求
参与者收到 commit 请求之后,会正式执行事务提交操作,并在完成之后释放
参与者发送 ack 信息之后,协调者接收并完成事务。
有一个或多个参与者反馈 No(中断事务)
协调者向所有参与者发出 Rollback 请求
参与者收到 Rollback 请求后,会利用其在阶段一中记录的 Undo 日志来执行事务的回滚操作,并在完成回滚之后释放整个事务执行期间占用的资源。
参与者在完成事务回滚之后,向协调者发送 ack 信息。
协调者接收到所有参与者反馈的 ack 信息之后,完成事务中断
2PC 的一阶段和二阶段所有参与者协调者通知,都是串行执行
总上所述,大家也能发现 2PC 的优缺点
优点:原理简单
缺点:
同步阻塞:在 1,2 阶段的执行过程中,所有参与者的事务操作都是处于阻塞状态。
单点故障:协调者是单点,如果协调者出现问题,那么整个流程就锁住执行不了,也没设置参与者和协调者的超时机制
数据不一致:协调者向参与者发送 Commit 请求后,如果发生网络抖动,部分参与者收到 Commit 请求部分参与者没收到,会出现数据不一致的现象。
正因为 2PC 有这些缺点,所有出现了三阶段提交协议来弥补二阶段提交协议的部分缺点。
三阶段提交协议
三阶段提交协议(3PC)是 2PC 的改进版,将 2PC 的"提交事务请求"过程一份为二,变成了 CanCommit,PreCommit 和 doCommit 三个阶段组成的事务处理协议。并且引入了超时机制。
3PC 执行流程
阶段一(canCommit 阶段):
协调者向所有参与者串行发送 CanCommit 请求。询问是否可以执行事务提交操作。然后开始等待参与者的响应。
参与者收到 CanCommit 请求之后,正常情况下,如果其自身认为可以顺利执行事务,则返回 Yes 响应,并进入预备状态,否则返回 No。
阶段二(preCommit 阶段):
所有参与者反馈 Yes
协调者向参与者发送 PreCommit 请求,并进入 Prepared 阶段。
参与者收到 PreCommit 请求,会执行事务操作,并将 undo 和 redo 信息记录到事务日志中。
如果参与者成功的执行了事务,则会返回 ack 响应,同时开始等待最终命令
部分参与者反馈 No 或者等待超时
协调者向所有参与者发送 abort 请求
参与者收到来自协调者的 abort 请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断
阶段三(doCommit 阶段):
执行成功
协调者收到参与者发送的 ack 响应,会从预提交状态变成提交状态,向所有参与者发送 doCommit 请求。
参与者收到 doCommit 请求之后,执行正式的事务提交。并在完成之后释放事务资源。
释放完事务资源,向协调者发送 ack 响应
协调者收到所有参与者的 ack 响应之后完成事务。
中断事务
协调者向所有参与者发送 abort 请求
参与者收到 abort 请求之后,利用 undo 日志来执行事务的回滚操作,并在完成回滚之后释放所有事务的资源
参与者完成事务回滚之后,向协调者发送 ack 信息
协调者收到所有 ack 信息之后,执行事务的中断
三阶段提交虽然解决了二阶段提交的同步阻塞和没有超时机制的问题。但是三阶段提交还是有很多问题没解决:
单点故障
协调者和参与者之间网络故障,最终还是会导致数据不一致现象。
不过,2PC 和 3PC 相对于互联网项目来说,在高并发的情况下,其实是并不适用的
TCC 分布式事务解决方案
TCC(Try-Confirm-Cancel)也被称为两阶段补偿事务,在蚂蚁金服中 TCC 有大量的应用。
第一阶段 - Try:检测预留资源
第二阶段 - Confirm:真正的业务操作提交
第三阶段 - Cancel:预留资源释放
这三个业务逻辑都需要在业务逻辑中进行实现。
这是一个下单流程
第二步扣减库存失败,但是订单状态和第三部,第四步都成功了,这显然是不符合逻辑的,所以这里我们就需要在业务上做处理,修改业务逻辑和数据表。我们要在商品表上新加一个冻结库存数,积分表加一个预加积分数,其他同理
下单成功,修改库存是 98,但是要在冻结库存上把扣减的库存变成 2,其他同理。完成后做本地的事务提交。这也就是 Try 阶段做的事情。如果全部成功就到 Confirm 阶段
Confirm 阶段:这个阶段就做正式的操作,也就是把冻结库存和预先增加积分这些全部清空。预增加积分清空,加到会员积分中,就是变成了 1200。这个阶段如果失败就需要一直重试。
但是如果 Try 阶段有一个事务进行了回滚,这时候就到了 Cancel 阶段。
Cancel 阶段:这时候 TCC 事务框架就会感知到。于是会通知所有服务去进行回滚,也就是把冻结库存补回到库存里面,其他同理。Cancel 阶段如果事务操作没成功也是需要一直重试。
TCC 的一些优化单独放在一章讲解。
自研分布式事务:本地事务表+消息最终一致性解决方案
2PC 等强一致性的场景其实是很窄的,更多的场景下,其实只需要保证数据最终一致就可以了。经过一小段延迟时间之后,数据最终是一致的。比如在一个下单流程,下单成功之后,我们需要清空购物车,和增长积分经验等,其实这些晚个几秒都是 ok 的。我们只需要保证,下单成功和清空购物车或者发优惠券,红包,涨经验这些,要么都成功,要么都失败。
所以这里我们的实现思路是:在下单请求之后,订单的事务去更新订单状态,但是在这个事务执行的过程中,我们在本地记录一条消息。这个消息就是一个日志,比如:清空购物车的日志或者增加积分的日志。因为这个日志是记录在本地,比如数据库中或文件形式都 ok,所以这里就没有分布式的问题。所以也就是说,这样操作就是把一个分布式事务变成了本地事务,这样就可以保证订单事务和本地消息记录是事务一致的。完成这一步就可以直接给客户端返回响应就 ok 了。
那既然本地事务操作日志已经记录了,接下来就需要一个异步的服务来去读取记录的本地消息,调用购物车的服务清空购物车和积分的服务增加积分就 ok 了。购物车清空和积分完成之后,我们去修改本地消息的状态变成完成。这个请求的过程中,如果操作失败了,可以通过重试来解决。最终,可以保证订单系统和其他系统的数据是一致的。
按照上面说的,我们一步一步进行实现。
首先需要有一个本地消息的实体类:
/**
* 事务消息实体 和数据库是一一对应的字段
*/
public class MsgInfo {
/**
* 主键
*/
private Long id;
/**
* 事务消息
*/
private String content;
/**
* 主题和RocketMQ对应
*/
private String topic;
/**
* 标签和RocketMQ对应
*/
private String tag;
/**
* 状态:1-等待,2-发送
*/
private int status;
/**
* 创建时间
*/
private Date createTime;
/**
* 延迟时间(单位:s)
* 最迟几秒钟要发的MQ中
*/
private int delay;
}
复制代码
复制代码
实体消息类创建完之后,这里需要一个去操作实体的类
/**
* 事务操作实体 实际上就是一个Queue
*/
public class Msg {
/**
* 主键 和MsgInfo是一一对应的
*/
private Long id;
/**
* db-url key, 跟 数据源map做映射
*/
private String url;
/**
* 已经处理次数
*/
private int haveDealedTimes;
/**
* 创建时间
*/
private long createTime;
/**
* 下次超时时间
*/
private long nextExpireTime;
}
复制代码
复制代码
实体类创建完成之后,我们来想一下,按照我们之前说的步骤,我们需要把订单放入 MQ 中,但是这里我们一般是不直接发送到 MQ,我们写完 DB 之后,需要先发送到 Queue 一个队列中去,Queue 里面存储的就是 Msg 实体类,所以这里我们就需要有一个投递工作线程,从 Queue 中弹出数据,投递工作线程拿到和数据库中的消息表做比较,查看数据库中是否有本地事务消息。如果没有,这条信息就结束。如果有,那我们就需要去创建 MQ 消息,创建消息之前,我们是要设置一个最大重试次数用来保证准确性(可能会出现事务还没提交去读取没读取出来),可以有重试的机会,如果没超限且查询出来了,就要创建 MQ 消息用来投递。如果成功就直接结束。
创建 MQ 如果消息投递失败了,那这个消息就放到放入一个重试的时间轮队列中,所以这里我们也需要有一个对应的时间轮队列读取的线程。从时间轮队列里面获取到线程之后,去判断是否是超时,根据 MsgInfo 的 delay 属性,如果超时就结束了。如果没超时就加入到事务操作队列中。
如果这个时候,服务挂了,队列里面东西都丢了,这时候,我们要怎么办呢?所以这时候我们还需要一个补漏线程。我们重启之后,补漏线程查看是否持有锁,如果持有锁就获取从 DB 里面最近十分钟的等待事务线程根据 MsgInfo 的 status 判断,之后就去创建 MQ 消息,并且投递,如果投递成功就结束,如果投递失败就又重新走到重试时间轮队列。
这里为了可以快速查询锁信息,我们还需要一个持锁线程用来强锁,强到就加一个锁标识,如果强不到就说明不是这个服务来解决补漏问题。同时,也需要一个清理线程来定期的去清理已提交的任务。
接下来看一下初始化代码:
/**
* @方法名称 init
* @功能描述 init 初始化,config才是ok
* @param config 配置对象
*/
public void init(Config config) {
if (state.get().equals(State.RUNNING)) {
LOGGER.info("Msg Processor have inited return");
return;
}
LOGGER.info("MsgProcessor init start");
state.compareAndSet(State.CREATE, State.RUNNING);
// 1、设置环境
this.serverType = ConfigUtil.getServerType();
if (config.getEtcdHosts() != null && config.getEtcdHosts().length >= 1) {
envNeedLock = true;
defaultEtcd = config.getEtcdHosts();
LOGGER.info("serverType {} envNeedLock {} etcdhosts {}", serverType, envNeedLock, defaultEtcd);
}
// 2、设置配置
this.config = config;
// 3、设置 事务消息处理线程数
exeService = Executors.newFixedThreadPool(config.getThreadNum(), new ThreadFactory("MsgProcessorThread-"));
for (int i = 0; i < config.getThreadNum(); i++) {
exeService.submit(new MsgDeliverTask());
}
// 4、设置 其他线程
scheService = Executors.newScheduledThreadPool(config.getSchedThreadNum(), new ThreadFactory("MsgScheduledThread-"));
// 设置时间转动线程:时间轮重试投递失败的事务操作
scheService.scheduleAtFixedRate(new TimeWheelTask(), TIME_WHEEL_PERIOD, TIME_WHEEL_PERIOD, TimeUnit.MILLISECONDS);
// 设置事务消息删除线程
scheService.scheduleAtFixedRate(new CleanMsgTask(), config.deleteTimePeriod, config.deleteTimePeriod, TimeUnit.SECONDS);
// 设置 补漏线程:防止最近10分钟的线程被漏提交
scheService.scheduleAtFixedRate(new ScanMsgTask(), config.schedScanTimePeriod, config.schedScanTimePeriod, TimeUnit.SECONDS);
// 设置心跳线程:汇报 事务提交队列的堆积情况
scheService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
LOGGER.info("stats info msgQueue size {} timeWheelQueue size {}", msgQueue.size(), timeWheel.size());
}
}, 20, config.getStatsTimePeriod(), TimeUnit.SECONDS);
// 6、初始化锁客户端
initLock();
LOGGER.info("MsgProcessor init end");
}
复制代码
复制代码
事务投递线程:
//事务投递线程
class MsgDeliverTask implements Runnable {
@Override
public void run() {
while (true) {
if (!state.get().equals(State.RUNNING)) {
break;
}
try {
// 1、每100ms从 队列 弹出一条事务操作消息
Msg msg = null;
try {
//拉出来消息
msg = msgQueue.poll(DEF_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
}
if (msg == null) {
continue;
}
LOGGER.debug("poll msg {}", msg);
int dealedTime = msg.getHaveDealedTimes() + 1;
msg.setHaveDealedTimes(dealedTime);
// 2、从db获取实际事务消息(这里我们不知道是否事务已经提交,所以需要从DB里面拿)
MsgInfo msgInfo = msgStorage.getMsgById(msg);
LOGGER.debug("getMsgInfo from DB {}", msgInfo);
if (msgInfo == null) {
if (dealedTime < MAX_DEAL_TIME) {
// 3.1、加入时间轮转动队列:重试投递
long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
msg.setNextExpireTime(nextExpireTime);
timeWheel.put(msg);
LOGGER.debug("put msg in timeWhellQueue {} ", msg);
}
} else {
// 3.2、投递事务消息
Message mqMsg = buildMsg(msgInfo);
LOGGER.debug("will sendMsg {}", mqMsg);
SendResult result = producer.send(mqMsg);
LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
if (null == result || result.getSendStatus() != SendStatus.SEND_OK) {
// 投递失败,重入时间轮
if (dealedTime < MAX_DEAL_TIME) {
long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
msg.setNextExpireTime(nextExpireTime);
timeWheel.put(msg);
// 这里可以优化 ,因为已经确认事务提交了,可以从DB中拿到了
LOGGER.debug("put msg in timeWhellQueue {} ", msg);
}
} else if (result.getSendStatus() == SendStatus.SEND_OK) {
// 投递成功,修改数据库的状态(标识已提交)
int res = msgStorage.updateSendMsg(msg);
LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
}
}
} catch (Throwable t) {
LOGGER.error("MsgProcessor deal msg fail", t);
}
}
}
}
复制代码
复制代码
重试时间轮线程
class TimeWheelTask implements Runnable {
@Override
public void run() {
try {
if (state.get().equals(State.RUNNING)) {
long cruTime = System.currentTimeMillis();
//检查是否有Msg
Msg msg = timeWheel.peek();
// 拿出来的时候有可能还没有超时
while (msg != null && msg.getNextExpireTime() <= cruTime) {
msg = timeWheel.poll();
LOGGER.debug("timeWheel poll msg ,return to msgQueue {}", msg);
// 重新放进去
msgQueue.put(msg);
msg = timeWheel.peek();
}
}
} catch (Exception ex) {
LOGGER.error("pool timequeue error", ex);
}
}
}
复制代码
复制代码
删除本地消息线程
class CleanMsgTask implements Runnable {
@Override
public void run() {
if (state.get().equals(State.RUNNING)) {
LOGGER.debug("DeleteMsg start run");
try {
Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
while (it.hasNext()) {
DataSource dataSrc = it.next();
if (holdLock) {
LOGGER.info("DeleteMsgRunnable run ");
int count = 0;
int num = config.deleteMsgOneTimeNum;
// 影响行数 不等于 删除数 及 大于最大删除数时,本次task结束
while (num == config.deleteMsgOneTimeNum && count < MAX_DEAL_NUM_ONE_TIME) {
try {
num = msgStorage.deleteSendedMsg(dataSrc, config.deleteMsgOneTimeNum);
count += num;
} catch (SQLException e) {
LOGGER.error("deleteSendedMsg fail ", e);
}
}
}
}
} catch (Exception ex) {
LOGGER.error("delete Run error ", ex);
}
}
}
}
复制代码
复制代码
补漏线程
class ScanMsgTask implements Runnable {
@Override
public void run() {
if (state.get().equals(State.RUNNING)) {
LOGGER.debug("SchedScanMsg start run");
Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
while (it.hasNext()) {
DataSource dataSrc = it.next();
boolean canExe = holdLock;
if (canExe) {
LOGGER.info("SchedScanMsgRunnable run");
int num = LIMIT_NUM;
int count = 0;
while (num == LIMIT_NUM && count < MAX_DEAL_NUM_ONE_TIME) {
try {
List<MsgInfo> list = msgStorage.getWaitingMsg(dataSrc, LIMIT_NUM);
num = list.size();
if (num > 0) {
LOGGER.debug("scan db get msg size {} ", num);
}
count += num;
for (MsgInfo msgInfo : list) {
try {
Message mqMsg = buildMsg(msgInfo);
SendResult result = producer.send(mqMsg);
LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
// 修改数据库的状态
int res = msgStorage.updateMsgStatus(dataSrc, msgInfo.getId());
LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
}
} catch (Exception e) {
LOGGER.error("SchedScanMsg deal fail", e);
}
}
} catch (SQLException e) {
LOGGER.error("getWaitMsg fail", e);
}
}
}
}
}
}
}
复制代码
复制代码
通过上面的架构图,再结合我之前讲过的高并发情况下扣减库存的第三种方案,上面其实是有可以优化的点。所以这里其实就可以直接到创建 MQ 消息投递就可以了。之后的时间轮队列也可以直接创建 MQ 发送到投递线程中,进行直接发送 MQ 消息就 ok。
作者:Five 在努力
链接:https://juejin.cn/post/7006703502630567944
来源:掘金
评论