写点什么

如何通过事件溯源实现百万 TPS 全内存撮合交易引擎

作者:Damon
  • 2024-10-29
    福建
  • 本文字数:8749 字

    阅读完需:约 29 分钟

如何通过事件溯源实现百万TPS全内存撮合交易引擎

​在现代金融市场中,股票撮合交易系统是核心组件之一。它的主要功能是接收买卖订单,并在最合适的价格下进行匹配。随着技术的发展,越来越多的系统开始采用事件溯源(Event Sourcing)架构来实现这种功能。本文将深入探讨事件溯源的概念及其在股票撮合交易中的应用。本人并没有从事过股票交易相关系统的开发经验,只是单纯个人喜好,根据网络上相关的一些撮合业务的描述,按照自己的理解实现的一套基于事件溯源的全内存撮合引擎。(熬了两晚才实现的  😴😴)。具体的代码实现以粘贴在文档末尾。并未完整进行测试,存在理解不到位或者 BUG,也请大家轻点拍砖,只是给大家提供一个新的思路。😜😜😜😜😜😜


什么是事件溯源?


事件溯源是一种软件架构模式,它通过记录所有状态变化的事件来替代传统的状态存储。每个事件都代表一个状态变化,应用程序的状态是通过这些事件的序列重建的。这种方式的主要优点在于可以追溯到每一个状态变化,提供了完整的历史记录,并且可以方便地进行版本管理和审计。


股票撮合交易系统的基本概念


在股票交易中,主要有两个角色:买方(投资者)和卖方(交易者)。买方提交买入订单,卖方提交卖出订单。撮合引擎需要根据价格和时间优先级来匹配这些订单,从而达成交易。


基本流程:


  1. 接收订单:撮合系统接收买入和卖出的订单。

  2. 存储订单:订单通过事件溯源的方式被记录为事件。

  3. 撮合过程:系统根据一定的规则(如价格优先、时间优先)来匹配订单。

  4. 生成交易事件:成功匹配后,生成交易事件,并更新系统状态。


整体架构


这个架构图其实就是我原来写的一个事件溯源架构的架构图,撮合交易只是在这个架构上面的做的上层应用,直接拿过来用了。


https://github.com/654894017/cqrs


事件溯源在股票撮合中的应用


  1. 订单创建事件


当用户提交订单时,系统将生成一个订单创建事件,记录下订单的所有信息,包括订单类型、数量、价格等。这使得系统可以完整地记录所有的订单变化。


  1. 订单撮合事件


在订单成功匹配后,系统会生成一个订单撮合事件,记录下匹配的详细信息,包括成交价格和成交数量。


  1. 订单取消事件


如果用户取消订单,系统会记录下这个取消事件,以便于后续审计和状态回溯。


撮合引擎的实现


通过事件溯源,撮合引擎的实现变得更加灵活。只用了 400 行不到的代码就实现列: 集合竞价、市价单、限价单、取消订单的功能。


https://github.com/654894017/cqrs/tree/dev/cqrs-sample/cqrs-sample-generic-test/src/main/java/com/damon/cqrs/sample/trade_matching


以下是一个实现的核心代码,展示了如何处理订单创建和撮合事件。

package com.damon.cqrs.sample.trade_matching.domain.aggregate;
import com.damon.cqrs.domain.AggregateRoot;import com.damon.cqrs.sample.trade_matching.domain.cmd.*;import com.damon.cqrs.sample.trade_matching.domain.event.*;import lombok.Getter;import lombok.Setter;
import java.util.*;
@Getter@Setterpublic class Stock extends AggregateRoot { /** * 股票实时价格 */ private Long realtimePrice; /** * 一个档位的价格 */ private Long notchPrice; private Map<Long, Boolean> tradeMap = new HashMap<>(); /** * 先按价格档位降序, 在同档位内按下单时间升序, 类型: <price,<orderId, order>> */ private TreeMap<Long, TreeMap<Long, StockBuyOrder>> buyOrderMap = new TreeMap<>(Comparator.reverseOrder()); /** * 先按价格档位升序, 在同档位内按下单时间升序, 类型: <price,<orderId, order>> */ private TreeMap<Long, TreeMap<Long, StockSellOrder>> sellOrderMap = new TreeMap<>();
public Stock(Long id) { super(id); }
/** * 集合竞价(开盘价/收盘价都是通过此方法) * <p> * https://m.gelonghui.com/p/513097 * * @return */ public int callAuction(CallAuctionCmd cmd) { Map<Long, Long> sellPriceMap = new HashMap<>(); sellOrderMap.keySet().forEach(price -> { Long totalNumber = sellOrderMap.tailMap(price).values().stream().flatMap(treeMap -> treeMap.values().stream()) .mapToLong(StockSellOrder::getNumber).sum(); sellPriceMap.put(price, totalNumber); });
Map<Long, Long> buyPriceMap = new HashMap<>(); buyOrderMap.keySet().forEach(price -> { Long totalNumber = buyOrderMap.tailMap(price).values().stream().flatMap(treeMap -> treeMap.values().stream()) .mapToLong(StockBuyOrder::getNumber).sum(); buyPriceMap.put(price, totalNumber); });
TreeMap<Long, Long> maxTradeMap = new TreeMap<>(Comparator.reverseOrder()); sellPriceMap.forEach((price, totalNumber) -> { Long maxTradeNumber = Math.min(buyPriceMap.get(price), totalNumber); maxTradeMap.put(maxTradeNumber, price); }); Map.Entry<Long, Long> entry = maxTradeMap.firstEntry(); if (!maxTradeMap.isEmpty() && entry.getValue() != null) { applyNewEvent(new CallAuctionSucceedEvent(entry.getValue())); } return 0; }
/** * 市价单购买 * * @param cmd * @return */ public int buy(StockMarketBuyCmd cmd) { Integer remainingNumber = cmd.getNumber(); NavigableMap<Long, TreeMap<Long, StockSellOrder>> market5NotchMap = sellOrderMap.subMap( realtimePrice, true, realtimePrice + 5 * notchPrice, true ); LinkedHashSet<MarketOrderBoughtEvent.TradeOrder> tradeOrders = new LinkedHashSet<>(); MarketOrderBoughtEvent orderBoughtEvent = new MarketOrderBoughtEvent( cmd.getOrderId(), tradeOrders, cmd.getStockId(), cmd.getNumber(), cmd.getEntrustmentType() ); for (TreeMap<Long, StockSellOrder> sellOrders : market5NotchMap.values()) { for (StockSellOrder sellOrder : sellOrders.values()) { int sellOrderNumber = sellOrder.getNumber(); MarketOrderBoughtEvent.TradeOrder tradeOrder = new MarketOrderBoughtEvent.TradeOrder( sellOrder.getOrderId(), sellOrderNumber <= remainingNumber, Math.min(remainingNumber, sellOrderNumber), sellOrder.getPrice() ); tradeOrders.add(tradeOrder); remainingNumber -= tradeOrder.getNumber(); if (remainingNumber <= 0) { applyNewEvent(orderBoughtEvent); return 0; } } } if (tradeOrders.isEmpty()) { return -1; } applyNewEvent(orderBoughtEvent); return 0; }
/** * 市价单售卖 * * @param cmd * @return */ public int sell(StockMarketSellCmd cmd) { Integer remainingNumber = cmd.getNumber(); NavigableMap<Long, TreeMap<Long, StockBuyOrder>> market5NotchMap = buyOrderMap.subMap( realtimePrice + 5 * notchPrice, true, realtimePrice, true ); LinkedHashSet<MarketOrderSelledEvent.TradeOrder> tradeOrders = new LinkedHashSet<>(); MarketOrderSelledEvent orderSelledEvent = new MarketOrderSelledEvent( cmd.getOrderId(), tradeOrders, cmd.getStockId(), cmd.getNumber(), cmd.getEntrustmentType() );
for (TreeMap<Long, StockBuyOrder> buyOrders : market5NotchMap.values()) { for (StockBuyOrder buyOrder : buyOrders.values()) { int buyOrderNumber = buyOrder.getNumber(); MarketOrderSelledEvent.TradeOrder tradeOrder = new MarketOrderSelledEvent.TradeOrder( buyOrder.getOrderId(), buyOrderNumber <= remainingNumber, Math.min(remainingNumber, buyOrderNumber), buyOrder.getPrice() ); tradeOrders.add(tradeOrder); remainingNumber -= tradeOrder.getNumber(); if (remainingNumber <= 0) { applyNewEvent(orderSelledEvent); return 0; } } } if (tradeOrders.isEmpty()) { return -1; } applyNewEvent(orderSelledEvent); return 0; }
/** * 限价卖单 * * @param cmd * @return */ public int sell(StockSellCmd cmd) { Boolean isTrade = tradeMap.get(cmd.getOrderId()); if (isTrade == null) { applyNewEvent(new OrderSelleEntrustSucceedEvent(cmd.getOrderId(), cmd.getPrice(), System.nanoTime(), cmd.getNumber())); return 0; } else { return -1; } }
/** * 取消委托 * * @param cmd * @return */ public int cancel(StockOrderCancelCmd cmd) { Boolean isTrade = tradeMap.get(cmd.getOrderId()); if (isTrade == null) { return -1; } else { applyNewEvent(new OrderCancelledEvent(cmd.getOrderId(), cmd.isBuyOrder() ? 1 : 0, cmd.getPrice())); return 0; } }
/** * 限价买单 * * @param cmd * @return */ public int buy(StockBuyCmd cmd) { Boolean isTrade = tradeMap.get(cmd.getOrderId()); if (isTrade == null) { applyNewEvent(new OrderBuyEntrustSucceedEvent(cmd.getOrderId(), cmd.getPrice(), System.nanoTime(), cmd.getNumber())); return 0; } else { return -1; } }
/** * 撮合交易 * * @param cmd * @return */ public int match(StockOrderMatchCmd cmd) { if (buyOrderMap.isEmpty()) { return -1; } TreeMap<Long, StockBuyOrder> buyOrders = buyOrderMap.firstEntry().getValue(); if (buyOrders.isEmpty()) { return -1; } Map.Entry<Long, StockBuyOrder> buyOrderEntry = buyOrders.firstEntry(); if (buyOrderEntry == null) { return -1; } StockBuyOrder buyOrder = buyOrderEntry.getValue(); if (buyOrder == null) { return -1; } if (sellOrderMap.isEmpty()) { return -1; } TreeMap<Long, StockSellOrder> sellOrders = sellOrderMap.firstEntry().getValue(); if (sellOrders.isEmpty()) { return -1; } Map.Entry<Long, StockSellOrder> sellOrderEntry = sellOrders.firstEntry(); if (sellOrderEntry == null) { return -1; } StockSellOrder sellOrder = sellOrderEntry.getValue(); if (sellOrder == null) { return -1; } if (buyOrder.getPrice() < sellOrder.getPrice()) { return -1; } boolean isSellDone = sellOrder.getNumber() <= buyOrder.getNumber(); boolean isBuyDone = sellOrder.getNumber() >= buyOrder.getNumber(); applyNewEvent(new OrderSelledEvent(sellOrder.getPrice(), sellOrder.getOrderId(), sellOrder.getOriginalNumber(), Math.min(buyOrder.getNumber(), sellOrder.getNumber()), isSellDone )); applyNewEvent(new OrderBoughtEvent( getId(), buyOrder.getPrice(), sellOrder.getPrice(), buyOrder.getOrderId(), sellOrder.getOriginalNumber(), Math.min(buyOrder.getNumber(), sellOrder.getNumber()), isBuyDone )); return 0; }
private void apply(MarketOrderBoughtEvent event) { event.getTradeOrders().forEach(tradeOrder -> { TreeMap<Long, StockSellOrder> priceSellOrders = sellOrderMap.get(tradeOrder.getPrice()); if (tradeOrder.isDone()) { priceSellOrders.remove(tradeOrder.getSellerOrderId()); } else { StockSellOrder sellOrder = priceSellOrders.get(tradeOrder.getSellerOrderId()); sellOrder.subtract(tradeOrder.getNumber()); } }); if (event.isUndone() && event.isTransferLimitOrderEntrustment()) { TreeMap<Long, StockBuyOrder> stockBuyOrders = buyOrderMap.computeIfAbsent( realtimePrice, price -> new TreeMap<>() ); StockBuyOrder buyOrder = new StockBuyOrder(event.getOrderId(), realtimePrice, System.nanoTime(), event.undoneNumber()); stockBuyOrders.put(event.getOrderId(), buyOrder); tradeMap.put(event.getOrderId(), true); }
if (!event.getTradeOrders().isEmpty()) { MarketOrderBoughtEvent.TradeOrder tradeOrder = event.getTradeOrders().getLast(); this.realtimePrice = tradeOrder.getPrice(); } }
private void apply(MarketOrderSelledEvent event) { event.getTradeOrders().forEach(tradeOrder -> { TreeMap<Long, StockBuyOrder> priceSellOrders = buyOrderMap.get(tradeOrder.getPrice()); if (tradeOrder.isDone()) { priceSellOrders.remove(tradeOrder.getBuyerOrderId()); } else { StockBuyOrder buyOrder = priceSellOrders.get(tradeOrder.getBuyerOrderId()); buyOrder.subtract(tradeOrder.getNumber()); } });
if (event.isUndone() && event.isTransferLimitOrderEntrustment()) { TreeMap<Long, StockSellOrder> stockSellOrders = sellOrderMap.computeIfAbsent( realtimePrice, price -> new TreeMap<>() ); StockSellOrder sellOrder = new StockSellOrder(event.getOrderId(), realtimePrice, event.undoneNumber(), System.nanoTime()); stockSellOrders.put(event.getOrderId(), sellOrder); tradeMap.put(event.getOrderId(), true); }
if (!event.getTradeOrders().isEmpty()) { MarketOrderSelledEvent.TradeOrder tradeOrder = event.getTradeOrders().getLast(); this.realtimePrice = tradeOrder.getPrice(); } }
private void apply(OrderCancelledEvent event) { if (event.isBuyOrder()) { TreeMap<Long, StockBuyOrder> buyOrders = buyOrderMap.get(event.getPrice()); buyOrders.remove(event.getOrderId()); } else { TreeMap<Long, StockSellOrder> sellOrders = sellOrderMap.get(event.getPrice()); sellOrders.remove(event.getOrderId()); } }
private void apply(OrderSelledEvent event) { TreeMap<Long, StockSellOrder> stockSellOrders = sellOrderMap.get(event.getPrice()); if (event.isDone()) { stockSellOrders.remove(event.getOrderId()); } else { StockSellOrder sellOrder = stockSellOrders.get(event.getOrderId()); sellOrder.subtract(event.getTradingNumber()); } this.realtimePrice = event.getPrice(); }
private void apply(OrderBoughtEvent event) { TreeMap<Long, StockBuyOrder> stockBuyOrders = buyOrderMap.get(event.getEntrustPrice()); if (event.isDone()) { stockBuyOrders.remove(event.getOrderId()); } else { StockBuyOrder buyOrder = stockBuyOrders.get(event.getOrderId()); buyOrder.subtract(event.getTradingNumber()); } this.realtimePrice = event.getBuyPrice(); }
private void apply(OrderBuyEntrustSucceedEvent event) { TreeMap<Long, StockBuyOrder> stockBuyOrders = buyOrderMap.computeIfAbsent( event.getPrice(), price -> new TreeMap<>() ); StockBuyOrder buyOrder = new StockBuyOrder(event.getOrderId(), event.getPrice(), event.getCreateTime(), event.getNumber()); stockBuyOrders.put(buyOrder.getOrderId(), buyOrder); tradeMap.put(buyOrder.getOrderId(), true); }
private void apply(OrderSelleEntrustSucceedEvent event) { TreeMap<Long, StockSellOrder> stockSellOrders = sellOrderMap.computeIfAbsent( event.getPrice(), price -> new TreeMap<>() ); StockSellOrder sellOrder = new StockSellOrder(event.getOrderId(), event.getPrice(), event.getNumber(), event.getCreateTime()); stockSellOrders.put(sellOrder.getOrderId(), sellOrder); tradeMap.put(sellOrder.getOrderId(), true); }
private void apply(CallAuctionSucceedEvent event) { this.realtimePrice = event.getPrice(); }}
复制代码

事件溯源的优势


  1. 历史追溯:由于所有状态变化都以事件的形式存储,用户可以随时回溯至任意时刻的状态。

  2. 高可扩展性:新增功能时,可以通过引入新的事件类型,而不需要改变已有的模型。

  3. 易于审计:所有的交易和操作都有记录,可以满足监管需求。


几个要说明的地方


市面上很多开源的撮合引擎并未对一下以下几点进行说明,又或者根本没有实现。我觉得以下几个问题应该是大家最关心的问题。


  1. 全内存撮合如何保证机器断电不丢失数据


撮合产生的交易 Event 信息,是采用批量组提交的模式进行入库到数据库磁盘中的,即使断电了也可以通过磁盘中的 Event 恢复的最新的状态。


  1. 集群部署如何保证一个股票只在一台机器上完成撮合


这个比较简单,我们可以通过负载均衡策略,根据股票的 ID 进行负载,保证同一支股票只会在一台机器处理。可以在网关层面做,也可以使用 dubbo 的软负载均衡,使用一致性 Hash 实现。


  1. 机器扩容、缩容时如何保证不冲突


当撮合引擎在扩容、缩容的时候,存在一个股票聚合根在多个服务存在的可能性,这就可能导致存在更新冲突的问题。解决的思路比较简单,就是我们产生的 Event 都带有 Version 版本号字段。同时 Event 存储表根据股票 ID +  事件版本号做了唯一索引。当对股票聚合根在多台机器出现的时候,就会有产生 Event 版本号冲突的问题,所以在 Event 入库的时候,只要出现唯一索引冲突,就代表了出现了更新冲突。当出现了冲突了,我们只需要在服务内部重新进行一次事件回溯,就可以保证当前的股票聚合根恢复的最新的状态。同时抛出异常状态码给到调用者交由调用者重新进行请求。


  1. 撮合性能


并没有较好的服务器,用的苹果 M1 + 16G 的开发机进行测试,事件存储用的 MYSQL,都部署在这台机器上。4 只股票的同时进行撮合的性能是 2.5w/s,换好一点的服务器单机干到 10w/s 还是很轻松的。


  1. 高性能底层实现原理


1.单线程模型处理聚合根。(可以理解为 REDIS 的单线程那样)


2.事件批量组提交模式。(符合磁盘顺序写的特征)


3.全内存处理,每一支股票的信息都是存储在内存,操作都是通过内存进行的。所有的结果都通过 Event 传递出去。


总结


通过事件溯源构建股票撮合交易系统,不仅提高了系统的灵活性和可维护性,还能确保数据的完整性和一致性。随着技术的不断发展,事件溯源的应用将会在金融领域变得愈发重要。希望本文能够为您提供一些关于如何实现股票撮合交易的思路和灵感。


https://github.com/654894017/cqrs/tree/dev/cqrs-sample/cqrs-sample-generic-test/src/main/java/com/damon/cqrs/sample/trade_matching


发布于: 刚刚阅读数: 3
用户头像

Damon

关注

还未添加个人签名 2020-04-26 加入

还未添加个人简介

评论

发布
暂无评论
如何通过事件溯源实现百万TPS全内存撮合交易引擎_事件溯源_Damon_InfoQ写作社区