写点什么

如何进行秒杀场景下的异步下单实现

  • 2023-01-15
    湖南
  • 本文字数:5041 字

    阅读完需:约 17 分钟

如何进行秒杀场景下的异步下单实现

背景异步削峰,目的是削峰,方式是异步。面对瞬时压力,都需要异步削峰,其关键都在于拉长时间线,削平毛刺,最终整体提升吞吐量。核心流程


提交下单任务:用户发出下单请求后,先要获取订单许可,若获取成功才能将下单任务提交给消息队列进行处理;下单任务轮询:异步处理,不会立刻返回结果。所以用户需要一定频率来轮询处理结果;实现原理下单许可所谓下单许可,指的是在秒杀品开始秒杀前,根据秒杀品的库存配置特定比例(可调节) 的下单许可,只有获得下单许可的用户才能提交下单任务。个例子,假如秒杀品 A 有库存 10000 个,那么我们可以将秒杀许可设定为 10000*1.2 或 10000**1.5。


降低不必要的资源竞争与浪费。限制订单任务放入消息队列,如果不设置 12000 的下单许可,则可能有 10,000 甚至 100,0000 的用户将请求提交到任务队列,但是我们只有区区 12000 的库存,这会给队列和相关计算方造成巨大压力。提升用户体验。有了下单许可之后,当许可被抢完的时候,我们即可立即向用户展示“售罄”或“暂无库存”等更为友好且贴近事实的提示,不存在忽悠,是真没有库存了。确保所有库存可以卖出。按照 1.2 或 1.5 等比例设置高出库存数量的下单许可,就是为了预留一定的 Buffer,允许一些无效提交但不会影响整体售卖。


提交下单任务异步处理过程中,提交下单任务是第一步,注意事项:


提交下单任务之前,应通过基础且必要的账号、安全相关的校验;下单要加锁,防止抖动、连续点击等导致用户层面出现重复提交问题;所提交的下单任务,应该具有明确且唯一的编码以便于跟踪,即 placeOrderTaskId,当用户获得提交许可时,应向用户提供 placeOrderTaskId 用以后续的结果轮询;同一用户、同一秒杀品,不应出现重复提交(可以通过 placeOrderTaskId 判断) ;下单许可的设计应结合本地缓存+中心化缓存,以降低网络请求负载并提高处理效率;在处理本地缓存和中心化缓存时,要着重注意过期时间的设置和更新时的锁竞争问题;


生成 placeOrderTaskId 代码通过 userId 和 itemId 生成 placeOrderTaskId 可以保证同一用户只能购买一个商品/**


  • 订单任务 id

  • @param userId

  • @param itemId

  • @return*/private String generatePlaceOrderTaskId(Long userId, Long itemId) {String toEncrypt = userId + "_" + itemId;return DigestUtils.md5DigestAsHex(toEncrypt.getBytes());}复制代码生成下单许可代码结合本地缓存 + 分布式缓存的实现 private final static Cache<Long, Integer> availableOrderTokensLocalCache = CacheBuilder.newBuilder().initialCapacity(20).concurrencyLevel(5).expireAfterWrite(20, TimeUnit.MILLISECONDS).build();


// 从本地获取 orderTokenprivate Integer getAvailableOrderTokens(Long itemId) {Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);if (localAvailableToken != null) {logger.info("本地缓存命中|{}", itemId);return localAvailableToken;}


return refreshLocalAvailableTokens(itemId);
复制代码


}


// 从本地获取远程的缓存并更新 private synchronized Integer refreshLocalAvailableTokens(Long itemId) {// 再次从本地缓存获取 Integer localAvailableToken = localAvailableTokens.getIfPresent(itemId);if (localAvailableToken != null) {logger.info("本地缓存命中|{}", itemId);return localAvailableToken;}


String goodAvailableTokensKey = getGoodAvailableTokensKey(itemId);Integer goodAvailableTokensInRedis = redisCacheService.getInteger(goodAvailableTokensKey);if (goodAvailableTokensInRedis != null) {  logger.info("远程缓存命中|{}", itemId);  localAvailableTokens.put(itemId, goodAvailableTokensInRedis);  return goodAvailableTokensInRedis;}
return refreshLatestAvailableTokens(itemId);
复制代码

}


// 获取最新的库存信息 private Integer refreshLatestAvailableTokens(Long itemId) {DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(getRefreshTokensLockKey(itemId));try {


  boolean isSuccessLock = distributedLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);  if (!isSuccessLock) {    return null;  }
GoodStockCache availableItemStock = goodStockCacheService.getAvailableItemStock(-1L, itemId); if (availableItemStock == null || availableItemStock.getAvailableStock() == null || !availableItemStock.isSuccess()) { logger.info("库存不存在|{}", itemId); return null; }
Integer availableOrderTokens = (int) Math.ceil(availableItemStock.getAvailableStock() * 1.5); // 更新远程的下单许可 redisCacheService.put(getGoodAvailableTokensKey(itemId), availableOrderTokens, 24, TimeUnit.HOURS); // 存入本地缓存 localAvailableTokens.put(itemId, availableOrderTokens); return availableOrderTokens;} catch (InterruptedException e) { logger.info("刷新tokens失败|{}", itemId, e); return null;}
复制代码


}复制代码扣减下单许可代码使用 Lua 代码扣减 远程缓存中存储的 许可数量(本地缓存保存的时间非常短,基本都会到分布式缓存中读取)if (redis.call('exists', KEYS[1]) == 1) thenlocal availableTokensCount = tonumber(redis.call('get', KEYS[1]))if availableTokensCount == 0 thenreturn -1endif availableTokensCount > 0 thenredis.call('incrby', KEYS[1], -1)return 1endend


return -100 复制代码使用 Java 代码多次调用 Lua 脚本为了保证可用下单许可数量的有效性,我们给下单许可设置了过期时间,这会导致在执行 LUA 脚本时数据不存在,所以为了应对这种情况,在数据不存在时当前线程会主动尝试刷新数据,然后继续执行 LUA 脚本。也就是说,当用户抢到了下单许可但是下单失败或取消订单时,系统会定时对数据进行纠正,腾出来空余的许可给后面需要的用户,确保所有库存均可对外销售// 扣减许可 private boolean takeOrRecoverToken(PlaceOrderTask placeOrderTask) {ArrayList<String> keys = new ArrayList<>();keys.add(getGoodAvailableTokensKey(placeOrderTask.getItemId()));


for (int i = 0; i < 3; i++) {  Long result = redisCacheService.getRedisTemplate().execute(TAKE_ORDER_TOKEN_LUA, keys);  if (result == null) {    return false;  }  // 没有库存  if (result == -1L) {    logger.info("库存为0|{}", JSON.toJSONString(placeOrderTask));    return false;  }  // 数据在缓存中不存在,先去更新  if (result == -100L) {    refreshLatestAvailableTokens(placeOrderTask.getItemId());    continue;  }  return result == 1L;}return false;
复制代码


}复制代码异步处理下单任务通过异步提交到 MQ 中的订单,MQ 会根据订阅将消息推送给订阅方处理。


异步处理下单任务时,应先确定 TOPIC 和订阅关系;任务处理结果存储到缓存中,方便客户端轮询;任务处理成功后,将订单 ID 返回给客户端,方便查看订单详情;


异步发送消息使用 RabbitMQ 发送消息 @Overridepublic boolean post(PlaceOrderTask placeOrderTask) {logger.info("投递下单任务|{}", JSON.toJSONString(placeOrderTask));if (placeOrderTask == null) {logger.info("下单任务参数为空");return false;}String placeOrderTaskString = JSON.toJSONString(placeOrderTask);


try {  rabbitTemplate.convertAndSend(MqConfig.SECKILL_EXCHANGE_NAME, MqConfig.SECKILL_ROUTING_KEY, placeOrderTaskString);  logger.info("OrderTaskPostServiceImpl|任务投递成功|{}", placeOrderTaskString);  return true;} catch (AmqpException e) {  logger.info("OrderTaskPostServiceImpl|任务投递失败|{}", placeOrderTaskString);  return false;}
复制代码


}复制代码库存扣减异步下单不存在库存扣减逻辑,由于异步并发可控,将直接在数据库层面进行竞争扣减。任务处理结束后,会将结果写入到缓存中以供查询。/**


  • 处理下单任务

  • @param placeOrderTask*/@Transactionalpublic void handlePlaceOrderTask(PlaceOrderTask placeOrderTask) {Long userId = placeOrderTask.getUserId();

  • SeckillGoodResponse seckillGoodResponse = seckillGoodClient.getSeckillGood(userId, placeOrderTask.getActivityId(), placeOrderTask.getItemId()).getData();// 构造 实体类 SeckillOrder seckillOrder = SeckillOrderBuilder.toDomain(placeOrderTask);seckillOrder.setItemTitle(seckillGoodResponse.getItemTitle());seckillOrder.setFlashPrice(seckillGoodResponse.getFlashPrice());seckillOrder.setUserId(userId);

  • StockDeduction stockDeduction = new StockDeduction().setItemId(placeOrderTask.getItemId()).setUserId(userId).setQuantity(seckillOrder.getQuantity());Long orderId = null;try {

  • } catch (Exception e) {// 扣减成功了才恢复 logger.error("下单失败|{},{}", userId, JSON.toJSONString(placeOrderTask), e);placeOrderTaskService.updateTaskHandleResult(placeOrderTask.getPlaceOrderTaskId(), false);throw new BusinessException(ErrorCode.PLACE_ORDER_FAILED);}}复制代码客户端轮询结果用户提交下单任务后,会返回任务 ID,随后就可以通过任务 ID 来查询该任务的结果:


初始提交: SUBMITTED,即尚未处理;下单成功:SUCCESS,完成处理并成功下单入库;下单失败:FAILED,完成处理但下单失败;任务不存在:错误的任务 ID 或任务缓存已过期被删除。


/** * 获取订单结果 * @param userId * @param itemId * @param placeOrderTaskId * @return */public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(Long userId, Long itemId, String placeOrderTaskId) {    String orderTaskId = generatePlaceOrderTaskId(userId, itemId);    if (!orderTaskId.equals(placeOrderTaskId)) {        logger.info("下单ID错误|{}, {}, {}", userId, itemId, placeOrderTaskId);        return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID);    }
// 获取订单的状态 OrderTaskStatus taskStatus = placeOrderTaskService.getTaskStatus(placeOrderTaskId); if (taskStatus == null) { logger.info("任务状态为空|{}", placeOrderTaskId); return ResultUtils.error(ErrorCode.PLACE_ORDER_TASK_ID_INVALID); }
if (!taskStatus.getStatus().equals(OrderTaskStatus.SUCCESS.getStatus())) { logger.info("订单任务尚未成功|{}", placeOrderTaskId); return ResultUtils.success(SeckillOrderMessageResponse.ok().setCode(taskStatus.getStatus())); } // 获取订单号 Long orderId = redisCacheService.getLong(PLACE_ORDER_TASK_ORDER_ID_KEY + placeOrderTaskId); if (orderId == null) { logger.info("订单id尚未存在|{}, {}, {}", userId, itemId, placeOrderTaskId); return null; } // 封装对象 SeckillOrderMessageResponse seckillOrderMessageResponse = SeckillOrderMessageResponse.ok().setOrderId(orderId) .setPlaceOrderTaskId(orderTaskId) .setCode(OrderTaskStatus.SUCCESS.getStatus()); return ResultUtils.success(seckillOrderMessageResponse);}
复制代码


复制代码提供接口实现轮询在接口方面,本次方案仅增加了任务结果轮询接口。需要注意的是,该接口用于轮询结果,且没有复杂计算和数据状态变更,在限流方面的阈值可以相对调高些。/**


  • 获取订单结果

  • @param userId

  • @param itemId

  • @param placeOrderTaskId

  • @return*/@GetMapping("/result/{itemId}/{placeOrderTaskId}")public BaseResponse<SeckillOrderMessageResponse> getPlaceOrderResult(@RequestHeader("TokenInfo") Long userId,@PathVariable Long itemId,@PathVariable String placeOrderTaskId) {return seckillOrderService.getPlaceOrderResult(userId, itemId, placeOrderTaskId);}}复制代码总结单异步削峰的目标,即提升用户体验和提升系统吞吐能力。在核心流程上,我们将异步下单过程拆分为三个环节:提交下单任务、处理下单任务和轮询下单结果。


用户头像

欢迎关注公众号:灵风的架构笔记 2022-10-21 加入

Java后端架构领域丨面题面经丨学习路线丨职业规划丨专业知识丨互联网资讯等分享

评论

发布
暂无评论
如何进行秒杀场景下的异步下单实现_Java_风铃架构日知录_InfoQ写作社区