写点什么

CQRS 与 Event Sourcing

作者:胖子笑西风
  • 2022-11-18
    上海
  • 本文字数:6920 字

    阅读完需:约 23 分钟

CQRS与Event Sourcing

Event Souring

采购单不简单

小吴是一个公司的资深 SRM 开发,精通三层架构,代码写的贼六。有天,小吴收到业务小张的一个需求,实现一个发货逻辑。小吴一想,简单,不就是接收到一个请求,然后处理请求,将处理后的业务数据持久化一下么。于是坑哧坑哧...


public class PurchaseOrderService {
PurchaseOrderMapper purchaseOrderMapper;
public PurchaseOrder ship(){ PurchaseOrder po = purchaseOrderMapper.get(123); po.changeStatus("已发货"); purchaseOrderMapper.update(po); return po; }
}
复制代码


嗯,一个很常见的业务代码,逻辑上来看并没有什么问题。最重的是,格式还好看,完美!代码上线了,跑了一段时间。业务小张提了一个需求,他们发现一个采购单的状态不对,想知道 3 天前订单是一个什么状态。这时候小吴就有点懵了,好像。。。。没有打日志于是跟小张道了个歉,并且坑哧坑哧的补上一个日志。。。代码上线了,跑了一段时间。业务小张又来了,叫帮忙查一下 3 个月前某时某分某秒订单是什么状态,小吴熟练的登上一个机器,看了一下,糟糕,日志只保留 1 个月,查了一下中间还有小伙伴把日志的格式给改了,而且不知道改过几版。。。于是小吴给跟小张又道了个歉...吸收了教训,小吴心想,那把日志存到数据库吧,后面直接在数据库里查。。。心想这回没问题了吧代码上线了,跑了一段时间。业务小张又来了,叫帮忙查一下 5 个月前某时某分某秒这个订单是什么价格,糟糕,好像没打价格变更的日志。。。。小吴一下又懵了,再看下代码,造成订单变更的代码太分散了,而且开发人员不一样,日志总是打不全。不管怎么样,先道歉吧,于是小吴很熟练的给小张道了个歉。怎么办呢,小吴想起来前段时间学习了 DDD,于是花了大半个月实现了 DDD,这下好了,订单状态变更的方法全在 PurchaseOrder 这个 Entity 里


@Slf4jpublic class PurchaseOrder {
PurchaseOrderRepo purchaseOrderRepo; LogRepo logRepo;
public void ship(){ purchaseOrderRepo.updateStatus("已发货"); logRepo.save(new Log(this,"ship","已发货")); }
public void negotiated(BigDecimal price){ purchaseOrderRepo.changePrice(prices); logRepo.save(new Log(this,"negotiated",price)); }
}
复制代码


看起来没啥大问题了,表里所有的方法都加上日志了,想看啥时候的价格,查一下日志。想看啥时候的价格,查一下日志。小吴心里得意了。但是,业务小张又来了。业务小丁跟供应商那边谈崩了,供应商要求把三个月前某时某分某秒状态为未付款的采购单价格改到该订单两个月前某时某分某秒的价格,再加个三折。这。。。好吧,道歉已经没啥用了,做,肯定能做!于是小吴直接写个脚本改数据库,查下日志,把某时刻状态为未付款的采购单找出来。再查下日志,找到这些订单两个月的价格,然后 mapping 一下,修改价格。。。好不容易做完了,业务方小张又来了,这回不光价格了,采购数量,优惠返回,账期通通给我改了。。。于是小吴又双叒叕的道歉了。。经历了这几件事,小吴就想,有没有什么办法,可以追溯状态的变更。

采购单与账单的故事

还没等想好,业务小张又来了,让在采购单修改成发货状态的时候,同时生成一个账单。财务要算账。小吴心想,这简单,等等,简单?总感觉有坑,但又不知道是什么坑。不管了,不想道歉就先干吧。于是坑哧坑哧的又加了代码


@Slf4jpublic class PurchaseOrder {
PurchaseOrderRepo purchaseOrderRepo;
BillFactory billFactory; LogRepo logRepo;
public void ship(){ purchaseOrderRepo.updateStatus("已发货"); logRepo.save(new Log(this,"ship","已发货")); Bill bill = billFactory.createBill(this); bill.save(); }
public void negotiated(BigDecimal price){ purchaseOrderRepo.changePrice(price); logRepo.save(new Log(this,"negotiated",price)); }
}
复制代码


刚上线没多久,业务小张又来了,财务同学说账单来了,他收不到消息。果然,坑来了,于是小吴又加上了代码,测试,发布,上线。


@Slf4jpublic class PurchaseOrder {
PurchaseOrderRepo purchaseOrderRepo;
BillFactory billFactory; LogRepo logRepo;
NoticeService noticeService;
public void ship(){ purchaseOrderRepo.updateStatus("已发货"); logRepo.save(new Log(this,"ship","已发货")); Bill bill = billFactory.createBill(this); bill.save(); noticeService.notice('财务',this); }
public void negotiated(BigDecimal price){ purchaseOrderRepo.changePrice(price); logRepo.save(new Log(this,"negotiated",price)); }
}
复制代码


刚加完,业务小张又来了。。。小吴本能的脑袋里飘来两个字:耦合。于是小吴又想,能不能在通知状态的时候进行解耦呢。好了,现在小吴的问题有 3 个了:1.可以追溯状态的变更 2.通知状态的变更 3.解耦

办法总是有的

追溯状态的变更

先来第一个问题 ,追溯状态的变更,刚才我们说的日志我们记录下来了,如果我们可以像 mysql 的 binlog 同步一亲,通过重刷一下日志,就能把对象恢复到当时所有的状态。是不是就可能解决问题。怎么把日志重新刷,简单来说就是日志规范化。我们给日志定一个格式,我们姑且叫这个格式的日志为事件。


public class Event<T> implements Serializable{    private String eventId;    private String eventType;    private String eventTopic;    private T data;}
复制代码


而我们的 Aggregate 通过处理 Event 来变更自己的状态。


public interface Aggregate {        public void apply(Event event);    }
复制代码


这里我们可以看到,当一个 Aggregate 产生一个 Event,又通过消费这个 Event 来改变状态。这样做是不是多此一举呢。


public class PurchaseOrder implements Aggregate{
private EventStore eventStore;
public void ship(){ Event shipEvent = new ShipEvent(this); eventStore.save(shipEvent); apply(shipEvent); }
@Override public void apply(Event event) { purchaseOrderRepo.updateStatus(((ShipEvent)event).getData().getStatus()); }}
复制代码


这里就回到我们刚才说通过回刷事件来重溯状态的目的了。像上面,PurchaseOrder 的 ship 方法可以里面可以有很多逻辑,比如调用发个钉钉消息通知一下物流人员,比如调用一个供应商的服务。如果我们改变状态跟 ship(也就是业务逻辑)耦合在一起,那么重刷状态的时候就相当于多跑了一遍业务逻辑。

通知状态的变更

再来说一下通知事件。常用的通知事件就是观察者模式。通过定义 EventHandler 来处理事件。


public class PurchaseOrder implements Aggregate{
private EventStore eventStore;
List<EventHandler> handlerList;
public void ship(){ Event shipEvent = new ShipEvent(this); eventStore.save(shipEvent); for(EventHandler handler:handlerList){ handler.handle(shipEvent); } } }
复制代码


正如上面小吴所遇到的问题,这时候,PurchaseOrder 本身、账单系统、通知系统都作为一个事件处理者。


public class PurchaseOrder implements Aggregate{
private EventStore eventStore;
List<EventHandler> handlerList;
public void ship(){ Event shipEvent = new ShipEvent(this); eventStore.save(shipEvent); for(EventHandler handler:handlerList){ handler.handle(shipEvent); } }
@Override public void apply(Event event) { purchaseOrderRepo.updateStatus(((ShipEvent)event).getData().getStatus()); }}
复制代码


public class Bill implements Aggregate{
@Override public void apply(Event event) { Bill bill = billFactory.createBill(this); //发出账单创建事件 }}
复制代码


public class Notice implements Aggregate{
NoticeService noticeService;
@Override public void apply(Event event) { noticeService.notice('财务',event); }}
复制代码

解耦

好了,现在剩下最后一个问题了,解耦,在单体系统中,我们通过观察者模式来进行一个弱依赖,是可以被接受的。毕竟账单与采购单的逻辑都比较稳定,且通过观察者模式实现,修改了账单与采购单本身的逻辑并不会对另外一个有影响。但我们注意到,上述的代码还有一个不太谐的地方,那就是 Notice。我们很少在一个业务形态系统中抽象出来一个 Notice 实体,通知并不是业务系统的主要逻辑,只是为了方便交流而已。我们通常会把通知独立出来一个系统。我们知道,观察者模式在一个单体应用中要实现统一接口。怎么办呢?我们可以引入事件总线。事件总线负责发布命令,事件的处理者不仅仅是单体应用中的其它对象,事件总线还可以通过消息中间件,或者 RPC 调用,将命令传播至其它服务,使其它服务来同样来进行事件的处理。


public class PurchaseOrder implements Aggregate{
private EventStore eventStore;
private EventBus eventBus;
public void ship(){ Event shipEvent = new ShipEvent(this); eventStore.save(shipEvent); eventBus.send(shipEvent); }
}
复制代码


好了,解决了小吴所留下的三个问题。我们也就引入了一个新的概念,Event Sourcing(事件溯源)。我们可以把 Event Souring 简单的理解为以下两点:1.事件是不变的,以追加方式记录事件,形成事件日志 2.聚合的每次状态变化,都是一个事件的发生。聚合的状态变化是通过事件来更新。


事件溯源有几个重要的好处。例如,它保留了聚合的历史记录,这对于实现审计和监管的功能非常有帮助。它可靠地发布领域事件,这在微服务架构中特别有用。


小吴解决了这几个问题,业务上碰到的问题再也难不倒他了。要把采购单回溯到哪个状态,新建一个聚合实例,把事件重刷。要查询采购单某时候的价格,新建一个聚合实例,把事件重刷。业务小张又来了,业务小张又回去了。小吴很得意,又有点失落,不道歉还不习惯了。。。既然有点失落,小吴就想想目前做的事情,想到,既我采购单都是要回到某个时候的状态,现在不也是某个时候么,那么现在的状态是不是也是可以通过事件来刷回来。这样不是业务在页面上想查什么时候的状态就查什么时候的,底层一套逻辑就搞定了。说干就干。小吴不把采购单保留在数据库了,每次启动的时候,把所有采购单命令重新刷一次。当需要把某个采购单回复到某个时候的状态时,把当前状态清空,重刷采购单命令。


@Componentpublic class EventRestore {
private EventStore eventStore; private EventBus eventBus; @PostConstruct public void restore(){ List<Event> events = eventStore.queryAll(); for (Event event: events){ //false控制所有事件只刷本地的 eventBus.send(event,false); } }
}
复制代码


public class PurchaseOrderService {
private PurchaseOrderFactory purchaseOrderFactory;
private EventStore eventStore;
public PurchaseOrder queryPoByTime(Long poId,Date queryTime){ PurchaseOrder purchaseOrder = purchaseOrderFactory.get(poId); List<Event> eventList = eventStore.listBy(queryTime,poId); eventList.stream().forEach(event->purchaseOrder.apply(event)); return purchaseOrder; }
}
复制代码


小吴上网一查,找到一个牛逼哄哄的名词,In Memory。确实很 Memory 嘛。但是光 Memory 可能还不行。渐渐的小吴发现,系统里面有一些采购单,从 1 月份到 10 月份状态,状态一直变,价格一直在变更。造成每个重刷采购单都要花上好几秒的时间。嗯,这是一个大 V 采购单。怎么办呢。没想到办法,小吴决定去找了业务小张聊聊,聊聊,说不就聊出。。。灵感来了。小张很厚道,完全没有业务甲方爸爸的觉悟,跟小吴解释说,这个订单是一个长期订单,一开始给供应商定了一个预估的大量采购单,供应商一批一批给,价格也随时发生变化。但是业务上只关心三个月内的情况。三个月内,那不是前几个月的订单状态都白刷了。。。灵感来了,小吴回去坑哧坑哧,给订单建了一个三个月前的快照,每次刷的时候,都从这个时候开始刷。完美!世界一下子又美好了。


public class PurchaseOrderService {
private PurchaseOrderFactory purchaseOrderFactory;
private EventStore eventStore;
private SnapshotRepository snapshotRepo;
public PurchaseOrder queryPoByTime(Long poId,Date queryTime){ Snapshot snapshot = snapshotRepo.snapshotOf(poId); PurchaseOrder purchaseOrder = snapshot.rebuildTo(PurchaseOrder.getClass()); List<Event> eventList = eventStore.listBy(queryTime,poId); eventList.stream().forEach(event->purchaseOrder.apply(event)); return purchaseOrder; }
}
复制代码


事件是一成不变的。快照可以加快事件的重刷速度。

CQRS

业务飞速发展

系统上线了一段时间,小吴的得意劲也差不多过了,业务小张也好久没来找他了,正感到无聊,系统突然间报警了,系统负载太高。小吴看了一下,公司这段时候业务高速发展,不仅采购单大大的增加,业务查询量也大大增长。看了一下系统,采购单修改平均 20 的 qps,高的时候能到 100。 采购单查询平均 300 的 qps,高的时候能到 2000 多。主要是业务那边来了几十个分析大师,需要实时对订单进行分析。业务那边也增加了很多人,实时跟进。小吴敏锐的觉查到,查询应该与修改分开。不能互相影响。于是上网查了一下,找到了一个专有名词,CQRS,Command and Query Responsibility Segregation。



先不管 Command Bus,小吴发现,事件这部分他已经做完了。于是乎,小吴重新部署了一个应用,原来的那个专门负责接收请求,进行更新,小吴叫这个应用为应用 A。新部署的应用负责查询,叫应用 R。应用 A 与应用 R 共用一个事件存储来回刷事件。应用 A 部署了两台服务器,应用 B 部署了 5 台。互不影响。Perfect。

业务想看的更多

业务小张又来了,说现在大家要跟进的事情太多,能不能把当前的采购单与账单,发货单的信息集中在一起,形成一个报表。小吴心想,刷一下采购单的状态,刷一下发货单的状态,刷一下账单的状态,mapping。做的事情有点多啊,性能能不能跟上。于上问小张是要看这些单据上的所有信息么,当然不是,主要是价格信息还要仓库信息,要看到商品的最低价、最高价、移动加权平均价及到达的仓库。于上小吴本能的想到,那就重建一个模型,将三个单据的价格整合在一起,接收三个单据的事件,计算价格、汇总到达仓库。将计算的数据存储起来,供业务查询。


public class PuchaseFinanceAnalysicDTO {
private BigDecimal maxPurchasePrice;
private BigDecimal minPurchasePrice;
private BigDecimal avgPurchasePrice;
private BigDecimal maxBillPrice;
private BigDecimal minBillPrice;
private BigDecimal avgBillPrice;
private String arriveWh;
}
复制代码


小吴 Get 到了两个个新技能:1.查询模型不必遵循 DDD 业务逻辑,业务爸爸想要啥,给啥!2.不同的系统,可以通过消费不同的事件,得到自己关注的业务信息。

业务想要在手机上操作

业务小张说现在大家都要出去跑业务,电脑不能一直带在身边,能不能加一个手机操作页面。客户端开发人员说,手机那边一直都是用 TCP 长连接来与后端通信。小吴想起来前两天学的六边形架构,心想,传进来的参数是不是可以统一,不管是手机还是电脑,TCP 还是 HTTP,将传进来的报文 Adapt 成一个格式不就好了。CQRS 那个 Command 就很好用。于是乎,小吴将修改业务抽象出来了一个个 Command,并且吸收了 Event 的灵感,添加了 CommandBus,Command 可以通过 CommandBus 发到单体应用中,还可以通过 CommandBus 发往其它应用。这样,当后面业务再发展,采购单进来的 qps 太高,就可以将处理采购单请求与处理采购单逻辑分开,变成两个应用。处理采购单请求的应用只要负责将采购单转化成标准的 Command 就行了。而且,测试也可以能过命令行直接发一个 Command 就行,学以致用!Perfect!


public class Command<T> {
private String bizCode;
private T content; }
复制代码

总结

像之前讲六边形架构一样,CQRS 的核心在于首尾的标准化。抽离出来的命令与事件最好不要改变。这样可以保证核心领域的业务逻辑不变。Event Sourcing 讲的是 Event 的溯源,但有时候,溯源是要到 Command 的,所以有时候,我们也需要把 Command 存储起来,比如笔者后面要讲的业采一体系统设计。什么,你们不关心,想看小吴与小张后面的故事,额。。。


原文:

CQRS与Event Sourcing

微信公共号

胖子笑西风-Event Sourcing


发布于: 2022-11-18阅读数: 90
用户头像

不见明月,唯有醉客笑西风 2021-03-31 加入

还未添加个人简介

评论

发布
暂无评论
CQRS与Event Sourcing_架构_胖子笑西风_InfoQ写作社区